Fix block realtime fetcher

pull/6840/head
POA 2 years ago
parent 844a753c85
commit 0895bc8293
  1. 50
      apps/indexer/lib/indexer/block/realtime/fetcher.ex

@ -42,7 +42,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
@minimum_safe_polling_period :timer.seconds(1) @minimum_safe_polling_period :timer.seconds(1)
@enforce_keys ~w(block_fetcher)a @enforce_keys ~w(block_fetcher)a
defstruct ~w(block_fetcher subscription previous_number max_number_seen timer)a defstruct ~w(block_fetcher subscription previous_number timer)a
@type t :: %__MODULE__{ @type t :: %__MODULE__{
block_fetcher: %Block.Fetcher{ block_fetcher: %Block.Fetcher{
@ -53,8 +53,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
receipts_concurrency: pos_integer() receipts_concurrency: pos_integer()
}, },
subscription: Subscription.t(), subscription: Subscription.t(),
previous_number: pos_integer() | nil, previous_number: pos_integer() | nil
max_number_seen: pos_integer() | nil
} }
def start_link([arguments, gen_server_options]) do def start_link([arguments, gen_server_options]) do
@ -82,7 +81,6 @@ defmodule Indexer.Block.Realtime.Fetcher do
block_fetcher: %Block.Fetcher{} = block_fetcher, block_fetcher: %Block.Fetcher{} = block_fetcher,
subscription: %Subscription{} = subscription, subscription: %Subscription{} = subscription,
previous_number: previous_number, previous_number: previous_number,
max_number_seen: max_number_seen,
timer: timer timer: timer
} = state } = state
) )
@ -95,9 +93,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
# Subscriptions don't support getting all the blocks and transactions data, # Subscriptions don't support getting all the blocks and transactions data,
# so we need to go back and get the full block # so we need to go back and get the full block
start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) start_fetch_and_import(number, block_fetcher, previous_number)
new_max_number = new_max_number(number, max_number_seen)
Process.cancel_timer(timer) Process.cancel_timer(timer)
new_timer = schedule_polling() new_timer = schedule_polling()
@ -106,7 +102,6 @@ defmodule Indexer.Block.Realtime.Fetcher do
%{ %{
state state
| previous_number: number, | previous_number: number,
max_number_seen: new_max_number,
timer: new_timer timer: new_timer
}} }}
end end
@ -116,19 +111,18 @@ defmodule Indexer.Block.Realtime.Fetcher do
:poll_latest_block_number, :poll_latest_block_number,
%__MODULE__{ %__MODULE__{
block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher, block_fetcher: %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher,
previous_number: previous_number, previous_number: previous_number
max_number_seen: max_number_seen
} = state } = state
) do ) do
{number, new_max_number} = new_previous_number =
case EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do case EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do
{:ok, number} when is_nil(max_number_seen) or number > max_number_seen -> {:ok, number} when is_nil(previous_number) or number != previous_number ->
start_fetch_and_import(number, block_fetcher, previous_number, number) start_fetch_and_import(number, block_fetcher, previous_number)
{max_number_seen, number} number
_ -> _ ->
{previous_number, max_number_seen} previous_number
end end
timer = schedule_polling() timer = schedule_polling()
@ -136,8 +130,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
{:noreply, {:noreply,
%{ %{
state state
| previous_number: number, | previous_number: new_previous_number,
max_number_seen: new_max_number,
timer: timer timer: timer
}} }}
end end
@ -169,10 +162,6 @@ defmodule Indexer.Block.Realtime.Fetcher do
defp subscribe_to_new_heads(state, _), do: state defp subscribe_to_new_heads(state, _), do: state
defp new_max_number(number, nil), do: number
defp new_max_number(number, max_number_seen), do: max(number, max_number_seen)
defp schedule_polling do defp schedule_polling do
polling_period = polling_period =
case AverageBlockTime.average_block_time() do case AverageBlockTime.average_block_time() do
@ -239,23 +228,20 @@ defmodule Indexer.Block.Realtime.Fetcher do
{:ok, []} {:ok, []}
end end
defp start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) do defp start_fetch_and_import(number, block_fetcher, previous_number) do
start_at = determine_start_at(number, previous_number, max_number_seen) start_at = determine_start_at(number, previous_number)
is_reorg = reorg?(number, previous_number)
for block_number_to_fetch <- start_at..number do for block_number_to_fetch <- start_at..number do
args = [block_number_to_fetch, block_fetcher, reorg?(number, max_number_seen)] args = [block_number_to_fetch, block_fetcher, is_reorg]
Task.Supervisor.start_child(TaskSupervisor, __MODULE__, :fetch_and_import_block, args) Task.Supervisor.start_child(TaskSupervisor, __MODULE__, :fetch_and_import_block, args)
end end
end end
defp determine_start_at(number, nil, nil), do: number defp determine_start_at(number, nil), do: number
defp determine_start_at(number, nil, max_number_seen) do
determine_start_at(number, number - 1, max_number_seen)
end
defp determine_start_at(number, previous_number, max_number_seen) do defp determine_start_at(number, previous_number) do
if reorg?(number, max_number_seen) do if reorg?(number, previous_number) do
# set start_at to NOT fill in skipped numbers # set start_at to NOT fill in skipped numbers
number number
else else
@ -264,7 +250,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
end end
end end
defp reorg?(number, max_number_seen) when is_integer(max_number_seen) and number <= max_number_seen do defp reorg?(number, previous_number) when is_integer(previous_number) and number <= previous_number do
true true
end end

Loading…
Cancel
Save