|
|
|
@ -31,6 +31,7 @@ defmodule Indexer.Block.Catchup.Fetcher do |
|
|
|
|
@blocks_batch_size 10 |
|
|
|
|
@blocks_concurrency 10 |
|
|
|
|
@sequence_name :block_catchup_sequencer |
|
|
|
|
@geth_block_limit 128 |
|
|
|
|
|
|
|
|
|
defstruct blocks_batch_size: @blocks_batch_size, |
|
|
|
|
blocks_concurrency: @blocks_concurrency, |
|
|
|
@ -84,6 +85,7 @@ defmodule Indexer.Block.Catchup.Fetcher do |
|
|
|
|
Logger.metadata(first_block_number: first, last_block_number: last) |
|
|
|
|
|
|
|
|
|
missing_ranges = Chain.missing_block_number_ranges(first..last) |
|
|
|
|
|
|
|
|
|
range_count = Enum.count(missing_ranges) |
|
|
|
|
|
|
|
|
|
missing_block_count = |
|
|
|
@ -119,7 +121,7 @@ defmodule Indexer.Block.Catchup.Fetcher do |
|
|
|
|
@async_import_remaining_block_data_options ~w(address_hash_to_fetched_balance_block_number)a |
|
|
|
|
|
|
|
|
|
@impl Block.Fetcher |
|
|
|
|
def import(_, options) when is_map(options) do |
|
|
|
|
def import(%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}, options) when is_map(options) do |
|
|
|
|
{async_import_remaining_block_data_options, options_with_block_rewards_errors} = |
|
|
|
|
Map.split(options, @async_import_remaining_block_data_options) |
|
|
|
|
|
|
|
|
@ -132,18 +134,23 @@ defmodule Indexer.Block.Catchup.Fetcher do |
|
|
|
|
with {:import, {:ok, imported} = ok} <- {:import, Chain.import(full_chain_import_options)} do |
|
|
|
|
async_import_remaining_block_data( |
|
|
|
|
imported, |
|
|
|
|
Map.put(async_import_remaining_block_data_options, :block_rewards, %{errors: block_reward_errors}) |
|
|
|
|
Map.put(async_import_remaining_block_data_options, :block_rewards, %{errors: block_reward_errors}), |
|
|
|
|
json_rpc_named_arguments |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
ok |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}} = options) do |
|
|
|
|
defp async_import_remaining_block_data( |
|
|
|
|
imported, |
|
|
|
|
%{block_rewards: %{errors: block_reward_errors}} = options, |
|
|
|
|
json_rpc_named_arguments |
|
|
|
|
) do |
|
|
|
|
async_import_block_rewards(block_reward_errors) |
|
|
|
|
async_import_coin_balances(imported, options) |
|
|
|
|
async_import_created_contract_codes(imported) |
|
|
|
|
async_import_internal_transactions(imported) |
|
|
|
|
async_import_internal_transactions(imported, json_rpc_named_arguments) |
|
|
|
|
async_import_tokens(imported) |
|
|
|
|
async_import_token_balances(imported) |
|
|
|
|
async_import_uncles(imported) |
|
|
|
@ -173,19 +180,31 @@ defmodule Indexer.Block.Catchup.Fetcher do |
|
|
|
|
|
|
|
|
|
defp async_import_created_contract_codes(_), do: :ok |
|
|
|
|
|
|
|
|
|
defp async_import_internal_transactions(%{transactions: transactions}) do |
|
|
|
|
transactions |
|
|
|
|
|> Enum.flat_map(fn |
|
|
|
|
%Transaction{block_number: block_number, index: index, hash: hash, internal_transactions_indexed_at: nil} -> |
|
|
|
|
[%{block_number: block_number, index: index, hash: hash}] |
|
|
|
|
defp async_import_internal_transactions(%{transactions: transactions}, json_rpc_named_arguments) do |
|
|
|
|
transaction_data = |
|
|
|
|
Enum.flat_map(transactions, fn |
|
|
|
|
%Transaction{block_number: block_number, index: index, hash: hash, internal_transactions_indexed_at: nil} -> |
|
|
|
|
[%{block_number: block_number, index: index, hash: hash}] |
|
|
|
|
|
|
|
|
|
%Transaction{internal_transactions_indexed_at: %DateTime{}} -> |
|
|
|
|
[] |
|
|
|
|
end) |
|
|
|
|
|> InternalTransaction.Fetcher.async_fetch(10_000) |
|
|
|
|
%Transaction{internal_transactions_indexed_at: %DateTime{}} -> |
|
|
|
|
[] |
|
|
|
|
end) |
|
|
|
|
|
|
|
|
|
filtered_transaction_data = |
|
|
|
|
if Keyword.get(json_rpc_named_arguments, :variant) == EthereumJSONRPC.Geth do |
|
|
|
|
{_, max_block_number} = Chain.fetch_min_and_max_block_numbers() |
|
|
|
|
|
|
|
|
|
Enum.filter(transaction_data, fn %{block_number: block_number} -> |
|
|
|
|
max_block_number - block_number < @geth_block_limit |
|
|
|
|
end) |
|
|
|
|
else |
|
|
|
|
transaction_data |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
InternalTransaction.Fetcher.async_fetch(filtered_transaction_data, 10_000) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp async_import_internal_transactions(_), do: :ok |
|
|
|
|
defp async_import_internal_transactions(_, _), do: :ok |
|
|
|
|
|
|
|
|
|
defp async_import_token_balances(%{address_token_balances: token_balances}) do |
|
|
|
|
TokenBalance.Fetcher.async_fetch(token_balances) |
|
|
|
|