|
|
|
@ -71,59 +71,67 @@ defmodule Indexer.Block.Catchup.Fetcher do |
|
|
|
|
) do |
|
|
|
|
Logger.metadata(fetcher: :block_catchup) |
|
|
|
|
|
|
|
|
|
{:ok, latest_block_number} = |
|
|
|
|
case latest_block() do |
|
|
|
|
nil -> |
|
|
|
|
EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) |
|
|
|
|
|
|
|
|
|
number -> |
|
|
|
|
{:ok, number} |
|
|
|
|
with {:ok, latest_block_number} <- fetch_last_block(json_rpc_named_arguments) do |
|
|
|
|
case latest_block_number do |
|
|
|
|
# let realtime indexer get the genesis block |
|
|
|
|
0 -> |
|
|
|
|
%{first_block_number: 0, missing_block_count: 0, last_block_number: 0, shrunk: false} |
|
|
|
|
|
|
|
|
|
_ -> |
|
|
|
|
# realtime indexer gets the current latest block |
|
|
|
|
first = latest_block_number - 1 |
|
|
|
|
last = last_block() |
|
|
|
|
|
|
|
|
|
Logger.metadata(first_block_number: first, last_block_number: last) |
|
|
|
|
|
|
|
|
|
missing_ranges = Chain.missing_block_number_ranges(first..last) |
|
|
|
|
|
|
|
|
|
range_count = Enum.count(missing_ranges) |
|
|
|
|
|
|
|
|
|
missing_block_count = |
|
|
|
|
missing_ranges |
|
|
|
|
|> Stream.map(&Enum.count/1) |
|
|
|
|
|> Enum.sum() |
|
|
|
|
|
|
|
|
|
Logger.debug(fn -> "Missed blocks in ranges." end, |
|
|
|
|
missing_block_range_count: range_count, |
|
|
|
|
missing_block_count: missing_block_count |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
shrunk = |
|
|
|
|
case missing_block_count do |
|
|
|
|
0 -> |
|
|
|
|
false |
|
|
|
|
|
|
|
|
|
_ -> |
|
|
|
|
step = step(first, last, blocks_batch_size) |
|
|
|
|
sequence_opts = put_memory_monitor([ranges: missing_ranges, step: step], state) |
|
|
|
|
gen_server_opts = [name: @sequence_name] |
|
|
|
|
{:ok, sequence} = Sequence.start_link(sequence_opts, gen_server_opts) |
|
|
|
|
Sequence.cap(sequence) |
|
|
|
|
|
|
|
|
|
stream_fetch_and_import(state, sequence) |
|
|
|
|
|
|
|
|
|
Shrinkable.shrunk?(sequence) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
%{ |
|
|
|
|
first_block_number: first, |
|
|
|
|
last_block_number: last, |
|
|
|
|
missing_block_count: missing_block_count, |
|
|
|
|
shrunk: shrunk |
|
|
|
|
} |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
case latest_block_number do |
|
|
|
|
# let realtime indexer get the genesis block |
|
|
|
|
0 -> |
|
|
|
|
%{first_block_number: 0, missing_block_count: 0, last_block_number: 0, shrunk: false} |
|
|
|
|
|
|
|
|
|
_ -> |
|
|
|
|
# realtime indexer gets the current latest block |
|
|
|
|
first = latest_block_number - 1 |
|
|
|
|
last = last_block() |
|
|
|
|
|
|
|
|
|
Logger.metadata(first_block_number: first, last_block_number: last) |
|
|
|
|
|
|
|
|
|
missing_ranges = Chain.missing_block_number_ranges(first..last) |
|
|
|
|
|
|
|
|
|
range_count = Enum.count(missing_ranges) |
|
|
|
|
|
|
|
|
|
missing_block_count = |
|
|
|
|
missing_ranges |
|
|
|
|
|> Stream.map(&Enum.count/1) |
|
|
|
|
|> Enum.sum() |
|
|
|
|
|
|
|
|
|
Logger.debug(fn -> "Missed blocks in ranges." end, |
|
|
|
|
missing_block_range_count: range_count, |
|
|
|
|
missing_block_count: missing_block_count |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
shrunk = |
|
|
|
|
case missing_block_count do |
|
|
|
|
0 -> |
|
|
|
|
false |
|
|
|
|
|
|
|
|
|
_ -> |
|
|
|
|
step = step(first, last, blocks_batch_size) |
|
|
|
|
sequence_opts = put_memory_monitor([ranges: missing_ranges, step: step], state) |
|
|
|
|
gen_server_opts = [name: @sequence_name] |
|
|
|
|
{:ok, sequence} = Sequence.start_link(sequence_opts, gen_server_opts) |
|
|
|
|
Sequence.cap(sequence) |
|
|
|
|
|
|
|
|
|
stream_fetch_and_import(state, sequence) |
|
|
|
|
|
|
|
|
|
Shrinkable.shrunk?(sequence) |
|
|
|
|
end |
|
|
|
|
defp fetch_last_block(json_rpc_named_arguments) do |
|
|
|
|
case latest_block() do |
|
|
|
|
nil -> |
|
|
|
|
EthereumJSONRPC.fetch_block_number_by_tag("latest", json_rpc_named_arguments) |
|
|
|
|
|
|
|
|
|
%{first_block_number: first, last_block_number: last, missing_block_count: missing_block_count, shrunk: shrunk} |
|
|
|
|
number -> |
|
|
|
|
{:ok, number} |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|