* master: (28 commits) feat: verify contracts via an RPC endpoint Remove obsolete InvalidConsensus.Worker Discard child block with parent_hash not matching hash of imported block Expand non-consensus block regression test to test for race conditions Remove obsolete ConsensusEnsurer feat: add not_decompiled_with_version filter Update fetcher_test.exs mix format fix test update gettext fix build remove fetching token 2 times update CHANGELOG update tokens in fetcher add CHANGELOG entry exclude decompiled smart contract from encoding add CHANGELOG entry update metadata in controller add CHANGELOG entry decrease token metadata update interval ...pull/1704/head
commit
ee635fa42a
File diff suppressed because one or more lines are too long
@ -1,45 +0,0 @@ |
||||
defmodule Indexer.Block.InvalidConsensus.Supervisor do |
||||
@moduledoc """ |
||||
Supervises process for ensuring blocks with invalid consensus get queued for |
||||
indexing. |
||||
""" |
||||
|
||||
use Supervisor |
||||
|
||||
alias Indexer.Block.InvalidConsensus.Worker |
||||
|
||||
def child_spec([]) do |
||||
child_spec([[]]) |
||||
end |
||||
|
||||
def child_spec([init_arguments]) do |
||||
child_spec([init_arguments, [name: __MODULE__]]) |
||||
end |
||||
|
||||
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do |
||||
spec = %{ |
||||
id: __MODULE__, |
||||
start: {__MODULE__, :start_link, start_link_arguments}, |
||||
restart: :transient, |
||||
type: :supervisor |
||||
} |
||||
|
||||
Supervisor.child_spec(spec, []) |
||||
end |
||||
|
||||
def start_link(init_arguments, gen_server_options \\ []) do |
||||
Supervisor.start_link(__MODULE__, init_arguments, gen_server_options) |
||||
end |
||||
|
||||
@impl Supervisor |
||||
def init(_) do |
||||
children = [ |
||||
{Worker, [[supervisor: self()], [name: Worker]]}, |
||||
{Task.Supervisor, name: Indexer.Block.InvalidConsensus.TaskSupervisor} |
||||
] |
||||
|
||||
opts = [strategy: :one_for_all] |
||||
|
||||
Supervisor.init(children, opts) |
||||
end |
||||
end |
@ -1,99 +0,0 @@ |
||||
defmodule Indexer.Block.InvalidConsensus.Worker do |
||||
@moduledoc """ |
||||
Finds blocks with invalid consensus and queues them up to be refetched. This |
||||
process does this once, after the application starts up. |
||||
|
||||
A block has invalid consensus when it is referenced as the parent hash of a |
||||
block with consensus while not having consensus (consensus=false). Only one |
||||
block can have consensus at a given height (block number). |
||||
""" |
||||
|
||||
use GenServer |
||||
|
||||
require Logger |
||||
|
||||
alias Explorer.Chain |
||||
alias Indexer.Block.Catchup.Fetcher |
||||
alias Indexer.Block.InvalidConsensus.TaskSupervisor |
||||
|
||||
def child_spec([init_arguments]) do |
||||
child_spec([init_arguments, []]) |
||||
end |
||||
|
||||
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do |
||||
spec = %{ |
||||
id: __MODULE__, |
||||
start: {__MODULE__, :start_link, start_link_arguments}, |
||||
restart: :transient, |
||||
type: :worker |
||||
} |
||||
|
||||
Supervisor.child_spec(spec, []) |
||||
end |
||||
|
||||
def start_link(init_arguments, gen_server_options \\ []) do |
||||
GenServer.start_link(__MODULE__, init_arguments, gen_server_options) |
||||
end |
||||
|
||||
def init(opts) do |
||||
sup_pid = Keyword.fetch!(opts, :supervisor) |
||||
retry_interval = Keyword.get(opts, :retry_interval, 10_000) |
||||
|
||||
send(self(), :scan) |
||||
|
||||
state = %{ |
||||
block_numbers: [], |
||||
retry_interval: retry_interval, |
||||
sup_pid: sup_pid, |
||||
task_ref: nil |
||||
} |
||||
|
||||
{:ok, state} |
||||
end |
||||
|
||||
def handle_info(:scan, state) do |
||||
block_numbers = Chain.list_block_numbers_with_invalid_consensus() |
||||
|
||||
case block_numbers do |
||||
[] -> |
||||
Supervisor.stop(state.sup_pid, :normal) |
||||
{:noreply, state} |
||||
|
||||
block_numbers -> |
||||
Process.send_after(self(), :push_front_blocks, state.retry_interval) |
||||
{:noreply, %{state | block_numbers: block_numbers}} |
||||
end |
||||
end |
||||
|
||||
def handle_info(:push_front_blocks, %{block_numbers: block_numbers} = state) do |
||||
%Task{ref: ref} = async_push_front(block_numbers) |
||||
{:noreply, %{state | task_ref: ref}} |
||||
end |
||||
|
||||
def handle_info({ref, :ok}, %{task_ref: ref, sup_pid: sup_pid}) do |
||||
Process.demonitor(ref, [:flush]) |
||||
Supervisor.stop(sup_pid, :normal) |
||||
{:stop, :shutdown} |
||||
end |
||||
|
||||
def handle_info({ref, {:error, reason}}, %{task_ref: ref, retry_interval: millis} = state) do |
||||
case reason do |
||||
:queue_unavailable -> :ok |
||||
_ -> Logger.error(fn -> inspect(reason) end) |
||||
end |
||||
|
||||
Process.demonitor(ref, [:flush]) |
||||
Process.send_after(self(), :push_front_blocks, millis) |
||||
|
||||
{:noreply, %{state | task_ref: nil}} |
||||
end |
||||
|
||||
def handle_info({:DOWN, ref, :process, _, _}, %{task_ref: ref, retry_interval: millis} = state) do |
||||
Process.send_after(self(), :push_front_blocks, millis) |
||||
{:noreply, %{state | task_ref: nil}} |
||||
end |
||||
|
||||
defp async_push_front(block_numbers) do |
||||
Task.Supervisor.async_nolink(TaskSupervisor, Fetcher, :push_front, [block_numbers]) |
||||
end |
||||
end |
@ -1,34 +0,0 @@ |
||||
defmodule Indexer.Block.Realtime.ConsensusEnsurer do |
||||
@moduledoc """ |
||||
Triggers a refetch if a given block doesn't have consensus. |
||||
|
||||
""" |
||||
require Logger |
||||
|
||||
alias Explorer.Chain |
||||
alias Explorer.Chain.Hash |
||||
alias Indexer.Block.Realtime.Fetcher |
||||
|
||||
def perform(_, number, _) when not is_integer(number) or number < 0, do: :ok |
||||
|
||||
def perform(%Hash{byte_count: unquote(Hash.Full.byte_count())} = block_hash, number, block_fetcher) do |
||||
case Chain.hash_to_block(block_hash) do |
||||
{:ok, %{consensus: true} = _block} -> |
||||
:ignore |
||||
|
||||
_ -> |
||||
Logger.info(fn -> |
||||
[ |
||||
"refetch from consensus was found on block (", |
||||
to_string(number), |
||||
"). A reorg initiated." |
||||
] |
||||
end) |
||||
|
||||
# trigger refetch if consensus=false or block was not found |
||||
Fetcher.fetch_and_import_block(number, block_fetcher, true) |
||||
end |
||||
|
||||
:ok |
||||
end |
||||
end |
@ -1,87 +0,0 @@ |
||||
defmodule Indexer.Block.InvalidConsensus.WorkerTest do |
||||
use Explorer.DataCase |
||||
|
||||
alias Indexer.Sequence |
||||
alias Indexer.Block.InvalidConsensus.{Worker, TaskSupervisor} |
||||
|
||||
@moduletag :capture_log |
||||
|
||||
describe "start_link/1" do |
||||
test "starts the worker" do |
||||
assert {:ok, _pid} = Worker.start_link(supervisor: self()) |
||||
end |
||||
end |
||||
|
||||
describe "init/1" do |
||||
test "sends message to self" do |
||||
pid = self() |
||||
assert {:ok, %{task_ref: nil, block_numbers: [], sup_pid: ^pid}} = Worker.init(supervisor: self()) |
||||
assert_received :scan |
||||
end |
||||
end |
||||
|
||||
describe "handle_info with :scan" do |
||||
test "sends shutdown to supervisor" do |
||||
state = %{task_ref: nil, block_numbers: [], sup_pid: self()} |
||||
Task.async(fn -> Worker.handle_info(:scan, state) end) |
||||
assert_receive {_, _, {:terminate, :normal}} |
||||
end |
||||
|
||||
test "sends message to self when blocks with invalid consensus are found" do |
||||
block1 = insert(:block) |
||||
block2_with_invalid_consensus = insert(:block, parent_hash: block1.hash, consensus: false) |
||||
_block2 = insert(:block, parent_hash: block1.hash, number: block2_with_invalid_consensus.number) |
||||
_block3 = insert(:block, parent_hash: block2_with_invalid_consensus.hash) |
||||
|
||||
block_number = block2_with_invalid_consensus.number |
||||
|
||||
expected_state = %{task_ref: nil, block_numbers: [block_number], retry_interval: 1} |
||||
state = %{task_ref: nil, block_numbers: [], retry_interval: 1} |
||||
|
||||
assert {:noreply, ^expected_state} = Worker.handle_info(:scan, state) |
||||
assert_receive :push_front_blocks |
||||
end |
||||
end |
||||
|
||||
describe "handle_info with :push_front_blocks" do |
||||
test "starts a task" do |
||||
task_sup_pid = start_supervised!({Task.Supervisor, name: TaskSupervisor}) |
||||
start_supervised!({Sequence, [[ranges: [], step: -1], [name: :block_catchup_sequencer]]}) |
||||
|
||||
state = %{task_ref: nil, block_numbers: [1]} |
||||
assert {:noreply, %{task_ref: task_ref}} = Worker.handle_info(:push_front_blocks, state) |
||||
assert is_reference(task_ref) |
||||
|
||||
refute_receive {^task_ref, {:error, :queue_unavailable}} |
||||
assert_receive {^task_ref, :ok} |
||||
|
||||
stop_supervised(task_sup_pid) |
||||
end |
||||
end |
||||
|
||||
describe "handle_info with task ref tuple" do |
||||
test "sends shutdown to supervisor on success" do |
||||
ref = Process.monitor(self()) |
||||
state = %{task_ref: ref, block_numbers: [], sup_pid: self()} |
||||
Task.async(fn -> assert Worker.handle_info({ref, :ok}, state) end) |
||||
assert_receive {_, _, {:terminate, :normal}} |
||||
end |
||||
|
||||
test "sends message to self to try again on failure" do |
||||
ref = Process.monitor(self()) |
||||
state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1} |
||||
expected_state = %{state | task_ref: nil} |
||||
assert {:noreply, ^expected_state} = Worker.handle_info({ref, {:error, :queue_unavailable}}, state) |
||||
assert_receive :push_front_blocks |
||||
end |
||||
end |
||||
|
||||
describe "handle_info with failed task" do |
||||
test "sends message to self to try again" do |
||||
ref = Process.monitor(self()) |
||||
state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1} |
||||
assert {:noreply, %{task_ref: nil}} = Worker.handle_info({:DOWN, ref, :process, self(), :EXIT}, state) |
||||
assert_receive :push_front_blocks |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue