Partial retry of batches of blocks by hash

pull/1135/head
Luke Imhoff 6 years ago
parent ad1119eb4c
commit d43a8c6133
  1. 2
      .credo.exs
  2. 26
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  3. 15
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex
  4. 11
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_hash.ex
  5. 46
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex
  6. 14
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex
  7. 21
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs
  8. 112
      apps/indexer/lib/indexer/block/uncle/fetcher.ex
  9. 2
      apps/indexer/lib/indexer/coin_balance/fetcher.ex
  10. 3
      apps/indexer/test/indexer/block/uncle/fetcher_test.exs

@ -75,7 +75,7 @@
# Priority values are: `low, normal, high, higher` # Priority values are: `low, normal, high, higher`
# #
{Credo.Check.Design.AliasUsage, {Credo.Check.Design.AliasUsage,
excluded_namespaces: ~w(Import Socket Task), excluded_namespaces: ~w(Block Blocks Import Socket Task),
excluded_lastnames: ~w(Address DateTime Exporter Fetcher Full Instrumenter Monitor Name Number Repo Time Unit), excluded_lastnames: ~w(Address DateTime Exporter Fetcher Full Instrumenter Monitor Name Number Repo Time Unit),
priority: :low}, priority: :low},

@ -220,13 +220,11 @@ defmodule EthereumJSONRPC do
|> Enum.map(fn block_hash -> %{hash: block_hash} end) |> Enum.map(fn block_hash -> %{hash: block_hash} end)
|> id_to_params() |> id_to_params()
id_to_params with {:ok, responses} <-
|> get_block_by_hash_requests() id_to_params
|> json_rpc(json_rpc_named_arguments) |> Blocks.ByHash.requests()
|> handle_get_blocks(id_to_params) |> json_rpc(json_rpc_named_arguments) do
|> case do {:ok, Blocks.from_responses(responses, id_to_params)}
{:ok, _next, results} -> {:ok, results}
{:error, reason} -> {:error, reason}
end end
end end
@ -411,20 +409,6 @@ defmodule EthereumJSONRPC do
|> Timex.from_unix() |> Timex.from_unix()
end end
defp get_block_by_hash_requests(id_to_params) do
Enum.map(id_to_params, fn {id, %{hash: hash}} ->
get_block_by_hash_request(%{id: id, hash: hash, transactions: :full})
end)
end
defp get_block_by_hash_request(%{id: id} = options) do
request(%{id: id, method: "eth_getBlockByHash", params: get_block_by_hash_params(options)})
end
defp get_block_by_hash_params(%{hash: hash} = options) do
[hash, get_block_transactions(options)]
end
defp get_block_by_number_requests(id_to_params) do defp get_block_by_number_requests(id_to_params) do
Enum.map(id_to_params, fn {id, %{number: number}} -> Enum.map(id_to_params, fn {id, %{number: number}} ->
get_block_by_number_request(%{id: id, quantity: number, transactions: :full}) get_block_by_number_request(%{id: id, quantity: number, transactions: :full})

@ -68,6 +68,21 @@ defmodule EthereumJSONRPC.Block do
""" """
@type t :: %{String.t() => EthereumJSONRPC.data() | EthereumJSONRPC.hash() | EthereumJSONRPC.quantity() | nil} @type t :: %{String.t() => EthereumJSONRPC.data() | EthereumJSONRPC.hash() | EthereumJSONRPC.quantity() | nil}
def from_response(%{id: id, result: %{"hash" => hash} = block}, id_to_params) when is_map(id_to_params) do
# `^` verifies returned hash matches sent hash
%{hash: ^hash} = Map.fetch!(id_to_params, id)
{:ok, block}
end
def from_response(%{id: id, error: error}, id_to_params) when is_map(id_to_params) do
%{hash: hash} = Map.fetch!(id_to_params, id)
annotated_error = Map.put(error, :data, %{hash: hash})
{:error, annotated_error}
end
@doc """ @doc """
Converts `t:elixir/0` format to params used in `Explorer.Chain`. Converts `t:elixir/0` format to params used in `Explorer.Chain`.

@ -0,0 +1,11 @@
defmodule EthereumJSONRPC.Block.ByHash do
@moduledoc """
Block format as returned by [`eth_getBlockByHash`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash)
"""
@include_transactions true
def request(%{id: id, hash: hash}) do
EthereumJSONRPC.request(%{id: id, method: "eth_getBlockByHash", params: [hash, @include_transactions]})
end
end

@ -4,11 +4,51 @@ defmodule EthereumJSONRPC.Blocks do
and [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbynumber) from batch requests. and [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbynumber) from batch requests.
""" """
alias EthereumJSONRPC.{Block, Transactions, Uncles} alias EthereumJSONRPC.{Block, Transactions, Transport, Uncles}
@type elixir :: [Block.elixir()] @type elixir :: [Block.elixir()]
@type params :: [Block.params()] @type params :: [Block.params()]
@type t :: [Block.t()] @type t :: %__MODULE__{
blocks_params: [map()],
block_second_degree_relations_params: [map()],
transactions_params: [map()],
errors: [Transport.error()]
}
defstruct blocks_params: [],
block_second_degree_relations_params: [],
transactions_params: [],
errors: []
@spec from_responses(list(), map()) :: t()
def from_responses(responses, id_to_params) when is_list(responses) and is_map(id_to_params) do
%{errors: errors, blocks: blocks} =
responses
|> Enum.map(&Block.from_response(&1, id_to_params))
|> Enum.reduce(%{errors: [], blocks: []}, fn
{:ok, block}, %{blocks: blocks} = acc ->
%{acc | blocks: [block | blocks]}
{:error, error}, %{errors: errors} = acc ->
%{acc | errors: [error | errors]}
end)
elixir_blocks = to_elixir(blocks)
elixir_uncles = elixir_to_uncles(elixir_blocks)
elixir_transactions = elixir_to_transactions(elixir_blocks)
block_second_degree_relations_params = Uncles.elixir_to_params(elixir_uncles)
transactions_params = Transactions.elixir_to_params(elixir_transactions)
blocks_params = elixir_to_params(elixir_blocks)
%__MODULE__{
errors: errors,
blocks_params: blocks_params,
block_second_degree_relations_params: block_second_degree_relations_params,
transactions_params: transactions_params
}
end
@doc """ @doc """
Converts `t:elixir/0` elements to params used by `Explorer.Chain.Block.changeset/2`. Converts `t:elixir/0` elements to params used by `Explorer.Chain.Block.changeset/2`.
@ -282,7 +322,7 @@ defmodule EthereumJSONRPC.Blocks do
} }
] ]
""" """
@spec to_elixir(t) :: elixir @spec to_elixir([Block.t()]) :: elixir
def to_elixir(blocks) when is_list(blocks) do def to_elixir(blocks) when is_list(blocks) do
Enum.map(blocks, &Block.to_elixir/1) Enum.map(blocks, &Block.to_elixir/1)
end end

@ -0,0 +1,14 @@
defmodule EthereumJSONRPC.Blocks.ByHash do
@moduledoc """
Blocks format as returned by [`eth_getBlockByHash`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash)
from batch requests.
"""
alias EthereumJSONRPC.Block
def requests(id_to_params) when is_map(id_to_params) do
Enum.map(id_to_params, fn {id, %{hash: hash}} ->
Block.ByHash.request(%{id: id, hash: hash})
end)
end
end

@ -4,7 +4,7 @@ defmodule EthereumJSONRPCTest do
import EthereumJSONRPC.Case import EthereumJSONRPC.Case
import Mox import Mox
alias EthereumJSONRPC.{FetchedBalances, FetchedBeneficiaries, Subscription} alias EthereumJSONRPC.{Blocks, FetchedBalances, FetchedBeneficiaries, Subscription}
alias EthereumJSONRPC.WebSocket.WebSocketClient alias EthereumJSONRPC.WebSocket.WebSocketClient
setup :verify_on_exit! setup :verify_on_exit!
@ -205,12 +205,13 @@ defmodule EthereumJSONRPCTest do
end end
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
expect(EthereumJSONRPC.Mox, :json_rpc, fn _json, _options -> expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id}], _options ->
block_number = "0x0" block_number = "0x0"
{:ok, {:ok,
[ [
%{ %{
id: id,
result: %{ result: %{
"difficulty" => "0x0", "difficulty" => "0x0",
"gasLimit" => "0x0", "gasLimit" => "0x0",
@ -253,7 +254,7 @@ defmodule EthereumJSONRPCTest do
end) end)
end end
assert {:ok, %{blocks: [_ | _], transactions: [_ | _]}} = assert {:ok, %Blocks{blocks_params: [_ | _], transactions_params: [_ | _]}} =
EthereumJSONRPC.fetch_blocks_by_hash([block_hash], json_rpc_named_arguments) EthereumJSONRPC.fetch_blocks_by_hash([block_hash], json_rpc_named_arguments)
end end
@ -274,8 +275,18 @@ defmodule EthereumJSONRPCTest do
end) end)
end end
assert {:error, [%{data: %{hash: "0x0"}}]} = hash = "0x0"
EthereumJSONRPC.fetch_blocks_by_hash(["0x0"], json_rpc_named_arguments)
assert {:ok,
%Blocks{
errors: [
%{
data: %{
hash: ^hash
}
}
]
}} = EthereumJSONRPC.fetch_blocks_by_hash([hash], json_rpc_named_arguments)
end end
test "full batch errors are returned", %{json_rpc_named_arguments: json_rpc_named_arguments} do test "full batch errors are returned", %{json_rpc_named_arguments: json_rpc_named_arguments} do

@ -6,6 +6,7 @@ defmodule Indexer.Block.Uncle.Fetcher do
require Logger require Logger
alias EthereumJSONRPC.Blocks
alias Explorer.Chain alias Explorer.Chain
alias Explorer.Chain.Hash alias Explorer.Chain.Hash
alias Indexer.{AddressExtraction, Block, BufferedTask} alias Indexer.{AddressExtraction, Block, BufferedTask}
@ -72,38 +73,8 @@ defmodule Indexer.Block.Uncle.Fetcher do
Logger.debug(fn -> "fetching #{length(unique_hashes)} uncle blocks" end) Logger.debug(fn -> "fetching #{length(unique_hashes)} uncle blocks" end)
case EthereumJSONRPC.fetch_blocks_by_hash(unique_hashes, json_rpc_named_arguments) do case EthereumJSONRPC.fetch_blocks_by_hash(unique_hashes, json_rpc_named_arguments) do
{:ok, {:ok, blocks} ->
%{ run_blocks(blocks, block_fetcher, unique_hashes)
blocks: blocks_params,
transactions: transactions_params,
block_second_degree_relations: block_second_degree_relations_params
}} ->
addresses_params =
AddressExtraction.extract_addresses(%{blocks: blocks_params, transactions: transactions_params})
case Block.Fetcher.import(block_fetcher, %{
addresses: %{params: addresses_params},
blocks: %{params: blocks_params},
block_second_degree_relations: %{params: block_second_degree_relations_params},
transactions: %{params: transactions_params, on_conflict: :nothing}
}) do
{:ok, _} ->
:ok
{:error, step, failed_value, _changes_so_far} ->
Logger.error(fn ->
[
"failed to import ",
unique_hashes |> length() |> to_string(),
"uncle blocks in step ",
inspect(step),
": ",
inspect(failed_value)
]
end)
{:retry, unique_hashes}
end
{:error, reason} -> {:error, reason} ->
Logger.error(fn -> Logger.error(fn ->
@ -114,6 +85,45 @@ defmodule Indexer.Block.Uncle.Fetcher do
end end
end end
defp run_blocks(%Blocks{blocks_params: []}, _, original_entries), do: {:retry, original_entries}
defp run_blocks(
%Blocks{
blocks_params: blocks_params,
transactions_params: transactions_params,
block_second_degree_relations_params: block_second_degree_relations_params,
errors: errors
},
block_fetcher,
original_entries
) do
addresses_params = AddressExtraction.extract_addresses(%{blocks: blocks_params, transactions: transactions_params})
case Block.Fetcher.import(block_fetcher, %{
addresses: %{params: addresses_params},
blocks: %{params: blocks_params},
block_second_degree_relations: %{params: block_second_degree_relations_params},
transactions: %{params: transactions_params, on_conflict: :nothing}
}) do
{:ok, _} ->
retry(errors, original_entries)
{:error, step, failed_value, _changes_so_far} ->
Logger.error(fn ->
[
"failed to import ",
original_entries |> length() |> to_string(),
"uncle blocks in step ",
inspect(step),
": ",
inspect(failed_value)
]
end)
{:retry, original_entries}
end
end
@ignored_options ~w(address_hash_to_fetched_balance_block_number transaction_hash_to_block_number)a @ignored_options ~w(address_hash_to_fetched_balance_block_number transaction_hash_to_block_number)a
@impl Block.Fetcher @impl Block.Fetcher
@ -170,4 +180,42 @@ defmodule Indexer.Block.Uncle.Fetcher do
%{uncle_hash: uncle_hash, index: index, hash: hash} %{uncle_hash: uncle_hash, index: index, hash: hash}
end) end)
end end
defp retry([], _), do: :ok
defp retry(errors, original_entries) when is_list(errors) do
retried_entries = errors_to_entries(errors)
Logger.error(fn ->
[
"failed to fetch ",
retried_entries |> length() |> to_string(),
"/",
original_entries |> length() |> to_string(),
" uncles: ",
errors_to_iodata(errors)
]
end)
end
defp errors_to_entries(errors) when is_list(errors) do
Enum.map(errors, &error_to_entry/1)
end
defp error_to_entry(%{data: %{hash: hash}}) when is_binary(hash), do: hash
defp errors_to_iodata(errors) when is_list(errors) do
errors_to_iodata(errors, [])
end
defp errors_to_iodata([], iodata), do: iodata
defp errors_to_iodata([error | errors], iodata) do
errors_to_iodata(errors, [iodata | error_to_iodata(error)])
end
defp error_to_iodata(%{code: code, message: message, data: %{hash: hash}})
when is_integer(code) and is_binary(message) and is_binary(hash) do
[hash, ": (", to_string(code), ") ", message, ?\n]
end
end end

@ -173,6 +173,6 @@ defmodule Indexer.CoinBalance.Fetcher do
data: %{block_quantity: block_quantity, hash_data: hash_data} data: %{block_quantity: block_quantity, hash_data: hash_data}
}) })
when is_integer(code) and is_binary(message) and is_binary(block_quantity) and is_binary(hash_data) do when is_integer(code) and is_binary(message) and is_binary(block_quantity) and is_binary(hash_data) do
[hash_data, "@", quantity_to_integer(block_quantity), ": (", to_string(code), ") ", message] [hash_data, "@", quantity_to_integer(block_quantity), ": (", to_string(code), ") ", message, ?\n]
end end
end end

@ -51,12 +51,13 @@ defmodule Indexer.Block.Uncle.FetcherTest do
uncle_uncle_hash_data = to_string(block_hash()) uncle_uncle_hash_data = to_string(block_hash())
EthereumJSONRPC.Mox EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [%{method: "eth_getBlockByHash", params: [^uncle_hash_data, true]}], _ -> |> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByHash", params: [^uncle_hash_data, true]}], _ ->
number_quantity = "0x0" number_quantity = "0x0"
{:ok, {:ok,
[ [
%{ %{
id: id,
result: %{ result: %{
"author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"difficulty" => "0xfffffffffffffffffffffffffffffffe", "difficulty" => "0xfffffffffffffffffffffffffffffffe",

Loading…
Cancel
Save