Merge pull request #1684 from poanetwork/gs-fix-consensus-loss

Discard child block with parent_hash not matching hash of imported block
pull/1665/head
Paul Tsupikoff 6 years ago committed by GitHub
commit 313df94cec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 18
      apps/explorer/lib/explorer/chain.ex
  3. 28
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  4. 40
      apps/explorer/test/explorer/chain/import/runner/blocks_test.exs
  5. 21
      apps/explorer/test/explorer/chain_test.exs
  6. 45
      apps/indexer/lib/indexer/block/invalid_consensus/supervisor.ex
  7. 99
      apps/indexer/lib/indexer/block/invalid_consensus/worker.ex
  8. 34
      apps/indexer/lib/indexer/block/realtime/consensus_ensurer.ex
  9. 9
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  10. 3
      apps/indexer/lib/indexer/block/supervisor.ex
  11. 87
      apps/indexer/test/indexer/block/invalid_consensus/worker_test.exs

@ -12,6 +12,7 @@
- [#1691](https://github.com/poanetwork/blockscout/pull/1691) - decrease token metadata update interval
- [#1688](https://github.com/poanetwork/blockscout/pull/1688) - do not fail if failure reason is atom
- [#1692](https://github.com/poanetwork/blockscout/pull/1692) - exclude decompiled smart contract from encoding
- [#1684](https://github.com/poanetwork/blockscout/pull/1684) - Discard child block with parent_hash not matching hash of imported block
### Chore

@ -2648,24 +2648,6 @@ defmodule Explorer.Chain do
@spec data() :: Dataloader.Ecto.t()
def data, do: DataloaderEcto.new(Repo)
@doc """
Returns a list of block numbers with invalid consensus.
"""
@spec list_block_numbers_with_invalid_consensus :: [integer()]
def list_block_numbers_with_invalid_consensus do
query =
from(
block in Block,
join: parent in Block,
on: parent.hash == block.parent_hash,
where: block.consensus == true,
where: parent.consensus == false,
select: parent.number
)
Repo.all(query, timeout: :infinity)
end
def list_decompiled_contracts(limit, offset, not_decompiled_with_version \\ nil) do
query =
from(

@ -46,7 +46,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Map.put(:timestamps, timestamps)
ordered_consensus_block_numbers = ordered_consensus_block_numbers(changes_list)
where_invalid_parent = where_invalid_parent(changes_list)
where_invalid_neighbour = where_invalid_neighbour(changes_list)
where_forked = where_forked(changes_list)
multi
@ -70,8 +70,8 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Multi.run(:lose_consensus, fn repo, _ ->
lose_consensus(repo, ordered_consensus_block_numbers, insert_options)
end)
|> Multi.run(:lose_invalid_parent_consensus, fn repo, _ ->
lose_invalid_parent_consensus(repo, where_invalid_parent, insert_options)
|> Multi.run(:lose_invalid_neighbour_consensus, fn repo, _ ->
lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, insert_options)
end)
|> Multi.run(:delete_address_token_balances, fn repo, _ ->
delete_address_token_balances(repo, ordered_consensus_block_numbers, insert_options)
@ -316,13 +316,13 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
end
end
defp lose_invalid_parent_consensus(repo, where_invalid_parent, %{
defp lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, %{
timeout: timeout,
timestamps: %{updated_at: updated_at}
}) do
query =
from(
block in where_invalid_parent,
block in where_invalid_neighbour,
update: [
set: [
consensus: false,
@ -338,7 +338,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
{:ok, result}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, where_invalid_parent: where_invalid_parent}}
{:error, %{exception: postgrex_error, where_invalid_neighbour: where_invalid_neighbour}}
end
end
@ -581,12 +581,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
end)
end
defp where_invalid_parent(blocks_changes) when is_list(blocks_changes) do
defp where_invalid_neighbour(blocks_changes) when is_list(blocks_changes) do
initial = from(b in Block, where: false)
Enum.reduce(blocks_changes, initial, fn %{consensus: consensus, parent_hash: parent_hash, number: number}, acc ->
Enum.reduce(blocks_changes, initial, fn %{
consensus: consensus,
hash: hash,
parent_hash: parent_hash,
number: number
},
acc ->
if consensus do
from(block in acc, or_where: block.number == ^(number - 1) and block.hash != ^parent_hash)
from(
block in acc,
or_where: block.number == ^(number - 1) and block.hash != ^parent_hash,
or_where: block.number == ^(number + 1) and block.parent_hash != ^hash
)
else
acc
end

@ -261,34 +261,36 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
end
# Regression test for https://github.com/poanetwork/blockscout/issues/1644
test "discards parent block if it isn't related to the current one because of reorg",
test "discards neighbouring blocks if they aren't related to the current one because of reorg and/or import timeout",
%{consensus_block: %Block{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do
old_block = insert(:block, parent_hash: block_hash, number: block_number + 1)
insert(:block, parent_hash: old_block.hash, number: old_block.number + 1)
old_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1)
new_block1 = params_for(:block, parent_hash: block_hash, number: block_number + 1, miner_hash: miner_hash)
new_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1)
new_block2 = params_for(:block, miner_hash: miner_hash, parent_hash: new_block1.hash, number: block_number + 2)
new_block2 =
params_for(:block, parent_hash: new_block1.hash, number: new_block1.number + 1, miner_hash: miner_hash)
range = block_number..(block_number + 2)
%Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, new_block2)
changes_list = [block_changes]
insert_block(new_block1, options)
insert_block(new_block2, options)
assert Chain.missing_block_number_ranges(range) == []
Multi.new()
|> Blocks.run(changes_list, options)
|> Repo.transaction()
insert_block(old_block1, options)
assert Chain.missing_block_number_ranges(range) == [(block_number + 2)..(block_number + 2)]
assert Chain.missing_block_number_ranges(block_number..new_block2.number) == [old_block.number..old_block.number]
insert_block(new_block2, options)
assert Chain.missing_block_number_ranges(range) == [(block_number + 1)..(block_number + 1)]
%Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, new_block1)
changes_list = [block_changes]
insert_block(new_block1, options)
assert Chain.missing_block_number_ranges(range) == []
end
end
Multi.new()
|> Blocks.run(changes_list, options)
|> Repo.transaction()
defp insert_block(block_params, options) do
%Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params)
assert Chain.missing_block_number_ranges(block_number..new_block2.number) == []
end
Multi.new()
|> Blocks.run([block_changes], options)
|> Repo.transaction()
end
defp count(schema) do

@ -3667,27 +3667,6 @@ defmodule Explorer.ChainTest do
end
end
describe "list_block_numbers_with_invalid_consensus/0" do
test "returns a list of block numbers with invalid consensus" do
block1 = insert(:block)
block2_with_invalid_consensus = insert(:block, parent_hash: block1.hash, consensus: false)
_block2 = insert(:block, parent_hash: block1.hash, number: block2_with_invalid_consensus.number)
block3 = insert(:block, parent_hash: block2_with_invalid_consensus.hash)
block4 = insert(:block, parent_hash: block3.hash)
block5 = insert(:block, parent_hash: block4.hash)
block6_without_consensus = insert(:block, parent_hash: block5.hash, consensus: false)
block6 = insert(:block, parent_hash: block5.hash, number: block6_without_consensus.number)
block7 = insert(:block, parent_hash: block6.hash)
block8_with_invalid_consensus = insert(:block, parent_hash: block7.hash, consensus: false)
_block8 = insert(:block, parent_hash: block7.hash, number: block8_with_invalid_consensus.number)
block9 = insert(:block, parent_hash: block8_with_invalid_consensus.hash)
_block10 = insert(:block, parent_hash: block9.hash)
assert Chain.list_block_numbers_with_invalid_consensus() ==
[block2_with_invalid_consensus.number, block8_with_invalid_consensus.number]
end
end
describe "block_combined_rewards/1" do
test "sums the block_rewards values" do
block = insert(:block)

@ -1,45 +0,0 @@
defmodule Indexer.Block.InvalidConsensus.Supervisor do
@moduledoc """
Supervises process for ensuring blocks with invalid consensus get queued for
indexing.
"""
use Supervisor
alias Indexer.Block.InvalidConsensus.Worker
def child_spec([]) do
child_spec([[]])
end
def child_spec([init_arguments]) do
child_spec([init_arguments, [name: __MODULE__]])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
spec = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
restart: :transient,
type: :supervisor
}
Supervisor.child_spec(spec, [])
end
def start_link(init_arguments, gen_server_options \\ []) do
Supervisor.start_link(__MODULE__, init_arguments, gen_server_options)
end
@impl Supervisor
def init(_) do
children = [
{Worker, [[supervisor: self()], [name: Worker]]},
{Task.Supervisor, name: Indexer.Block.InvalidConsensus.TaskSupervisor}
]
opts = [strategy: :one_for_all]
Supervisor.init(children, opts)
end
end

@ -1,99 +0,0 @@
defmodule Indexer.Block.InvalidConsensus.Worker do
@moduledoc """
Finds blocks with invalid consensus and queues them up to be refetched. This
process does this once, after the application starts up.
A block has invalid consensus when it is referenced as the parent hash of a
block with consensus while not having consensus (consensus=false). Only one
block can have consensus at a given height (block number).
"""
use GenServer
require Logger
alias Explorer.Chain
alias Indexer.Block.Catchup.Fetcher
alias Indexer.Block.InvalidConsensus.TaskSupervisor
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
spec = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments},
restart: :transient,
type: :worker
}
Supervisor.child_spec(spec, [])
end
def start_link(init_arguments, gen_server_options \\ []) do
GenServer.start_link(__MODULE__, init_arguments, gen_server_options)
end
def init(opts) do
sup_pid = Keyword.fetch!(opts, :supervisor)
retry_interval = Keyword.get(opts, :retry_interval, 10_000)
send(self(), :scan)
state = %{
block_numbers: [],
retry_interval: retry_interval,
sup_pid: sup_pid,
task_ref: nil
}
{:ok, state}
end
def handle_info(:scan, state) do
block_numbers = Chain.list_block_numbers_with_invalid_consensus()
case block_numbers do
[] ->
Supervisor.stop(state.sup_pid, :normal)
{:noreply, state}
block_numbers ->
Process.send_after(self(), :push_front_blocks, state.retry_interval)
{:noreply, %{state | block_numbers: block_numbers}}
end
end
def handle_info(:push_front_blocks, %{block_numbers: block_numbers} = state) do
%Task{ref: ref} = async_push_front(block_numbers)
{:noreply, %{state | task_ref: ref}}
end
def handle_info({ref, :ok}, %{task_ref: ref, sup_pid: sup_pid}) do
Process.demonitor(ref, [:flush])
Supervisor.stop(sup_pid, :normal)
{:stop, :shutdown}
end
def handle_info({ref, {:error, reason}}, %{task_ref: ref, retry_interval: millis} = state) do
case reason do
:queue_unavailable -> :ok
_ -> Logger.error(fn -> inspect(reason) end)
end
Process.demonitor(ref, [:flush])
Process.send_after(self(), :push_front_blocks, millis)
{:noreply, %{state | task_ref: nil}}
end
def handle_info({:DOWN, ref, :process, _, _}, %{task_ref: ref, retry_interval: millis} = state) do
Process.send_after(self(), :push_front_blocks, millis)
{:noreply, %{state | task_ref: nil}}
end
defp async_push_front(block_numbers) do
Task.Supervisor.async_nolink(TaskSupervisor, Fetcher, :push_front, [block_numbers])
end
end

@ -1,34 +0,0 @@
defmodule Indexer.Block.Realtime.ConsensusEnsurer do
@moduledoc """
Triggers a refetch if a given block doesn't have consensus.
"""
require Logger
alias Explorer.Chain
alias Explorer.Chain.Hash
alias Indexer.Block.Realtime.Fetcher
def perform(_, number, _) when not is_integer(number) or number < 0, do: :ok
def perform(%Hash{byte_count: unquote(Hash.Full.byte_count())} = block_hash, number, block_fetcher) do
case Chain.hash_to_block(block_hash) do
{:ok, %{consensus: true} = _block} ->
:ignore
_ ->
Logger.info(fn ->
[
"refetch from consensus was found on block (",
to_string(number),
"). A reorg initiated."
]
end)
# trigger refetch if consensus=false or block was not found
Fetcher.fetch_and_import_block(number, block_fetcher, true)
end
:ok
end
end

@ -27,7 +27,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
alias Explorer.Chain.TokenTransfer
alias Explorer.Counters.AverageBlockTime
alias Indexer.{AddressExtraction, Block, TokenBalances, Tracer}
alias Indexer.Block.Realtime.{ConsensusEnsurer, TaskSupervisor}
alias Indexer.Block.Realtime.TaskSupervisor
alias Timex.Duration
@behaviour Block.Fetcher
@ -269,12 +269,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
@decorate span(tracer: Tracer)
defp do_fetch_and_import_block(block_number_to_fetch, block_fetcher, retry) do
case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do
{:ok, %{inserted: inserted, errors: []}} ->
for block <- Map.get(inserted, :blocks, []) do
args = [block.parent_hash, block.number - 1, block_fetcher]
Task.Supervisor.start_child(TaskSupervisor, ConsensusEnsurer, :perform, args)
end
{:ok, %{inserted: _, errors: []}} ->
Logger.debug("Fetched and imported.")
{:ok, %{inserted: _, errors: [_ | _] = errors}} ->

@ -4,7 +4,7 @@ defmodule Indexer.Block.Supervisor do
"""
alias Indexer.Block
alias Indexer.Block.{Catchup, InvalidConsensus, Realtime, Reward, Uncle}
alias Indexer.Block.{Catchup, Realtime, Reward, Uncle}
alias Indexer.Temporary.{AddressesWithoutCode, FailedCreatedAddresses}
use Supervisor
@ -50,7 +50,6 @@ defmodule Indexer.Block.Supervisor do
%{block_fetcher: block_fetcher, block_interval: block_interval, memory_monitor: memory_monitor},
[name: Catchup.Supervisor]
]},
{InvalidConsensus.Supervisor, [[], [name: InvalidConsensus.Supervisor]]},
{Realtime.Supervisor,
[
%{block_fetcher: realtime_block_fetcher, subscribe_named_arguments: realtime_subscribe_named_arguments},

@ -1,87 +0,0 @@
defmodule Indexer.Block.InvalidConsensus.WorkerTest do
use Explorer.DataCase
alias Indexer.Sequence
alias Indexer.Block.InvalidConsensus.{Worker, TaskSupervisor}
@moduletag :capture_log
describe "start_link/1" do
test "starts the worker" do
assert {:ok, _pid} = Worker.start_link(supervisor: self())
end
end
describe "init/1" do
test "sends message to self" do
pid = self()
assert {:ok, %{task_ref: nil, block_numbers: [], sup_pid: ^pid}} = Worker.init(supervisor: self())
assert_received :scan
end
end
describe "handle_info with :scan" do
test "sends shutdown to supervisor" do
state = %{task_ref: nil, block_numbers: [], sup_pid: self()}
Task.async(fn -> Worker.handle_info(:scan, state) end)
assert_receive {_, _, {:terminate, :normal}}
end
test "sends message to self when blocks with invalid consensus are found" do
block1 = insert(:block)
block2_with_invalid_consensus = insert(:block, parent_hash: block1.hash, consensus: false)
_block2 = insert(:block, parent_hash: block1.hash, number: block2_with_invalid_consensus.number)
_block3 = insert(:block, parent_hash: block2_with_invalid_consensus.hash)
block_number = block2_with_invalid_consensus.number
expected_state = %{task_ref: nil, block_numbers: [block_number], retry_interval: 1}
state = %{task_ref: nil, block_numbers: [], retry_interval: 1}
assert {:noreply, ^expected_state} = Worker.handle_info(:scan, state)
assert_receive :push_front_blocks
end
end
describe "handle_info with :push_front_blocks" do
test "starts a task" do
task_sup_pid = start_supervised!({Task.Supervisor, name: TaskSupervisor})
start_supervised!({Sequence, [[ranges: [], step: -1], [name: :block_catchup_sequencer]]})
state = %{task_ref: nil, block_numbers: [1]}
assert {:noreply, %{task_ref: task_ref}} = Worker.handle_info(:push_front_blocks, state)
assert is_reference(task_ref)
refute_receive {^task_ref, {:error, :queue_unavailable}}
assert_receive {^task_ref, :ok}
stop_supervised(task_sup_pid)
end
end
describe "handle_info with task ref tuple" do
test "sends shutdown to supervisor on success" do
ref = Process.monitor(self())
state = %{task_ref: ref, block_numbers: [], sup_pid: self()}
Task.async(fn -> assert Worker.handle_info({ref, :ok}, state) end)
assert_receive {_, _, {:terminate, :normal}}
end
test "sends message to self to try again on failure" do
ref = Process.monitor(self())
state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1}
expected_state = %{state | task_ref: nil}
assert {:noreply, ^expected_state} = Worker.handle_info({ref, {:error, :queue_unavailable}}, state)
assert_receive :push_front_blocks
end
end
describe "handle_info with failed task" do
test "sends message to self to try again" do
ref = Process.monitor(self())
state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1}
assert {:noreply, %{task_ref: nil}} = Worker.handle_info({:DOWN, ref, :process, self(), :EXIT}, state)
assert_receive :push_front_blocks
end
end
end
Loading…
Cancel
Save