This worker only handled one of invalid consensus cases, i.e. when a parent block lost consensus, but the child one didn't. Even then it worked only when all blocks are reliably and sequentially imported (certainly not our case), and only once on indexer launch. Now this particular case is covered by previous commit, and we don't need this worker at all.pull/1684/head
parent
6450c3ac61
commit
0e15873fbc
@ -1,45 +0,0 @@ |
||||
defmodule Indexer.Block.InvalidConsensus.Supervisor do |
||||
@moduledoc """ |
||||
Supervises process for ensuring blocks with invalid consensus get queued for |
||||
indexing. |
||||
""" |
||||
|
||||
use Supervisor |
||||
|
||||
alias Indexer.Block.InvalidConsensus.Worker |
||||
|
||||
def child_spec([]) do |
||||
child_spec([[]]) |
||||
end |
||||
|
||||
def child_spec([init_arguments]) do |
||||
child_spec([init_arguments, [name: __MODULE__]]) |
||||
end |
||||
|
||||
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do |
||||
spec = %{ |
||||
id: __MODULE__, |
||||
start: {__MODULE__, :start_link, start_link_arguments}, |
||||
restart: :transient, |
||||
type: :supervisor |
||||
} |
||||
|
||||
Supervisor.child_spec(spec, []) |
||||
end |
||||
|
||||
def start_link(init_arguments, gen_server_options \\ []) do |
||||
Supervisor.start_link(__MODULE__, init_arguments, gen_server_options) |
||||
end |
||||
|
||||
@impl Supervisor |
||||
def init(_) do |
||||
children = [ |
||||
{Worker, [[supervisor: self()], [name: Worker]]}, |
||||
{Task.Supervisor, name: Indexer.Block.InvalidConsensus.TaskSupervisor} |
||||
] |
||||
|
||||
opts = [strategy: :one_for_all] |
||||
|
||||
Supervisor.init(children, opts) |
||||
end |
||||
end |
@ -1,99 +0,0 @@ |
||||
defmodule Indexer.Block.InvalidConsensus.Worker do |
||||
@moduledoc """ |
||||
Finds blocks with invalid consensus and queues them up to be refetched. This |
||||
process does this once, after the application starts up. |
||||
|
||||
A block has invalid consensus when it is referenced as the parent hash of a |
||||
block with consensus while not having consensus (consensus=false). Only one |
||||
block can have consensus at a given height (block number). |
||||
""" |
||||
|
||||
use GenServer |
||||
|
||||
require Logger |
||||
|
||||
alias Explorer.Chain |
||||
alias Indexer.Block.Catchup.Fetcher |
||||
alias Indexer.Block.InvalidConsensus.TaskSupervisor |
||||
|
||||
def child_spec([init_arguments]) do |
||||
child_spec([init_arguments, []]) |
||||
end |
||||
|
||||
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do |
||||
spec = %{ |
||||
id: __MODULE__, |
||||
start: {__MODULE__, :start_link, start_link_arguments}, |
||||
restart: :transient, |
||||
type: :worker |
||||
} |
||||
|
||||
Supervisor.child_spec(spec, []) |
||||
end |
||||
|
||||
def start_link(init_arguments, gen_server_options \\ []) do |
||||
GenServer.start_link(__MODULE__, init_arguments, gen_server_options) |
||||
end |
||||
|
||||
def init(opts) do |
||||
sup_pid = Keyword.fetch!(opts, :supervisor) |
||||
retry_interval = Keyword.get(opts, :retry_interval, 10_000) |
||||
|
||||
send(self(), :scan) |
||||
|
||||
state = %{ |
||||
block_numbers: [], |
||||
retry_interval: retry_interval, |
||||
sup_pid: sup_pid, |
||||
task_ref: nil |
||||
} |
||||
|
||||
{:ok, state} |
||||
end |
||||
|
||||
def handle_info(:scan, state) do |
||||
block_numbers = Chain.list_block_numbers_with_invalid_consensus() |
||||
|
||||
case block_numbers do |
||||
[] -> |
||||
Supervisor.stop(state.sup_pid, :normal) |
||||
{:noreply, state} |
||||
|
||||
block_numbers -> |
||||
Process.send_after(self(), :push_front_blocks, state.retry_interval) |
||||
{:noreply, %{state | block_numbers: block_numbers}} |
||||
end |
||||
end |
||||
|
||||
def handle_info(:push_front_blocks, %{block_numbers: block_numbers} = state) do |
||||
%Task{ref: ref} = async_push_front(block_numbers) |
||||
{:noreply, %{state | task_ref: ref}} |
||||
end |
||||
|
||||
def handle_info({ref, :ok}, %{task_ref: ref, sup_pid: sup_pid}) do |
||||
Process.demonitor(ref, [:flush]) |
||||
Supervisor.stop(sup_pid, :normal) |
||||
{:stop, :shutdown} |
||||
end |
||||
|
||||
def handle_info({ref, {:error, reason}}, %{task_ref: ref, retry_interval: millis} = state) do |
||||
case reason do |
||||
:queue_unavailable -> :ok |
||||
_ -> Logger.error(fn -> inspect(reason) end) |
||||
end |
||||
|
||||
Process.demonitor(ref, [:flush]) |
||||
Process.send_after(self(), :push_front_blocks, millis) |
||||
|
||||
{:noreply, %{state | task_ref: nil}} |
||||
end |
||||
|
||||
def handle_info({:DOWN, ref, :process, _, _}, %{task_ref: ref, retry_interval: millis} = state) do |
||||
Process.send_after(self(), :push_front_blocks, millis) |
||||
{:noreply, %{state | task_ref: nil}} |
||||
end |
||||
|
||||
defp async_push_front(block_numbers) do |
||||
Task.Supervisor.async_nolink(TaskSupervisor, Fetcher, :push_front, [block_numbers]) |
||||
end |
||||
end |
@ -1,87 +0,0 @@ |
||||
defmodule Indexer.Block.InvalidConsensus.WorkerTest do |
||||
use Explorer.DataCase |
||||
|
||||
alias Indexer.Sequence |
||||
alias Indexer.Block.InvalidConsensus.{Worker, TaskSupervisor} |
||||
|
||||
@moduletag :capture_log |
||||
|
||||
describe "start_link/1" do |
||||
test "starts the worker" do |
||||
assert {:ok, _pid} = Worker.start_link(supervisor: self()) |
||||
end |
||||
end |
||||
|
||||
describe "init/1" do |
||||
test "sends message to self" do |
||||
pid = self() |
||||
assert {:ok, %{task_ref: nil, block_numbers: [], sup_pid: ^pid}} = Worker.init(supervisor: self()) |
||||
assert_received :scan |
||||
end |
||||
end |
||||
|
||||
describe "handle_info with :scan" do |
||||
test "sends shutdown to supervisor" do |
||||
state = %{task_ref: nil, block_numbers: [], sup_pid: self()} |
||||
Task.async(fn -> Worker.handle_info(:scan, state) end) |
||||
assert_receive {_, _, {:terminate, :normal}} |
||||
end |
||||
|
||||
test "sends message to self when blocks with invalid consensus are found" do |
||||
block1 = insert(:block) |
||||
block2_with_invalid_consensus = insert(:block, parent_hash: block1.hash, consensus: false) |
||||
_block2 = insert(:block, parent_hash: block1.hash, number: block2_with_invalid_consensus.number) |
||||
_block3 = insert(:block, parent_hash: block2_with_invalid_consensus.hash) |
||||
|
||||
block_number = block2_with_invalid_consensus.number |
||||
|
||||
expected_state = %{task_ref: nil, block_numbers: [block_number], retry_interval: 1} |
||||
state = %{task_ref: nil, block_numbers: [], retry_interval: 1} |
||||
|
||||
assert {:noreply, ^expected_state} = Worker.handle_info(:scan, state) |
||||
assert_receive :push_front_blocks |
||||
end |
||||
end |
||||
|
||||
describe "handle_info with :push_front_blocks" do |
||||
test "starts a task" do |
||||
task_sup_pid = start_supervised!({Task.Supervisor, name: TaskSupervisor}) |
||||
start_supervised!({Sequence, [[ranges: [], step: -1], [name: :block_catchup_sequencer]]}) |
||||
|
||||
state = %{task_ref: nil, block_numbers: [1]} |
||||
assert {:noreply, %{task_ref: task_ref}} = Worker.handle_info(:push_front_blocks, state) |
||||
assert is_reference(task_ref) |
||||
|
||||
refute_receive {^task_ref, {:error, :queue_unavailable}} |
||||
assert_receive {^task_ref, :ok} |
||||
|
||||
stop_supervised(task_sup_pid) |
||||
end |
||||
end |
||||
|
||||
describe "handle_info with task ref tuple" do |
||||
test "sends shutdown to supervisor on success" do |
||||
ref = Process.monitor(self()) |
||||
state = %{task_ref: ref, block_numbers: [], sup_pid: self()} |
||||
Task.async(fn -> assert Worker.handle_info({ref, :ok}, state) end) |
||||
assert_receive {_, _, {:terminate, :normal}} |
||||
end |
||||
|
||||
test "sends message to self to try again on failure" do |
||||
ref = Process.monitor(self()) |
||||
state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1} |
||||
expected_state = %{state | task_ref: nil} |
||||
assert {:noreply, ^expected_state} = Worker.handle_info({ref, {:error, :queue_unavailable}}, state) |
||||
assert_receive :push_front_blocks |
||||
end |
||||
end |
||||
|
||||
describe "handle_info with failed task" do |
||||
test "sends message to self to try again" do |
||||
ref = Process.monitor(self()) |
||||
state = %{task_ref: ref, block_numbers: [1], sup_pid: self(), retry_interval: 1} |
||||
assert {:noreply, %{task_ref: nil}} = Worker.handle_info({:DOWN, ref, :process, self(), :EXIT}, state) |
||||
assert_receive :push_front_blocks |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue