Partial retries for fetch_blocks_by_range

Means that both Block.Fetchers, realtime and catchup, now handle
fetch errors.  The Block.Catchup.Fetcher will retry those blocks that
failed to fetch while importing the rest that did fetch instead of
refetching the whole batch.
pull/1135/head
Luke Imhoff 6 years ago
parent d43a8c6133
commit 3499155cdb
  1. 90
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  2. 16
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex
  3. 11
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_number.ex
  4. 8
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex
  5. 14
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex
  6. 76
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs
  7. 61
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  8. 33
      apps/indexer/lib/indexer/block/fetcher.ex
  9. 13
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  10. 2
      apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs
  11. 18
      apps/indexer/test/indexer/block/fetcher_test.exs
  12. 7
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs

@ -26,14 +26,13 @@ defmodule EthereumJSONRPC do
"""
alias EthereumJSONRPC.{
Block,
Blocks,
FetchedBalances,
Receipts,
RequestCoordinator,
Subscription,
Transactions,
Transport,
Uncles,
Variant
}
@ -87,11 +86,6 @@ defmodule EthereumJSONRPC do
{:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()}
]
@typedoc """
If there are more blocks.
"""
@type next :: :end_of_chain | :more
@typedoc """
8 byte [KECCAK-256](https://en.wikipedia.org/wiki/SHA-3) hash of the proof-of-work.
"""
@ -214,41 +208,21 @@ defmodule EthereumJSONRPC do
Transaction data is included for each block.
"""
@spec fetch_blocks_by_hash([hash()], json_rpc_named_arguments) :: {:ok, Blocks.t()} | {:error, reason :: term}
def fetch_blocks_by_hash(block_hashes, json_rpc_named_arguments) do
id_to_params =
block_hashes
|> Enum.map(fn block_hash -> %{hash: block_hash} end)
|> id_to_params()
with {:ok, responses} <-
id_to_params
|> Blocks.ByHash.requests()
|> json_rpc(json_rpc_named_arguments) do
{:ok, Blocks.from_responses(responses, id_to_params)}
end
|> fetch_blocks_by_params(&Block.ByHash.request/1, json_rpc_named_arguments)
end
@doc """
Fetches blocks by block number range.
"""
@spec fetch_blocks_by_range(Range.t(), json_rpc_named_arguments) ::
{:ok, next,
%{
blocks: Blocks.params(),
block_second_degree_relations: Uncles.params(),
transactions: Transactions.params()
}}
| {:error, [reason :: term, ...]}
@spec fetch_blocks_by_range(Range.t(), json_rpc_named_arguments) :: {:ok, Blocks.t()} | {:error, reason :: term}
def fetch_blocks_by_range(_first.._last = range, json_rpc_named_arguments) do
id_to_params =
range
|> Enum.map(fn number -> %{number: number} end)
|> id_to_params()
id_to_params
|> get_block_by_number_requests()
|> json_rpc(json_rpc_named_arguments)
|> handle_get_blocks(id_to_params)
|> fetch_blocks_by_params(&Block.ByNumber.request/1, json_rpc_named_arguments)
end
@doc """
@ -409,10 +383,16 @@ defmodule EthereumJSONRPC do
|> Timex.from_unix()
end
defp get_block_by_number_requests(id_to_params) do
Enum.map(id_to_params, fn {id, %{number: number}} ->
get_block_by_number_request(%{id: id, quantity: number, transactions: :full})
end)
defp fetch_blocks_by_params(params, request, json_rpc_named_arguments)
when is_list(params) and is_function(request, 1) do
id_to_params = id_to_params(params)
with {:ok, responses} <-
id_to_params
|> Blocks.requests(request)
|> json_rpc(json_rpc_named_arguments) do
{:ok, Blocks.from_responses(responses, id_to_params)}
end
end
defp get_block_by_number_request(%{id: id} = options) do
@ -445,46 +425,6 @@ defmodule EthereumJSONRPC do
end
end
defp handle_get_blocks({:ok, results}, id_to_params) when is_list(results) do
with {:ok, next, blocks} <- reduce_results(results, id_to_params) do
elixir_blocks = Blocks.to_elixir(blocks)
elixir_uncles = Blocks.elixir_to_uncles(elixir_blocks)
elixir_transactions = Blocks.elixir_to_transactions(elixir_blocks)
block_second_degree_relations_params = Uncles.elixir_to_params(elixir_uncles)
transactions_params = Transactions.elixir_to_params(elixir_transactions)
blocks_params = Blocks.elixir_to_params(elixir_blocks)
{:ok, next,
%{
blocks: blocks_params,
block_second_degree_relations: block_second_degree_relations_params,
transactions: transactions_params
}}
end
end
defp handle_get_blocks({:error, _} = error, _id_to_params), do: error
defp reduce_results(results, id_to_params) do
Enum.reduce(results, {:ok, :more, []}, &reduce_result(&1, &2, id_to_params))
end
defp reduce_result(%{result: nil}, {:ok, _, blocks}, _id_to_params), do: {:ok, :end_of_chain, blocks}
defp reduce_result(%{result: %{} = block}, {:ok, next, blocks}, _id_to_params), do: {:ok, next, [block | blocks]}
defp reduce_result(%{result: _}, {:error, _} = error, _id_to_params), do: error
defp reduce_result(%{error: reason, id: id}, acc, id_to_params) do
data = Map.fetch!(id_to_params, id)
annotated_reason = Map.put(reason, :data, data)
case acc do
{:ok, _, _} -> {:error, [annotated_reason]}
{:error, reasons} -> {:error, [annotated_reason | reasons]}
end
end
defp handle_get_block_by_tag({:ok, %{"number" => nil}}), do: {:error, :not_found}
defp handle_get_block_by_tag({:ok, %{"number" => quantity}}) when is_binary(quantity) do

@ -68,17 +68,21 @@ defmodule EthereumJSONRPC.Block do
"""
@type t :: %{String.t() => EthereumJSONRPC.data() | EthereumJSONRPC.hash() | EthereumJSONRPC.quantity() | nil}
def from_response(%{id: id, result: %{"hash" => hash} = block}, id_to_params) when is_map(id_to_params) do
# `^` verifies returned hash matches sent hash
%{hash: ^hash} = Map.fetch!(id_to_params, id)
def from_response(%{id: id, result: nil}, id_to_params) when is_map(id_to_params) do
params = Map.fetch!(id_to_params, id)
{:error, %{code: 404, message: "Not Found", data: params}}
end
def from_response(%{id: id, result: block}, id_to_params) when is_map(id_to_params) do
true = Map.has_key?(id_to_params, id)
{:ok, block}
end
def from_response(%{id: id, error: error}, id_to_params) when is_map(id_to_params) do
%{hash: hash} = Map.fetch!(id_to_params, id)
annotated_error = Map.put(error, :data, %{hash: hash})
params = Map.fetch!(id_to_params, id)
annotated_error = Map.put(error, :data, params)
{:error, annotated_error}
end

@ -0,0 +1,11 @@
defmodule EthereumJSONRPC.Block.ByNumber do
@moduledoc """
Block format as returned by [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash)
"""
import EthereumJSONRPC, only: [integer_to_quantity: 1]
def request(%{id: id, number: number}) do
EthereumJSONRPC.request(%{id: id, method: "eth_getBlockByNumber", params: [integer_to_quantity(number), true]})
end
end

@ -20,6 +20,14 @@ defmodule EthereumJSONRPC.Blocks do
transactions_params: [],
errors: []
def requests(id_to_params, request) when is_map(id_to_params) and is_function(request, 1) do
Enum.map(id_to_params, fn {id, params} ->
params
|> Map.put(:id, id)
|> request.()
end)
end
@spec from_responses(list(), map()) :: t()
def from_responses(responses, id_to_params) when is_list(responses) and is_map(id_to_params) do
%{errors: errors, blocks: blocks} =

@ -1,14 +0,0 @@
defmodule EthereumJSONRPC.Blocks.ByHash do
@moduledoc """
Blocks format as returned by [`eth_getBlockByHash`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash)
from batch requests.
"""
alias EthereumJSONRPC.Block
def requests(id_to_params) when is_map(id_to_params) do
Enum.map(id_to_params, fn {id, %{hash: hash}} ->
Block.ByHash.request(%{id: id, hash: hash})
end)
end
end

@ -329,15 +329,29 @@ defmodule EthereumJSONRPCTest do
end)
end
assert {:error,
[%{data: %{number: 1_000_000_000_000_000_000_001}}, %{data: %{number: 1_000_000_000_000_000_000_000}}]} =
assert {:ok,
%EthereumJSONRPC.Blocks{
block_second_degree_relations_params: [],
blocks_params: [],
errors: [
%{
data: %{number: 1_000_000_000_000_000_000_001}
},
%{
data: %{number: 1_000_000_000_000_000_000_000}
}
],
transactions_params: []
}} =
EthereumJSONRPC.fetch_blocks_by_range(
1_000_000_000_000_000_000_000..1_000_000_000_000_000_000_001,
json_rpc_named_arguments
)
end
test "returns only errors if a mix of results and errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do
test "returns only errors and results if a mix of results and errors", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
# Can't be faked reliably on real chain
moxed_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox)
@ -356,30 +370,71 @@ defmodule EthereumJSONRPCTest do
id: 1,
result: %{
"difficulty" => "0x0",
"extraData" => "0x",
"gasLimit" => "0x0",
"gasUsed" => "0x0",
"hash" => "0x0",
"logsBloom" => "0x",
"miner" => "0x0",
"number" => "0x0",
"parentHash" => "0x0",
"receiptsRoot" => "0x0",
"sha3Uncles" => "0x0",
"size" => "0x0",
"stateRoot" => "0x0",
"timestamp" => "0x0",
"totalDifficulty" => "0x0",
"transactions" => []
"transactions" => [],
"transactionsRoot" => [],
"uncles" => []
},
jsonrpc: "2.0"
}
]}
end)
assert {:error, [%{data: %{number: 1_000_000_000_000_000_000_000}}]} =
assert {:ok,
%EthereumJSONRPC.Blocks{
block_second_degree_relations_params: [],
blocks_params: [
%{
difficulty: 0,
extra_data: "0x",
gas_limit: 0,
gas_used: 0,
hash: "0x0",
logs_bloom: "0x",
miner_hash: "0x0",
mix_hash: "0x0",
nonce: 0,
number: 0,
parent_hash: "0x0",
receipts_root: "0x0",
sha3_uncles: "0x0",
size: 0,
state_root: "0x0",
timestamp: _,
total_difficulty: 0,
transactions_root: [],
uncles: []
}
],
errors: [
%{
code: -32602,
data: %{number: 1_000_000_000_000_000_000_000},
message: "Invalid params: Invalid block number: number too large to fit in target type."
}
],
transactions_params: []
}} =
EthereumJSONRPC.fetch_blocks_by_range(
1_000_000_000_000_000_000_000..1_000_000_000_000_000_000_001,
moxed_json_rpc_named_arguments
)
end
test "nil result indicated end-of-chain", %{json_rpc_named_arguments: json_rpc_named_arguments} do
test "nil result indicated error code 404", %{json_rpc_named_arguments: json_rpc_named_arguments} do
# Can't be faked reliably on real chain
moxed_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox)
@ -418,8 +473,13 @@ defmodule EthereumJSONRPCTest do
]}
end)
assert {:ok, :end_of_chain, %{blocks: [_], transactions: []}} =
EthereumJSONRPC.fetch_blocks_by_range(0..1, moxed_json_rpc_named_arguments)
assert {:ok,
%EthereumJSONRPC.Blocks{
block_second_degree_relations_params: [],
blocks_params: [%{}],
errors: [%{code: 404, data: %{number: 1}, message: "Not Found"}],
transactions_params: []
}} = EthereumJSONRPC.fetch_blocks_by_range(0..1, moxed_json_rpc_named_arguments)
end
end

