|
|
|
@ -88,16 +88,9 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
|
number = quantity_to_integer(quantity) |
|
|
|
|
# Subscriptions don't support getting all the blocks and transactions data, |
|
|
|
|
# so we need to go back and get the full block |
|
|
|
|
{new_previous_number, new_max_number} = |
|
|
|
|
case start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) do |
|
|
|
|
# The number may have not been inserted if it was part of a small skip |
|
|
|
|
:skip -> |
|
|
|
|
Logger.debug(["#{inspect(number)} was skipped"]) |
|
|
|
|
{previous_number, max_number_seen} |
|
|
|
|
start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) |
|
|
|
|
|
|
|
|
|
_ -> |
|
|
|
|
{number, new_max_number(number, max_number_seen)} |
|
|
|
|
end |
|
|
|
|
new_max_number = new_max_number(number, max_number_seen) |
|
|
|
|
|
|
|
|
|
Process.cancel_timer(timer) |
|
|
|
|
new_timer = schedule_polling() |
|
|
|
@ -105,7 +98,7 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
|
{:noreply, |
|
|
|
|
%{ |
|
|
|
|
state |
|
|
|
|
| previous_number: new_previous_number, |
|
|
|
|
| previous_number: number, |
|
|
|
|
max_number_seen: new_max_number, |
|
|
|
|
timer: new_timer |
|
|
|
|
}} |
|
|
|
@ -123,14 +116,7 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
|
{number, new_max_number} = |
|
|
|
|
case EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) do |
|
|
|
|
{:ok, number} when is_nil(max_number_seen) or number > max_number_seen -> |
|
|
|
|
# in case of polling the realtime fetcher should take care of all the |
|
|
|
|
# blocks in the skipping window, because the cathup fetcher wont |
|
|
|
|
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance) |
|
|
|
|
|
|
|
|
|
last_catchup_number = max(0, 10 - max_skipping_distance - 1) |
|
|
|
|
starting_number = max(previous_number, last_catchup_number) || last_catchup_number |
|
|
|
|
|
|
|
|
|
start_fetch_and_import(number, block_fetcher, starting_number, nil) |
|
|
|
|
start_fetch_and_import(number, block_fetcher, previous_number, number) |
|
|
|
|
|
|
|
|
|
{max_number_seen, number} |
|
|
|
|
|
|
|
|
@ -225,35 +211,27 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp start_fetch_and_import(number, block_fetcher, previous_number, max_number_seen) do |
|
|
|
|
fetching_action = determine_fetching_action(number, previous_number, max_number_seen) |
|
|
|
|
start_at = determine_start_at(number, previous_number, max_number_seen) |
|
|
|
|
|
|
|
|
|
if fetching_action != :skip do |
|
|
|
|
for block_number_to_fetch <- fetching_action do |
|
|
|
|
for block_number_to_fetch <- start_at..number do |
|
|
|
|
args = [block_number_to_fetch, block_fetcher, reorg?(number, max_number_seen)] |
|
|
|
|
Task.Supervisor.start_child(TaskSupervisor, __MODULE__, :fetch_and_import_block, args) |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
fetching_action |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def determine_fetching_action(number, previous_number, max_number_seen) do |
|
|
|
|
cond do |
|
|
|
|
reorg?(number, max_number_seen) -> |
|
|
|
|
[number] |
|
|
|
|
defp determine_start_at(number, nil, nil), do: number |
|
|
|
|
|
|
|
|
|
can_be_skipped?(number, max_number_seen) -> |
|
|
|
|
:skip |
|
|
|
|
|
|
|
|
|
is_nil(previous_number) -> |
|
|
|
|
[number] |
|
|
|
|
defp determine_start_at(number, nil, max_number_seen) do |
|
|
|
|
determine_start_at(number, number - 1, max_number_seen) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
true -> |
|
|
|
|
if number - previous_number - 1 > 10 do |
|
|
|
|
(number - 10)..number |
|
|
|
|
defp determine_start_at(number, previous_number, max_number_seen) do |
|
|
|
|
if reorg?(number, max_number_seen) do |
|
|
|
|
# set start_at to NOT fill in skipped numbers |
|
|
|
|
number |
|
|
|
|
else |
|
|
|
|
(previous_number + 1)..number |
|
|
|
|
end |
|
|
|
|
# set start_at to fill in skipped numbers, if any |
|
|
|
|
previous_number + 1 |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
@ -263,14 +241,6 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
|
|
|
|
|
|
defp reorg?(_, _), do: false |
|
|
|
|
|
|
|
|
|
defp can_be_skipped?(number, max_number_seen) when is_integer(max_number_seen) and number > max_number_seen + 1 do |
|
|
|
|
max_skipping_distance = Application.get_env(:indexer, :max_skipping_distance) |
|
|
|
|
|
|
|
|
|
max_skipping_distance > 1 and number <= max_number_seen + max_skipping_distance |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp can_be_skipped?(_, _), do: false |
|
|
|
|
|
|
|
|
|
@reorg_delay 5_000 |
|
|
|
|
|
|
|
|
|
@decorate trace(name: "fetch", resource: "Indexer.Block.Realtime.Fetcher.fetch_and_import_block/3", tracer: Tracer) |
|
|
|
|