Label all steps in Block.Fetcher implementations

Fixes #1232

`{:error, :timeout}` should have been caught, but additionally, it
should never have been allowed to propagate out unassigned to a step as
it is not possible to track the timeout to a specific fetch.
pull/1233/head
Luke Imhoff 6 years ago
parent 37c06ff7ce
commit adc03fd08b
  1. 26
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  2. 5
      apps/indexer/lib/indexer/block/fetcher.ex
  3. 62
      apps/indexer/lib/indexer/block/realtime/fetcher.ex

@ -10,6 +10,7 @@ defmodule Indexer.Block.Catchup.Fetcher do
import Indexer.Block.Fetcher, import Indexer.Block.Fetcher,
only: [async_import_coin_balances: 2, async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2] only: [async_import_coin_balances: 2, async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2]
alias Ecto.Changeset
alias Explorer.Chain alias Explorer.Chain
alias Indexer.{Block, InternalTransaction, Sequence, TokenBalance, Tracer} alias Indexer.{Block, InternalTransaction, Sequence, TokenBalance, Tracer}
alias Indexer.Memory.Shrinkable alias Indexer.Memory.Shrinkable
@ -114,10 +115,9 @@ defmodule Indexer.Block.Catchup.Fetcher do
{async_import_remaining_block_data_options, chain_import_options} = {async_import_remaining_block_data_options, chain_import_options} =
Map.split(options, @async_import_remaining_block_data_options) Map.split(options, @async_import_remaining_block_data_options)
with {:ok, imported} = ok <- full_chain_import_options = put_in(chain_import_options, [:blocks, :params, Access.all(), :consensus], true)
chain_import_options
|> put_in([:blocks, :params, Access.all(), :consensus], true) with {:import, {:ok, imported} = ok} <- {:import, Chain.import(full_chain_import_options)} do
|> Chain.import() do
async_import_remaining_block_data( async_import_remaining_block_data(
imported, imported,
async_import_remaining_block_data_options async_import_remaining_block_data_options
@ -187,6 +187,15 @@ defmodule Indexer.Block.Catchup.Fetcher do
{:ok, inserted: inserted} {:ok, inserted: inserted}
{:error, {:import, [%Changeset{} | _] = changesets}} = error ->
Logger.error(fn ->
["failed to validate: ", inspect(changesets), ". Retrying."]
end)
push_back(sequence, range)
error
{:error, {step, reason}} = error -> {:error, {step, reason}} = error ->
Logger.error( Logger.error(
fn -> fn ->
@ -199,15 +208,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
error error
{:error, changesets} = error when is_list(changesets) ->
Logger.error(fn ->
["failed to validate: ", inspect(changesets), ". Retrying."]
end)
push_back(sequence, range)
error
{:error, {step, failed_value, _changes_so_far}} = error -> {:error, {step, failed_value, _changes_so_far}} = error ->
Logger.error( Logger.error(
fn -> fn ->

@ -83,8 +83,7 @@ defmodule Indexer.Block.Fetcher do
@spec fetch_and_import_range(t, Range.t()) :: @spec fetch_and_import_range(t, Range.t()) ::
{:ok, %{inserted: %{}, errors: [EthereumJSONRPC.Transport.error()]}} {:ok, %{inserted: %{}, errors: [EthereumJSONRPC.Transport.error()]}}
| {:error, | {:error,
{step :: atom(), reason :: term()} {step :: atom(), reason :: [%Ecto.Changeset{}] | term()}
| [%Ecto.Changeset{}]
| {step :: atom(), failed_value :: term(), changes_so_far :: term()}} | {step :: atom(), failed_value :: term(), changes_so_far :: term()}}
def fetch_and_import_range( def fetch_and_import_range(
%__MODULE__{ %__MODULE__{
@ -150,8 +149,6 @@ defmodule Indexer.Block.Fetcher do
{:ok, %{inserted: inserted, errors: blocks_errors ++ beneficiaries_errors}} {:ok, %{inserted: inserted, errors: blocks_errors ++ beneficiaries_errors}}
else else
{step, {:error, reason}} -> {:error, {step, reason}} {step, {:error, reason}} -> {:error, {step, reason}}
{:error, :timeout} = error -> error
{:error, changesets} = error when is_list(changesets) -> error
{:error, step, failed_value, changes_so_far} -> {:error, {step, failed_value, changes_so_far}} {:error, step, failed_value, changes_so_far} -> {:error, {step, failed_value, changes_so_far}}
end end
end end

@ -97,22 +97,26 @@ defmodule Indexer.Block.Realtime.Fetcher do
transactions: %{params: transactions_params} transactions: %{params: transactions_params}
} = options } = options
) do ) do
with {:ok, with {:internal_transactions,
%{ {:ok,
addresses_params: internal_transactions_addresses_params, %{
internal_transactions_params: internal_transactions_params
}} <-
internal_transactions(block_fetcher, %{
addresses_params: addresses_params,
transactions_params: transactions_params
}),
{:ok, %{addresses_params: balances_addresses_params, balances_params: balances_params}} <-
balances(block_fetcher, %{
address_hash_to_block_number: address_hash_to_block_number,
addresses_params: internal_transactions_addresses_params, addresses_params: internal_transactions_addresses_params,
balances_params: address_coin_balances_params internal_transactions_params: internal_transactions_params
}), }}} <-
{:ok, address_token_balances} <- fetch_token_balances(address_token_balances_params), {:internal_transactions,
internal_transactions(block_fetcher, %{
addresses_params: addresses_params,
transactions_params: transactions_params
})},
{:balances, {:ok, %{addresses_params: balances_addresses_params, balances_params: balances_params}}} <-
{:balances,
balances(block_fetcher, %{
address_hash_to_block_number: address_hash_to_block_number,
addresses_params: internal_transactions_addresses_params,
balances_params: address_coin_balances_params
})},
{:address_token_balances, {:ok, address_token_balances}} <-
{:address_token_balances, fetch_token_balances(address_token_balances_params)},
chain_import_options = chain_import_options =
options options
|> Map.drop(@import_options) |> Map.drop(@import_options)
@ -122,7 +126,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
|> put_in([Access.key(:address_current_token_balances, %{}), :params], address_token_balances) |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_token_balances)
|> put_in([Access.key(:address_token_balances), :params], address_token_balances) |> put_in([Access.key(:address_token_balances), :params], address_token_balances)
|> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params), |> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params),
{:ok, imported} = ok <- Chain.import(chain_import_options) do {:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do
async_import_remaining_block_data(imported) async_import_remaining_block_data(imported)
ok ok
end end
@ -194,19 +198,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
] ]
end) end)
{:error, {step, reason}} -> {:error, {:import, [%Changeset{} | _] = changesets}} ->
Logger.error(
fn ->
[
"failed to fetch: ",
inspect(reason),
". Block will be retried by catchup indexer."
]
end,
step: step
)
{:error, [%Changeset{} | _] = changesets} ->
params = %{ params = %{
changesets: changesets, changesets: changesets,
block_number_to_fetch: block_number_to_fetch, block_number_to_fetch: block_number_to_fetch,
@ -226,6 +218,18 @@ defmodule Indexer.Block.Realtime.Fetcher do
end) end)
end end
{:error, {step, reason}} ->
Logger.error(
fn ->
[
"failed to fetch: ",
inspect(reason),
". Block will be retried by catchup indexer."
]
end,
step: step
)
{:error, {step, failed_value, _changes_so_far}} -> {:error, {step, failed_value, _changes_so_far}} ->
Logger.error( Logger.error(
fn -> fn ->

Loading…
Cancel
Save