From adc03fd08b56ec15606c00cc4bc20d490037382f Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Sun, 16 Dec 2018 20:22:33 -0600 Subject: [PATCH] Label all steps in Block.Fetcher implementations Fixes #1232 `{:error, :timeout}` should have been caught, but additionally, it should never have been allowed to propagate out unassigned to a step as it is not possible to track the timeout to a specific fetch. --- .../lib/indexer/block/catchup/fetcher.ex | 26 ++++---- apps/indexer/lib/indexer/block/fetcher.ex | 5 +- .../lib/indexer/block/realtime/fetcher.ex | 62 ++++++++++--------- 3 files changed, 47 insertions(+), 46 deletions(-) diff --git a/apps/indexer/lib/indexer/block/catchup/fetcher.ex b/apps/indexer/lib/indexer/block/catchup/fetcher.ex index f7206dc094..6dd039404a 100644 --- a/apps/indexer/lib/indexer/block/catchup/fetcher.ex +++ b/apps/indexer/lib/indexer/block/catchup/fetcher.ex @@ -10,6 +10,7 @@ defmodule Indexer.Block.Catchup.Fetcher do import Indexer.Block.Fetcher, only: [async_import_coin_balances: 2, async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2] + alias Ecto.Changeset alias Explorer.Chain alias Indexer.{Block, InternalTransaction, Sequence, TokenBalance, Tracer} alias Indexer.Memory.Shrinkable @@ -114,10 +115,9 @@ defmodule Indexer.Block.Catchup.Fetcher do {async_import_remaining_block_data_options, chain_import_options} = Map.split(options, @async_import_remaining_block_data_options) - with {:ok, imported} = ok <- - chain_import_options - |> put_in([:blocks, :params, Access.all(), :consensus], true) - |> Chain.import() do + full_chain_import_options = put_in(chain_import_options, [:blocks, :params, Access.all(), :consensus], true) + + with {:import, {:ok, imported} = ok} <- {:import, Chain.import(full_chain_import_options)} do async_import_remaining_block_data( imported, async_import_remaining_block_data_options @@ -187,6 +187,15 @@ defmodule Indexer.Block.Catchup.Fetcher do {:ok, inserted: inserted} + {:error, {:import, [%Changeset{} | _] = changesets}} = error -> + Logger.error(fn -> + ["failed to validate: ", inspect(changesets), ". Retrying."] + end) + + push_back(sequence, range) + + error + {:error, {step, reason}} = error -> Logger.error( fn -> @@ -199,15 +208,6 @@ defmodule Indexer.Block.Catchup.Fetcher do error - {:error, changesets} = error when is_list(changesets) -> - Logger.error(fn -> - ["failed to validate: ", inspect(changesets), ". Retrying."] - end) - - push_back(sequence, range) - - error - {:error, {step, failed_value, _changes_so_far}} = error -> Logger.error( fn -> diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 76acbbae52..ae3193aef6 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -83,8 +83,7 @@ defmodule Indexer.Block.Fetcher do @spec fetch_and_import_range(t, Range.t()) :: {:ok, %{inserted: %{}, errors: [EthereumJSONRPC.Transport.error()]}} | {:error, - {step :: atom(), reason :: term()} - | [%Ecto.Changeset{}] + {step :: atom(), reason :: [%Ecto.Changeset{}] | term()} | {step :: atom(), failed_value :: term(), changes_so_far :: term()}} def fetch_and_import_range( %__MODULE__{ @@ -150,8 +149,6 @@ defmodule Indexer.Block.Fetcher do {:ok, %{inserted: inserted, errors: blocks_errors ++ beneficiaries_errors}} else {step, {:error, reason}} -> {:error, {step, reason}} - {:error, :timeout} = error -> error - {:error, changesets} = error when is_list(changesets) -> error {:error, step, failed_value, changes_so_far} -> {:error, {step, failed_value, changes_so_far}} end end diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 0380c7504d..b320e3485c 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -97,22 +97,26 @@ defmodule Indexer.Block.Realtime.Fetcher do transactions: %{params: transactions_params} } = options ) do - with {:ok, - %{ - addresses_params: internal_transactions_addresses_params, - internal_transactions_params: internal_transactions_params - }} <- - internal_transactions(block_fetcher, %{ - addresses_params: addresses_params, - transactions_params: transactions_params - }), - {:ok, %{addresses_params: balances_addresses_params, balances_params: balances_params}} <- - balances(block_fetcher, %{ - address_hash_to_block_number: address_hash_to_block_number, + with {:internal_transactions, + {:ok, + %{ addresses_params: internal_transactions_addresses_params, - balances_params: address_coin_balances_params - }), - {:ok, address_token_balances} <- fetch_token_balances(address_token_balances_params), + internal_transactions_params: internal_transactions_params + }}} <- + {:internal_transactions, + internal_transactions(block_fetcher, %{ + addresses_params: addresses_params, + transactions_params: transactions_params + })}, + {:balances, {:ok, %{addresses_params: balances_addresses_params, balances_params: balances_params}}} <- + {:balances, + balances(block_fetcher, %{ + address_hash_to_block_number: address_hash_to_block_number, + addresses_params: internal_transactions_addresses_params, + balances_params: address_coin_balances_params + })}, + {:address_token_balances, {:ok, address_token_balances}} <- + {:address_token_balances, fetch_token_balances(address_token_balances_params)}, chain_import_options = options |> Map.drop(@import_options) @@ -122,7 +126,7 @@ defmodule Indexer.Block.Realtime.Fetcher do |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_token_balances) |> put_in([Access.key(:address_token_balances), :params], address_token_balances) |> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params), - {:ok, imported} = ok <- Chain.import(chain_import_options) do + {:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do async_import_remaining_block_data(imported) ok end @@ -194,19 +198,7 @@ defmodule Indexer.Block.Realtime.Fetcher do ] end) - {:error, {step, reason}} -> - Logger.error( - fn -> - [ - "failed to fetch: ", - inspect(reason), - ". Block will be retried by catchup indexer." - ] - end, - step: step - ) - - {:error, [%Changeset{} | _] = changesets} -> + {:error, {:import, [%Changeset{} | _] = changesets}} -> params = %{ changesets: changesets, block_number_to_fetch: block_number_to_fetch, @@ -226,6 +218,18 @@ defmodule Indexer.Block.Realtime.Fetcher do end) end + {:error, {step, reason}} -> + Logger.error( + fn -> + [ + "failed to fetch: ", + inspect(reason), + ". Block will be retried by catchup indexer." + ] + end, + step: step + ) + {:error, {step, failed_value, _changes_so_far}} -> Logger.error( fn ->