diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index aac96a77e8..282ca0ceed 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -124,10 +124,7 @@ defmodule Indexer.Block.Realtime.Fetcher do end end - def fetch_and_import_block(block_number_to_fetch, block_fetcher) do - # Wait half a second to give Parity/Geth time to sync. - :timer.sleep(500) - + def fetch_and_import_block(block_number_to_fetch, block_fetcher, retry \\ 3) do case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do {:ok, {_inserted, _next}} -> Logger.debug(fn -> @@ -148,15 +145,24 @@ defmodule Indexer.Block.Realtime.Fetcher do end) {:error, changesets} when is_list(changesets) -> - Logger.error(fn -> - [ - "realtime indexer failed to validate for block ", - to_string(block_number_to_fetch), - ": ", - inspect(changesets), - ". Block will be retried by catchup indexer." - ] - end) + params = %{ + changesets: changesets, + block_number_to_fetch: block_number_to_fetch, + block_fetcher: block_fetcher, + retry: retry + } + + if retry_fetch_and_import_block(params) == :ignore do + Logger.error(fn -> + [ + "realtime indexer failed to validate for block ", + to_string(block_number_to_fetch), + ": ", + inspect(changesets), + ". Block will be retried by catchup indexer." + ] + end) + end {:error, {step, failed_value, _changes_so_far}} -> Logger.error(fn -> @@ -173,6 +179,27 @@ defmodule Indexer.Block.Realtime.Fetcher do end end + defp retry_fetch_and_import_block(%{retry: retry}) when retry < 1, do: :ignore + + defp retry_fetch_and_import_block(%{changesets: changesets} = params) do + if unknown_block_number_error?(changesets) do + # Wait half a second to give Parity time to sync. + :timer.sleep(500) + + number = params.block_number_to_fetch + fetcher = params.block_fetcher + updated_retry = params.retry - 1 + + fetch_and_import_block(number, fetcher, updated_retry) + else + :ignore + end + end + + defp unknown_block_number_error?(changesets) do + Enum.any?(changesets, &(Map.get(&1, :message) == "Unknown block number")) + end + defp async_import_remaining_block_data(%{block_second_degree_relations: block_second_degree_relations, tokens: tokens}) do tokens |> Enum.map(& &1.contract_address_hash)