@ -88,9 +88,16 @@ defmodule Indexer.Block.Realtime.Fetcher do
number = quantity_to_integer ( quantity )
# Subscriptions don't support getting all the blocks and transactions data,
# so we need to go back and get the full block
start_fetch_and_import ( number , block_fetcher , previous_number , max_number_seen )
{ new_previous_number , new_max_number } =
case start_fetch_and_import ( number , block_fetcher , previous_number , max_number_seen ) do
# The number may have not been inserted if it was part of a small skip
:skip ->
Logger . debug ( [ " #{ inspect ( number ) } was skipped " ] )
{ previous_number , max_number_seen }
new_max_number = new_max_number ( number , max_number_seen )
_ ->
{ number , new_max_number ( number , max_number_seen ) }
end
Process . cancel_timer ( timer )
new_timer = schedule_polling ( )
@ -98,7 +105,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
{ :noreply ,
%{
state
| previous_number : number ,
| previous_number : new_previous_n umber ,
max_number_seen : new_max_number ,
timer : new_timer
} }
@ -116,7 +123,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
{ number , new_max_number } =
case EthereumJSONRPC . fetch_block_number_by_tag ( " latest " , json_rpc_named_arguments ) do
{ :ok , number } when is_nil ( max_number_seen ) or number > max_number_seen ->
start_fetch_and_import ( number , block_fetcher , previous_number , number )
# in case of polling the realtime fetcher should take care of all the
# blocks in the skipping window, because the cathup fetcher wont
max_skipping_distance = Application . get_env ( :indexer , :max_skipping_distance )
last_catchup_number = max ( 0 , 10 - max_skipping_distance - 1 )
starting_number = max ( previous_number , last_catchup_number ) || last_catchup_number
start_fetch_and_import ( number , block_fetcher , starting_number , nil )
{ max_number_seen , number }
@ -212,27 +226,31 @@ defmodule Indexer.Block.Realtime.Fetcher do
end
defp start_fetch_and_import ( number , block_fetcher , previous_number , max_number_seen ) do
start_at = determine_start_at ( number , previous_number , max_number_seen )
fetching_action = determine_fetching_action ( number , previous_number , max_number_seen )
for block_number_to_fetch <- start_at . . number do
args = [ block_number_to_fetch , block_fetcher , reorg? ( number , max_number_seen ) ]
Task.Supervisor . start_child ( TaskSupervisor , __MODULE__ , :fetch_and_import_block , args )
if fetching_action != :skip do
for block_number_to_fetch <- fetching_action do
args = [ block_number_to_fetch , block_fetcher , reorg? ( number , max_number_seen ) ]
Task.Supervisor . start_child ( TaskSupervisor , __MODULE__ , :fetch_and_import_block , args )
end
end
fetching_action
end
defp determine_start_at ( number , nil , nil ) , do : number
def determine_fetching_action ( number , previous_number , max_number_seen ) do
cond do
reorg? ( number , max_number_seen ) ->
[ number ]
defp determine_start_at ( number , nil , max_number_seen ) do
determine_start_at ( number , number - 1 , max_number_seen )
end
can_be_skipped? ( number , max_number_seen ) ->
:skip
defp determine_start_at ( number , previous_number , max_number_seen ) do
if reorg? ( number , max_number_seen ) do
# set start_at to NOT fill in skipped numbers
number
else
# set start_at to fill in skipped numbers, if any
previous_number + 1
is_nil ( previous_number ) ->
[ number ]
true ->
( previous_number + 1 ) . . number
end
end
@ -242,6 +260,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
defp reorg? ( _ , _ ) , do : false
defp can_be_skipped? ( number , max_number_seen ) when is_integer ( max_number_seen ) and number > max_number_seen + 1 do
max_skipping_distance = Application . get_env ( :indexer , :max_skipping_distance )
max_skipping_distance > 1 and number <= max_number_seen + max_skipping_distance
end
defp can_be_skipped? ( _ , _ ) , do : false
@reorg_delay 5_000
@decorate trace ( name : " fetch " , resource : " Indexer.Block.Realtime.Fetcher.fetch_and_import_block/3 " , tracer : Tracer )