diff --git a/CHANGELOG.md b/CHANGELOG.md index a729aa7bad..2d2255cf86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - [#1691](https://github.com/poanetwork/blockscout/pull/1691) - decrease token metadata update interval - [#1688](https://github.com/poanetwork/blockscout/pull/1688) - do not fail if failure reason is atom - [#1692](https://github.com/poanetwork/blockscout/pull/1692) - exclude decompiled smart contract from encoding + - [#1684](https://github.com/poanetwork/blockscout/pull/1684) - Discard child block with parent_hash not matching hash of imported block ### Chore diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 34f755f276..d882e56b51 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -2648,24 +2648,6 @@ defmodule Explorer.Chain do @spec data() :: Dataloader.Ecto.t() def data, do: DataloaderEcto.new(Repo) - @doc """ - Returns a list of block numbers with invalid consensus. - """ - @spec list_block_numbers_with_invalid_consensus :: [integer()] - def list_block_numbers_with_invalid_consensus do - query = - from( - block in Block, - join: parent in Block, - on: parent.hash == block.parent_hash, - where: block.consensus == true, - where: parent.consensus == false, - select: parent.number - ) - - Repo.all(query, timeout: :infinity) - end - def list_decompiled_contracts(limit, offset, not_decompiled_with_version \\ nil) do query = from( diff --git a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex index 100f120290..209d5d2fa6 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex @@ -46,7 +46,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |> Map.put(:timestamps, timestamps) ordered_consensus_block_numbers = ordered_consensus_block_numbers(changes_list) - where_invalid_parent = where_invalid_parent(changes_list) + where_invalid_neighbour = where_invalid_neighbour(changes_list) where_forked = where_forked(changes_list) multi @@ -70,8 +70,8 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |> Multi.run(:lose_consensus, fn repo, _ -> lose_consensus(repo, ordered_consensus_block_numbers, insert_options) end) - |> Multi.run(:lose_invalid_parent_consensus, fn repo, _ -> - lose_invalid_parent_consensus(repo, where_invalid_parent, insert_options) + |> Multi.run(:lose_invalid_neighbour_consensus, fn repo, _ -> + lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, insert_options) end) |> Multi.run(:delete_address_token_balances, fn repo, _ -> delete_address_token_balances(repo, ordered_consensus_block_numbers, insert_options) @@ -316,13 +316,13 @@ defmodule Explorer.Chain.Import.Runner.Blocks do end end - defp lose_invalid_parent_consensus(repo, where_invalid_parent, %{ + defp lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, %{ timeout: timeout, timestamps: %{updated_at: updated_at} }) do query = from( - block in where_invalid_parent, + block in where_invalid_neighbour, update: [ set: [ consensus: false, @@ -338,7 +338,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do {:ok, result} rescue postgrex_error in Postgrex.Error -> - {:error, %{exception: postgrex_error, where_invalid_parent: where_invalid_parent}} + {:error, %{exception: postgrex_error, where_invalid_neighbour: where_invalid_neighbour}} end end @@ -581,12 +581,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do end) end - defp where_invalid_parent(blocks_changes) when is_list(blocks_changes) do + defp where_invalid_neighbour(blocks_changes) when is_list(blocks_changes) do initial = from(b in Block, where: false) - Enum.reduce(blocks_changes, initial, fn %{consensus: consensus, parent_hash: parent_hash, number: number}, acc -> + Enum.reduce(blocks_changes, initial, fn %{ + consensus: consensus, + hash: hash, + parent_hash: parent_hash, + number: number + }, + acc -> if consensus do - from(block in acc, or_where: block.number == ^(number - 1) and block.hash != ^parent_hash) + from( + block in acc, + or_where: block.number == ^(number - 1) and block.hash != ^parent_hash, + or_where: block.number == ^(number + 1) and block.parent_hash != ^hash + ) else acc end diff --git a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs index 3d6fd912cc..ecf1d71dfb 100644 --- a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs @@ -261,34 +261,36 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do end # Regression test for https://github.com/poanetwork/blockscout/issues/1644 - test "discards parent block if it isn't related to the current one because of reorg", + test "discards neighbouring blocks if they aren't related to the current one because of reorg and/or import timeout", %{consensus_block: %Block{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do - old_block = insert(:block, parent_hash: block_hash, number: block_number + 1) - insert(:block, parent_hash: old_block.hash, number: old_block.number + 1) + old_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1) - new_block1 = params_for(:block, parent_hash: block_hash, number: block_number + 1, miner_hash: miner_hash) + new_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1) + new_block2 = params_for(:block, miner_hash: miner_hash, parent_hash: new_block1.hash, number: block_number + 2) - new_block2 = - params_for(:block, parent_hash: new_block1.hash, number: new_block1.number + 1, miner_hash: miner_hash) + range = block_number..(block_number + 2) - %Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, new_block2) - changes_list = [block_changes] + insert_block(new_block1, options) + insert_block(new_block2, options) + assert Chain.missing_block_number_ranges(range) == [] - Multi.new() - |> Blocks.run(changes_list, options) - |> Repo.transaction() + insert_block(old_block1, options) + assert Chain.missing_block_number_ranges(range) == [(block_number + 2)..(block_number + 2)] - assert Chain.missing_block_number_ranges(block_number..new_block2.number) == [old_block.number..old_block.number] + insert_block(new_block2, options) + assert Chain.missing_block_number_ranges(range) == [(block_number + 1)..(block_number + 1)] - %Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, new_block1) - changes_list = [block_changes] + insert_block(new_block1, options) + assert Chain.missing_block_number_ranges(range) == [] + end + end - Multi.new() - |> Blocks.run(changes_list, options) - |> Repo.transaction() + defp insert_block(block_params, options) do + %Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params) - assert Chain.missing_block_number_ranges(block_number..new_block2.number) == [] - end + Multi.new() + |> Blocks.run([block_changes], options) + |> Repo.transaction() end defp count(schema) do diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index c408716afd..ebb4b880dd 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -3667,27 +3667,6 @@ defmodule Explorer.ChainTest do end end - describe "list_block_numbers_with_invalid_consensus/0" do - test "returns a list of block numbers with invalid consensus" do - block1 = insert(:block) - block2_with_invalid_consensus = insert(:block, parent_hash: block1.hash, consensus: false) - _block2 = insert(:block, parent_hash: block1.hash, number: block2_with_invalid_consensus.number) - block3 = insert(:block, parent_hash: block2_with_invalid_consensus.hash) - block4 = insert(:block, parent_hash: block3.hash) - block5 = insert(:block, parent_hash: block4.hash) - block6_without_consensus = insert(:block, parent_hash: block5.hash, consensus: false) - block6 = insert(:block, parent_hash: block5.hash, number: block6_without_consensus.number) - block7 = insert(:block, parent_hash: block6.hash) - block8_with_invalid_consensus = insert(:block, parent_hash: block7.hash, consensus: false) - _block8 = insert(:block, parent_hash: block7.hash, number: block8_with_invalid_consensus.number) - block9 = insert(:block, parent_hash: block8_with_invalid_consensus.hash) - _block10 = insert(:block, parent_hash: block9.hash) - - assert Chain.list_block_numbers_with_invalid_consensus() == - [block2_with_invalid_consensus.number, block8_with_invalid_consensus.number] - end - end - describe "block_combined_rewards/1" do test "sums the block_rewards values" do block = insert(:block) diff --git a/apps/indexer/lib/indexer/block/invalid_consensus/supervisor.ex b/apps/indexer/lib/indexer/block/invalid_consensus/supervisor.ex deleted file mode 100644 index cca692a76c..0000000000 --- a/apps/indexer/lib/indexer/block/invalid_consensus/supervisor.ex +++ /dev/null @@ -1,45 +0,0 @@ -defmodule Indexer.Block.InvalidConsensus.Supervisor do - @moduledoc """ - Supervises process for ensuring blocks with invalid consensus get queued for - indexing. - """ - - use Supervisor - - alias Indexer.Block.InvalidConsensus.Worker - - def child_spec([]) do - child_spec([[]]) - end - - def child_spec([init_arguments]) do - child_spec([init_arguments, [name: __MODULE__]]) - end - - def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do - spec = %{ - id: __MODULE__, - start: {__MODULE__, :start_link, start_link_arguments}, - restart: :transient, - type: :supervisor - } - - Supervisor.child_spec(spec, []) - end - - def start_link(init_arguments, gen_server_options \\ []) do - Supervisor.start_link(__MODULE__, init_arguments, gen_server_options) - end - - @impl Supervisor - def init(_) do - children = [ - {Worker, [[supervisor: self()], [name: Worker]]}, - {Task.Supervisor, name: Indexer.Block.InvalidConsensus.TaskSupervisor} - ] - - opts = [strategy: :one_for_all] - - Supervisor.init(children, opts) - end -end diff --git a/apps/indexer/lib/indexer/block/invalid_consensus/worker.ex b/apps/indexer/lib/indexer/block/invalid_consensus/worker.ex deleted file mode 100644 index cbe93c604b..0000000000 --- a/apps/indexer/lib/indexer/block/invalid_consensus/worker.ex +++ /dev/null @@ -1,99 +0,0 @@ -defmodule Indexer.Block.InvalidConsensus.Worker do - @moduledoc """ - Finds blocks with invalid consensus and queues them up to be refetched. This - process does this once, after the application starts up. - - A block has invalid consensus when it is referenced as the parent hash of a - block with consensus while not having consensus (consensus=false). Only one - block can have consensus at a given height (block number). - """ - - use GenServer - - require Logger - - alias Explorer.Chain - alias Indexer.Block.Catchup.Fetcher - alias Indexer.Block.InvalidConsensus.TaskSupervisor - - def child_spec([init_arguments]) do - child_spec([init_arguments, []]) - end - - def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do - spec = %{ - id: __MODULE__, - start: {__MODULE__, :start_link, start_link_arguments}, - restart: :transient, - type: :worker - } - - Supervisor.child_spec(spec, []) - end - - def start_link(init_arguments, gen_server_options \\ []) do - GenServer.start_link(__MODULE__, init_arguments, gen_server_options) - end - - def init(opts) do - sup_pid = Keyword.fetch!(opts, :supervisor) - retry_interval = Keyword.get(opts, :retry_interval, 10_000) - - send(self(), :scan) - - state = %{ - block_numbers: [], - retry_interval: retry_interval, - sup_pid: sup_pid, - task_ref: nil - } - - {:ok, state} - end - - def handle_info(:scan, state) do - block_numbers = Chain.list_block_numbers_with_invalid_consensus() - - case block_numbers do - [] -> - Supervisor.stop(state.sup_pid, :normal) - {:noreply, state} - - block_numbers -> - Process.send_after(self(), :push_front_blocks, state.retry_interval) - {:noreply, %{state | block_numbers: block_numbers}} - end - end - - def handle_info(:push_front_blocks, %{block_numbers: block_numbers} = state) do - %Task{ref: ref} = async_push_front(block_numbers) - {:noreply, %{state | task_ref: ref}} - end - - def handle_info({ref, :ok}, %{task_ref: ref, sup_pid: sup_pid}) do - Process.demonitor(ref, [:flush]) - Supervisor.stop(sup_pid, :normal) - {:stop, :shutdown} - end - - def handle_info({ref, {:error, reason}}, %{task_ref: ref, retry_interval: millis} = state) do - case reason do - :queue_unavailable -> :ok - _ -> Logger.error(fn -> inspect(reason) end) - end - - Process.demonitor(ref, [:flush]) - Process.send_after(self(), :push_front_blocks, millis) - - {:noreply, %{state | task_ref: nil}} - end - - def handle_info({:DOWN, ref, :process, _, _}, %{task_ref: ref, retry_interval: millis} = state) do - Process.send_after(self(), :push_front_blocks, millis) - {:noreply, %{state | task_ref: nil}} - end - - defp async_push_front(block_numbers) do - Task.Supervisor.async_nolink(TaskSupervisor, Fetcher, :push_front, [block_numbers]) - end -end diff --git a/apps/indexer/lib/indexer/block/realtime/consensus_ensurer.ex b/apps/indexer/lib/indexer/block/realtime/consensus_ensurer.ex deleted file mode 100644 index 816f1d76bf..0000000000 --- a/apps/indexer/lib/indexer/block/realtime/consensus_ensurer.ex +++ /dev/null @@ -1,34 +0,0 @@ -defmodule Indexer.Block.Realtime.ConsensusEnsurer do - @moduledoc """ - Triggers a refetch if a given block doesn't have consensus. - - """ - require Logger - - alias Explorer.Chain - alias Explorer.Chain.Hash - alias Indexer.Block.Realtime.Fetcher - - def perform(_, number, _) when not is_integer(number) or number < 0, do: :ok - - def perform(%Hash{byte_count: unquote(Hash.Full.byte_count())} = block_hash, number, block_fetcher) do - case Chain.hash_to_block(block_hash) do - {:ok, %{consensus: true} = _block} -> - :ignore - - _ -> - Logger.info(fn -> - [ - "refetch from consensus was found on block (", - to_string(number), - "). A reorg initiated." - ] - end) - - # trigger refetch if consensus=false or block was not found - Fetcher.fetch_and_import_block(number, block_fetcher, true) - end - - :ok - end -end diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 49787ad675..fe486f62c2 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -27,7 +27,7 @@ defmodule Indexer.Block.Realtime.Fetcher do alias Explorer.Chain.TokenTransfer alias Explorer.Counters.AverageBlockTime alias Indexer.{AddressExtraction, Block, TokenBalances, Tracer} - alias Indexer.Block.Realtime.{ConsensusEnsurer, TaskSupervisor} + alias Indexer.Block.Realtime.TaskSupervisor alias Timex.Duration @behaviour Block.Fetcher @@ -269,12 +269,7 @@ defmodule Indexer.Block.Realtime.Fetcher do @decorate span(tracer: Tracer) defp do_fetch_and_import_block(block_number_to_fetch, block_fetcher, retry) do case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do - {:ok, %{inserted: inserted, errors: []}} -> - for block <- Map.get(inserted, :blocks, []) do - args = [block.parent_hash, block.number - 1, block_fetcher] - Task.Supervisor.start_child(TaskSupervisor, ConsensusEnsurer, :perform, args) - end - + {:ok, %{inserted: _, errors: []}} -> Logger.debug("Fetched and imported.") {:ok, %{inserted: _, errors: [_ | _] = errors}} -> diff --git a/apps/indexer/lib/indexer/block/supervisor.ex b/apps/indexer/lib/indexer/block/supervisor.ex index 2bab786f18..50b24a6dd5 100644 --- a/apps/indexer/lib/indexer/block/supervisor.ex +++ b/apps/indexer/lib/indexer/block/supervisor.ex @@ -4,7 +4,7 @@ defmodule Indexer.Block.Supervisor do """ alias Indexer.Block - alias Indexer.Block.{Catchup, InvalidConsensus, Realtime, Reward, Uncle} + alias Indexer.Block.{Catchup, Realtime, Reward, Uncle} alias Indexer.Temporary.{AddressesWithoutCode, FailedCreatedAddresses} use Supervisor @@ -50,7 +50,6 @@ defmodule Indexer.Block.Supervisor do %{block_fetcher: block_fetcher, block_interval: block_interval, memory_monitor: memory_monitor}, [name: Catchup.Supervisor] ]}, - {InvalidConsensus.Supervisor, [[], [name: InvalidConsensus.Supervisor]]}, {Realtime.Supervisor, [ %{block_fetcher: realtime_block_fetcher, subscribe_named_arguments: realtime_subscribe_named_arguments}, diff --git a/apps/indexer/test/indexer/block/invalid_consensus/worker_test.exs b/apps/indexer/test/indexer/block/invalid_consensus/worker_test.exs deleted file mode 100644 index a6fa07f0d3..0000000000 --- a/apps/indexer/test/indexer/block/invalid_consensus/worker_test.exs +++ /dev/null @@ -1,87 +0,0 @@ -defmodule Indexer.Block.InvalidConsensus.WorkerTest do - use Explorer.DataCase - - alias Indexer.Sequence - alias Indexer.Block.InvalidConsensus.{Worker, TaskSupervisor} - - @moduletag :capture_log - - describe "start_link/1" do - test "starts the worker" do - assert {:ok, _pid} = Worker.start_link(supervisor: self()) - end - end - - describe "init/1" do - test "sends message to self" do - pid = self() - assert {:ok, %{task_ref: nil, block_numbers: [], sup_pid: ^pid}} = Worker.init(supervisor: self()) - assert_received :scan - end - end - - describe "handle_info with :scan" do - test "sends shutdown to supervisor" do - state = %{task_ref: nil, block_numbers: [], sup_pid: self()} - Task.async(fn -> Worker.handle_info(:scan, state) end) - assert_receive {_, _, {:terminate, :normal}} - end - - test "sends message to self when blocks with invalid consensus are found" do - block1 = insert(:block) - block2_with_invalid_consensus = insert(:block, parent_hash: block1.hash, consensus: false) - _block2 = insert(:block, parent_hash: block1.hash, number: block2_with_invalid_consensus.number) - _block3 = insert(:block, parent_hash: block2_with_invalid_consensus.hash) - - block_number = block2_with_invalid_consensus.number - - expected_state = %{task_ref: nil, block_numbers: [block_number], retry_interval: 1} - state = %{task_ref: nil, block_numbers: [], retry_interval: 1} - - assert {:noreply, ^expected_state} = Worker.handle_info(:scan, state) - assert_receive :push_front_blocks - end - end - - describe "handle_info with :push_front_blocks" do - test "starts a task" do - task_sup_pid = start_supervised!({Task.Supervisor, name: TaskSupervisor}) - start_supervised!({Sequence, [[ranges: [], step: -1], [name: :block_catchup_sequencer]]}) - - state = %{task_ref: nil, block_numbers: [1]} - assert {:noreply, %{task_ref: task_ref}} = Worker.handle_info(:push_front_blocks, state) - assert is_reference(task_ref) - - refute_receive {^task_ref, {:error, :queue_unavailable}} - assert_receive {^task_ref, :ok} - - stop_supervised(task_sup_pid) - end - end - - describe "handle_info with task ref tuple" do - test "sends shutdown to supervisor on success" do - ref = Process.monitor(self()) - state = %{task_ref: ref, block_numbers: [], sup_pid: self()} - Task.async(fn -> assert Worker.handle_info({ref, :ok}, state) end) - assert_receive {_, _, {:terminate, :normal}} - end - - test "sends message to self to try again on failure" do - ref = Process.monitor(self()) - state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1} - expected_state = %{state | task_ref: nil} - assert {:noreply, ^expected_state} = Worker.handle_info({ref, {:error, :queue_unavailable}}, state) - assert_receive :push_front_blocks - end - end - - describe "handle_info with failed task" do - test "sends message to self to try again" do - ref = Process.monitor(self()) - state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1} - assert {:noreply, %{task_ref: nil}} = Worker.handle_info({:DOWN, ref, :process, self(), :EXIT}, state) - assert_receive :push_front_blocks - end - end -end