@ -166,9 +166,11 @@ defmodule Indexer.Block.Catchup.Fetcher do
sequence
) do
case fetch_and_import_range(block_fetcher, range) do
{:ok, {inserted, next}} ->
cap_seq(sequence, next, range)
{:ok, inserted}
{:ok, %{inserted: inserted, errors: errors}} ->
errors = cap_seq(sequence, errors, range)
retry(sequence, errors)
{:ok, inserted: inserted}
{:error, {step, reason}} = error ->
Logger.error(fn ->
@ -200,19 +202,27 @@ defmodule Indexer.Block.Catchup.Fetcher do
end
end
defp cap_seq(seq, next, range) do
case next do
:more ->
defp cap_seq(seq, errors, range) do
{not_founds, other_errors} =
Enum.split_with(errors, fn
%{code: 404, data: %{number: _}} -> true
_ -> false
end)
case not_founds do
[] ->
Logger.debug(fn ->
first_block_number..last_block_number = range
"got blocks #{first_block_number} - #{last_block_number}"
end)
:end_of_chain ->
other_errors
_ ->
Sequence.cap(seq)
end
:ok
other_errors
end
defp push_back(sequence, range) do
@ -222,6 +232,41 @@ defmodule Indexer.Block.Catchup.Fetcher do
end
end
defp retry(sequence, errors) when is_list(errors) do
errors
|> errors_to_ranges()
|> Enum.map(&push_back(sequence, &1))
end
defp errors_to_ranges(errors) when is_list(errors) do
errors
|> Enum.flat_map(&error_to_numbers/1)
|> numbers_to_ranges()
end
defp error_to_numbers(%{data: %{number: number}}) when is_integer(number), do: [number]
defp numbers_to_ranges([]), do: []
defp numbers_to_ranges(numbers) when is_list(numbers) do
numbers
|> Enum.sort()
|> Enum.chunk_while(
nil,
fn
number, nil ->
{:cont, number..number}
number, first..last when number == last + 1 ->
{:cont, first..number}
number, range ->
{:cont, range, number..number}
end,
fn range -> {:cont, range} end
)
end
defp put_memory_monitor(sequence_options, %__MODULE__{memory_monitor: nil}) when is_list(sequence_options),
do: sequence_options

@ -5,7 +5,7 @@ defmodule Indexer.Block.Fetcher do
require Logger
alias EthereumJSONRPC.FetchedBeneficiaries
alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries}
alias Explorer.Chain.{Address, Block, Import}
alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, Token, TokenTransfers}
alias Indexer.Address.{CoinBalances, TokenBalances}
@ -77,7 +77,7 @@ defmodule Indexer.Block.Fetcher do
end
@spec fetch_and_import_range(t, Range.t()) ::
{:ok, {inserted :: %{}, next :: :more | :end_of_chain}}
{:ok, %{inserted: %{}, errors: [EthereumJSONRPC.Transport.error()]}}
| {:error,
{step :: atom(), reason :: term()}
| [%Ecto.Changeset{}]
@ -91,20 +91,22 @@ defmodule Indexer.Block.Fetcher do
_.._ = range
)
when callback_module != nil do
with {:blocks, {:ok, next, result}} <-
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{
blocks: blocks,
transactions: transactions_without_receipts,
block_second_degree_relations: block_second_degree_relations
} = result,
blocks = Transform.transform_blocks(blocks),
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)},
with {:blocks,
{:ok,
%Blocks{
blocks_params: blocks_params,
transactions_params: transactions_params_without_receipts,
block_second_degree_relations_params: block_second_degree_relations_params,
errors: blocks_errors
}}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
blocks = Transform.transform_blocks(blocks_params),
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_params_without_receipts)},
%{logs: logs, receipts: receipts} = receipt_params,
transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts),
transactions_with_receipts = Receipts.put(transactions_params_without_receipts, receipts),
%{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.parse(logs),
%{mint_transfers: mint_transfers} = MintTransfer.parse(logs),
{:beneficiaries, {:ok, %FetchedBeneficiaries{params_set: beneficiary_params_set}}} <-
{:beneficiaries,
{:ok, %FetchedBeneficiaries{params_set: beneficiary_params_set, errors: beneficiaries_errors}}} <-
fetch_beneficiaries(range, json_rpc_named_arguments),
addresses =
AddressExtraction.extract_addresses(%{
@ -132,16 +134,15 @@ defmodule Indexer.Block.Fetcher do
address_coin_balances: %{params: coin_balances_params_set},
address_token_balances: %{params: address_token_balances},
blocks: %{params: blocks},
block_second_degree_relations: %{params: block_second_degree_relations},
block_second_degree_relations: %{params: block_second_degree_relations_params},
logs: %{params: logs},
token_transfers: %{params: token_transfers},
tokens: %{on_conflict: :nothing, params: tokens},
transactions: %{params: transactions_with_receipts}
}
) do
{:ok, {inserted, next}}
{:ok, %{inserted: inserted, errors: blocks_errors ++ beneficiaries_errors}}
else
{:beneficiaries = step, {:ok, %FetchedBeneficiaries{errors: [_ | _] = errors}}} -> {:error, {step, errors}}
{step, {:error, reason}} -> {:error, {step, reason}}
{:error, :timeout} = error -> error
{:error, changesets} = error when is_list(changesets) -> error

@ -127,11 +127,22 @@ defmodule Indexer.Block.Realtime.Fetcher do
def fetch_and_import_block(block_number_to_fetch, block_fetcher, retry \\ 3) do
case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do
{:ok, {_inserted, _next}} ->
{:ok, %{inserted: _, errors: []}} ->
Logger.debug(fn ->
["realtime indexer fetched and imported block ", to_string(block_number_to_fetch)]
end)
{:ok, %{inserted: _, errors: [_ | _] = errors}} ->
Logger.error(fn ->
[
"realtime indexer failed to fetch block",
to_string(block_number_to_fetch),
": ",
inspect(errors),
". Block will be retried by catchup indexer."
]
end)
{:error, {step, reason}} ->
Logger.error(fn ->
[

@ -22,6 +22,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
# See https://github.com/poanetwork/blockscout/issues/597
@tag :no_geth
test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do
Logger.configure(truncate: :infinity)
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
case Keyword.fetch!(json_rpc_named_arguments, :variant) do
EthereumJSONRPC.Parity ->

@ -214,10 +214,13 @@ defmodule Indexer.Block.FetcherTest do
fn -> Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number) end,
fn result ->
assert {:ok,
{%{
%{
inserted: %{
addresses: [%Address{hash: ^address_hash}],
blocks: [%Chain.Block{hash: ^block_hash}]
}, :more}} = result
},
errors: []
}} = result
wait_for_tasks(InternalTransaction.Fetcher)
wait_for_tasks(CoinBalance.Fetcher)
@ -506,15 +509,16 @@ defmodule Indexer.Block.FetcherTest do
EthereumJSONRPC.Parity ->
assert {:ok,
{%{
%{
inserted: %{
addresses: [
%Address{
hash:
%Explorer.Chain.Hash{
byte_count: 20,
bytes:
<<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179, 223,
65, 91>>
<<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179,
223, 65, 91>>
} = first_address_hash
},
%Address{
@ -556,7 +560,9 @@ defmodule Indexer.Block.FetcherTest do
57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>>
}
]
}, :more}} = Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number)
},
errors: []
}} = Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number)
wait_for_tasks(InternalTransaction.Fetcher)
wait_for_tasks(CoinBalance.Fetcher)

@ -370,7 +370,8 @@ defmodule Indexer.Block.Realtime.FetcherTest do
end
assert {:ok,
{%{
%{
inserted: %{
addresses: [
%Address{hash: first_address_hash, fetched_coin_balance_block_number: 3_946_079},
%Address{hash: second_address_hash, fetched_coin_balance_block_number: 3_946_079},
@ -410,7 +411,9 @@ defmodule Indexer.Block.Realtime.FetcherTest do
%{index: 5, transaction_hash: transaction_hash}
],
transactions: [transaction_hash]
}, :more}} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080)
},
errors: []
}} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080)
end
end
end

Loading…
Cancel
Save