|
|
|
@ -124,10 +124,7 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def fetch_and_import_block(block_number_to_fetch, block_fetcher) do |
|
|
|
|
# Wait half a second to give Parity/Geth time to sync. |
|
|
|
|
:timer.sleep(500) |
|
|
|
|
|
|
|
|
|
def fetch_and_import_block(block_number_to_fetch, block_fetcher, retry \\ 3) do |
|
|
|
|
case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do |
|
|
|
|
{:ok, {_inserted, _next}} -> |
|
|
|
|
Logger.debug(fn -> |
|
|
|
@ -148,15 +145,24 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
|
end) |
|
|
|
|
|
|
|
|
|
{:error, changesets} when is_list(changesets) -> |
|
|
|
|
Logger.error(fn -> |
|
|
|
|
[ |
|
|
|
|
"realtime indexer failed to validate for block ", |
|
|
|
|
to_string(block_number_to_fetch), |
|
|
|
|
": ", |
|
|
|
|
inspect(changesets), |
|
|
|
|
". Block will be retried by catchup indexer." |
|
|
|
|
] |
|
|
|
|
end) |
|
|
|
|
params = %{ |
|
|
|
|
changesets: changesets, |
|
|
|
|
block_number_to_fetch: block_number_to_fetch, |
|
|
|
|
block_fetcher: block_fetcher, |
|
|
|
|
retry: retry |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if retry_fetch_and_import_block(params) == :ignore do |
|
|
|
|
Logger.error(fn -> |
|
|
|
|
[ |
|
|
|
|
"realtime indexer failed to validate for block ", |
|
|
|
|
to_string(block_number_to_fetch), |
|
|
|
|
": ", |
|
|
|
|
inspect(changesets), |
|
|
|
|
". Block will be retried by catchup indexer." |
|
|
|
|
] |
|
|
|
|
end) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
{:error, {step, failed_value, _changes_so_far}} -> |
|
|
|
|
Logger.error(fn -> |
|
|
|
@ -173,6 +179,27 @@ defmodule Indexer.Block.Realtime.Fetcher do |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp retry_fetch_and_import_block(%{retry: retry}) when retry < 1, do: :ignore |
|
|
|
|
|
|
|
|
|
defp retry_fetch_and_import_block(%{changesets: changesets} = params) do |
|
|
|
|
if unknown_block_number_error?(changesets) do |
|
|
|
|
# Wait half a second to give Parity time to sync. |
|
|
|
|
:timer.sleep(500) |
|
|
|
|
|
|
|
|
|
number = params.block_number_to_fetch |
|
|
|
|
fetcher = params.block_fetcher |
|
|
|
|
updated_retry = params.retry - 1 |
|
|
|
|
|
|
|
|
|
fetch_and_import_block(number, fetcher, updated_retry) |
|
|
|
|
else |
|
|
|
|
:ignore |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp unknown_block_number_error?(changesets) do |
|
|
|
|
Enum.any?(changesets, &(Map.get(&1, :message) == "Unknown block number")) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp async_import_remaining_block_data(%{block_second_degree_relations: block_second_degree_relations, tokens: tokens}) do |
|
|
|
|
tokens |
|
|
|
|
|> Enum.map(& &1.contract_address_hash) |
|
|
|
|