From 0e15873fbcf4de2f399f5032d86b9afc238227bb Mon Sep 17 00:00:00 2001 From: goodsoft Date: Fri, 29 Mar 2019 20:16:48 +0200 Subject: [PATCH] Remove obsolete InvalidConsensus.Worker This worker only handled one of invalid consensus cases, i.e. when a parent block lost consensus, but the child one didn't. Even then it worked only when all blocks are reliably and sequentially imported (certainly not our case), and only once on indexer launch. Now this particular case is covered by previous commit, and we don't need this worker at all. --- apps/explorer/lib/explorer/chain.ex | 18 ---- apps/explorer/test/explorer/chain_test.exs | 21 ---- .../block/invalid_consensus/supervisor.ex | 45 --------- .../indexer/block/invalid_consensus/worker.ex | 99 ------------------- apps/indexer/lib/indexer/block/supervisor.ex | 3 +- .../block/invalid_consensus/worker_test.exs | 87 ---------------- 6 files changed, 1 insertion(+), 272 deletions(-) delete mode 100644 apps/indexer/lib/indexer/block/invalid_consensus/supervisor.ex delete mode 100644 apps/indexer/lib/indexer/block/invalid_consensus/worker.ex delete mode 100644 apps/indexer/test/indexer/block/invalid_consensus/worker_test.exs diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 34f755f276..d882e56b51 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -2648,24 +2648,6 @@ defmodule Explorer.Chain do @spec data() :: Dataloader.Ecto.t() def data, do: DataloaderEcto.new(Repo) - @doc """ - Returns a list of block numbers with invalid consensus. - """ - @spec list_block_numbers_with_invalid_consensus :: [integer()] - def list_block_numbers_with_invalid_consensus do - query = - from( - block in Block, - join: parent in Block, - on: parent.hash == block.parent_hash, - where: block.consensus == true, - where: parent.consensus == false, - select: parent.number - ) - - Repo.all(query, timeout: :infinity) - end - def list_decompiled_contracts(limit, offset, not_decompiled_with_version \\ nil) do query = from( diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index c408716afd..ebb4b880dd 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -3667,27 +3667,6 @@ defmodule Explorer.ChainTest do end end - describe "list_block_numbers_with_invalid_consensus/0" do - test "returns a list of block numbers with invalid consensus" do - block1 = insert(:block) - block2_with_invalid_consensus = insert(:block, parent_hash: block1.hash, consensus: false) - _block2 = insert(:block, parent_hash: block1.hash, number: block2_with_invalid_consensus.number) - block3 = insert(:block, parent_hash: block2_with_invalid_consensus.hash) - block4 = insert(:block, parent_hash: block3.hash) - block5 = insert(:block, parent_hash: block4.hash) - block6_without_consensus = insert(:block, parent_hash: block5.hash, consensus: false) - block6 = insert(:block, parent_hash: block5.hash, number: block6_without_consensus.number) - block7 = insert(:block, parent_hash: block6.hash) - block8_with_invalid_consensus = insert(:block, parent_hash: block7.hash, consensus: false) - _block8 = insert(:block, parent_hash: block7.hash, number: block8_with_invalid_consensus.number) - block9 = insert(:block, parent_hash: block8_with_invalid_consensus.hash) - _block10 = insert(:block, parent_hash: block9.hash) - - assert Chain.list_block_numbers_with_invalid_consensus() == - [block2_with_invalid_consensus.number, block8_with_invalid_consensus.number] - end - end - describe "block_combined_rewards/1" do test "sums the block_rewards values" do block = insert(:block) diff --git a/apps/indexer/lib/indexer/block/invalid_consensus/supervisor.ex b/apps/indexer/lib/indexer/block/invalid_consensus/supervisor.ex deleted file mode 100644 index cca692a76c..0000000000 --- a/apps/indexer/lib/indexer/block/invalid_consensus/supervisor.ex +++ /dev/null @@ -1,45 +0,0 @@ -defmodule Indexer.Block.InvalidConsensus.Supervisor do - @moduledoc """ - Supervises process for ensuring blocks with invalid consensus get queued for - indexing. - """ - - use Supervisor - - alias Indexer.Block.InvalidConsensus.Worker - - def child_spec([]) do - child_spec([[]]) - end - - def child_spec([init_arguments]) do - child_spec([init_arguments, [name: __MODULE__]]) - end - - def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do - spec = %{ - id: __MODULE__, - start: {__MODULE__, :start_link, start_link_arguments}, - restart: :transient, - type: :supervisor - } - - Supervisor.child_spec(spec, []) - end - - def start_link(init_arguments, gen_server_options \\ []) do - Supervisor.start_link(__MODULE__, init_arguments, gen_server_options) - end - - @impl Supervisor - def init(_) do - children = [ - {Worker, [[supervisor: self()], [name: Worker]]}, - {Task.Supervisor, name: Indexer.Block.InvalidConsensus.TaskSupervisor} - ] - - opts = [strategy: :one_for_all] - - Supervisor.init(children, opts) - end -end diff --git a/apps/indexer/lib/indexer/block/invalid_consensus/worker.ex b/apps/indexer/lib/indexer/block/invalid_consensus/worker.ex deleted file mode 100644 index cbe93c604b..0000000000 --- a/apps/indexer/lib/indexer/block/invalid_consensus/worker.ex +++ /dev/null @@ -1,99 +0,0 @@ -defmodule Indexer.Block.InvalidConsensus.Worker do - @moduledoc """ - Finds blocks with invalid consensus and queues them up to be refetched. This - process does this once, after the application starts up. - - A block has invalid consensus when it is referenced as the parent hash of a - block with consensus while not having consensus (consensus=false). Only one - block can have consensus at a given height (block number). - """ - - use GenServer - - require Logger - - alias Explorer.Chain - alias Indexer.Block.Catchup.Fetcher - alias Indexer.Block.InvalidConsensus.TaskSupervisor - - def child_spec([init_arguments]) do - child_spec([init_arguments, []]) - end - - def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do - spec = %{ - id: __MODULE__, - start: {__MODULE__, :start_link, start_link_arguments}, - restart: :transient, - type: :worker - } - - Supervisor.child_spec(spec, []) - end - - def start_link(init_arguments, gen_server_options \\ []) do - GenServer.start_link(__MODULE__, init_arguments, gen_server_options) - end - - def init(opts) do - sup_pid = Keyword.fetch!(opts, :supervisor) - retry_interval = Keyword.get(opts, :retry_interval, 10_000) - - send(self(), :scan) - - state = %{ - block_numbers: [], - retry_interval: retry_interval, - sup_pid: sup_pid, - task_ref: nil - } - - {:ok, state} - end - - def handle_info(:scan, state) do - block_numbers = Chain.list_block_numbers_with_invalid_consensus() - - case block_numbers do - [] -> - Supervisor.stop(state.sup_pid, :normal) - {:noreply, state} - - block_numbers -> - Process.send_after(self(), :push_front_blocks, state.retry_interval) - {:noreply, %{state | block_numbers: block_numbers}} - end - end - - def handle_info(:push_front_blocks, %{block_numbers: block_numbers} = state) do - %Task{ref: ref} = async_push_front(block_numbers) - {:noreply, %{state | task_ref: ref}} - end - - def handle_info({ref, :ok}, %{task_ref: ref, sup_pid: sup_pid}) do - Process.demonitor(ref, [:flush]) - Supervisor.stop(sup_pid, :normal) - {:stop, :shutdown} - end - - def handle_info({ref, {:error, reason}}, %{task_ref: ref, retry_interval: millis} = state) do - case reason do - :queue_unavailable -> :ok - _ -> Logger.error(fn -> inspect(reason) end) - end - - Process.demonitor(ref, [:flush]) - Process.send_after(self(), :push_front_blocks, millis) - - {:noreply, %{state | task_ref: nil}} - end - - def handle_info({:DOWN, ref, :process, _, _}, %{task_ref: ref, retry_interval: millis} = state) do - Process.send_after(self(), :push_front_blocks, millis) - {:noreply, %{state | task_ref: nil}} - end - - defp async_push_front(block_numbers) do - Task.Supervisor.async_nolink(TaskSupervisor, Fetcher, :push_front, [block_numbers]) - end -end diff --git a/apps/indexer/lib/indexer/block/supervisor.ex b/apps/indexer/lib/indexer/block/supervisor.ex index 2bab786f18..50b24a6dd5 100644 --- a/apps/indexer/lib/indexer/block/supervisor.ex +++ b/apps/indexer/lib/indexer/block/supervisor.ex @@ -4,7 +4,7 @@ defmodule Indexer.Block.Supervisor do """ alias Indexer.Block - alias Indexer.Block.{Catchup, InvalidConsensus, Realtime, Reward, Uncle} + alias Indexer.Block.{Catchup, Realtime, Reward, Uncle} alias Indexer.Temporary.{AddressesWithoutCode, FailedCreatedAddresses} use Supervisor @@ -50,7 +50,6 @@ defmodule Indexer.Block.Supervisor do %{block_fetcher: block_fetcher, block_interval: block_interval, memory_monitor: memory_monitor}, [name: Catchup.Supervisor] ]}, - {InvalidConsensus.Supervisor, [[], [name: InvalidConsensus.Supervisor]]}, {Realtime.Supervisor, [ %{block_fetcher: realtime_block_fetcher, subscribe_named_arguments: realtime_subscribe_named_arguments}, diff --git a/apps/indexer/test/indexer/block/invalid_consensus/worker_test.exs b/apps/indexer/test/indexer/block/invalid_consensus/worker_test.exs deleted file mode 100644 index a6fa07f0d3..0000000000 --- a/apps/indexer/test/indexer/block/invalid_consensus/worker_test.exs +++ /dev/null @@ -1,87 +0,0 @@ -defmodule Indexer.Block.InvalidConsensus.WorkerTest do - use Explorer.DataCase - - alias Indexer.Sequence - alias Indexer.Block.InvalidConsensus.{Worker, TaskSupervisor} - - @moduletag :capture_log - - describe "start_link/1" do - test "starts the worker" do - assert {:ok, _pid} = Worker.start_link(supervisor: self()) - end - end - - describe "init/1" do - test "sends message to self" do - pid = self() - assert {:ok, %{task_ref: nil, block_numbers: [], sup_pid: ^pid}} = Worker.init(supervisor: self()) - assert_received :scan - end - end - - describe "handle_info with :scan" do - test "sends shutdown to supervisor" do - state = %{task_ref: nil, block_numbers: [], sup_pid: self()} - Task.async(fn -> Worker.handle_info(:scan, state) end) - assert_receive {_, _, {:terminate, :normal}} - end - - test "sends message to self when blocks with invalid consensus are found" do - block1 = insert(:block) - block2_with_invalid_consensus = insert(:block, parent_hash: block1.hash, consensus: false) - _block2 = insert(:block, parent_hash: block1.hash, number: block2_with_invalid_consensus.number) - _block3 = insert(:block, parent_hash: block2_with_invalid_consensus.hash) - - block_number = block2_with_invalid_consensus.number - - expected_state = %{task_ref: nil, block_numbers: [block_number], retry_interval: 1} - state = %{task_ref: nil, block_numbers: [], retry_interval: 1} - - assert {:noreply, ^expected_state} = Worker.handle_info(:scan, state) - assert_receive :push_front_blocks - end - end - - describe "handle_info with :push_front_blocks" do - test "starts a task" do - task_sup_pid = start_supervised!({Task.Supervisor, name: TaskSupervisor}) - start_supervised!({Sequence, [[ranges: [], step: -1], [name: :block_catchup_sequencer]]}) - - state = %{task_ref: nil, block_numbers: [1]} - assert {:noreply, %{task_ref: task_ref}} = Worker.handle_info(:push_front_blocks, state) - assert is_reference(task_ref) - - refute_receive {^task_ref, {:error, :queue_unavailable}} - assert_receive {^task_ref, :ok} - - stop_supervised(task_sup_pid) - end - end - - describe "handle_info with task ref tuple" do - test "sends shutdown to supervisor on success" do - ref = Process.monitor(self()) - state = %{task_ref: ref, block_numbers: [], sup_pid: self()} - Task.async(fn -> assert Worker.handle_info({ref, :ok}, state) end) - assert_receive {_, _, {:terminate, :normal}} - end - - test "sends message to self to try again on failure" do - ref = Process.monitor(self()) - state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1} - expected_state = %{state | task_ref: nil} - assert {:noreply, ^expected_state} = Worker.handle_info({ref, {:error, :queue_unavailable}}, state) - assert_receive :push_front_blocks - end - end - - describe "handle_info with failed task" do - test "sends message to self to try again" do - ref = Process.monitor(self()) - state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1} - assert {:noreply, %{task_ref: nil}} = Worker.handle_info({:DOWN, ref, :process, self(), :EXIT}, state) - assert_receive :push_front_blocks - end - end -end