Merge pull request #1135 from poanetwork/1070

Partial retry for batched JSONRPC requests
pull/1148/head
Luke Imhoff 6 years ago committed by GitHub
commit 25eebac2a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .credo.exs
  2. 226
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  3. 19
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex
  4. 11
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_hash.ex
  5. 11
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_number.ex
  6. 25
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_tag.ex
  7. 54
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex
  8. 62
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balance.ex
  9. 49
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex
  10. 16
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_beneficiaries.ex
  11. 8
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_beneficiary.ex
  12. 54
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex
  13. 94
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity/fetched_beneficiaries.ex
  14. 8
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/variant.ex
  15. 130
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity_test.exs
  16. 130
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs
  17. 61
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  18. 41
      apps/indexer/lib/indexer/block/fetcher.ex
  19. 30
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  20. 86
      apps/indexer/lib/indexer/block/uncle/fetcher.ex
  21. 91
      apps/indexer/lib/indexer/coin_balance/fetcher.ex
  22. 10
      apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs
  23. 27
      apps/indexer/test/indexer/block/fetcher_test.exs
  24. 12
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs
  25. 3
      apps/indexer/test/indexer/block/uncle/fetcher_test.exs
  26. 78
      apps/indexer/test/indexer/coin_balance/fetcher_test.exs

@ -75,7 +75,7 @@
# Priority values are: `low, normal, high, higher` # Priority values are: `low, normal, high, higher`
# #
{Credo.Check.Design.AliasUsage, {Credo.Check.Design.AliasUsage,
excluded_namespaces: ~w(Import Socket Task), excluded_namespaces: ~w(Block Blocks Import Socket Task),
excluded_lastnames: ~w(Address DateTime Exporter Fetcher Full Instrumenter Monitor Name Number Repo Time Unit), excluded_lastnames: ~w(Address DateTime Exporter Fetcher Full Instrumenter Monitor Name Number Repo Time Unit),
priority: :low}, priority: :low},

@ -25,16 +25,14 @@ defmodule EthereumJSONRPC do
documentation for `EthereumJSONRPC.RequestCoordinator`. documentation for `EthereumJSONRPC.RequestCoordinator`.
""" """
alias Explorer.Chain.Block
alias EthereumJSONRPC.{ alias EthereumJSONRPC.{
Block,
Blocks, Blocks,
FetchedBalances,
Receipts, Receipts,
RequestCoordinator, RequestCoordinator,
Subscription, Subscription,
Transactions,
Transport, Transport,
Uncles,
Variant Variant
} }
@ -88,11 +86,6 @@ defmodule EthereumJSONRPC do
{:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()} {:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()}
] ]
@typedoc """
If there are more blocks.
"""
@type next :: :end_of_chain | :more
@typedoc """ @typedoc """
8 byte [KECCAK-256](https://en.wikipedia.org/wiki/SHA-3) hash of the proof-of-work. 8 byte [KECCAK-256](https://en.wikipedia.org/wiki/SHA-3) hash of the proof-of-work.
""" """
@ -190,25 +183,16 @@ defmodule EthereumJSONRPC do
@spec fetch_balances( @spec fetch_balances(
[%{required(:block_quantity) => quantity, required(:hash_data) => data()}], [%{required(:block_quantity) => quantity, required(:hash_data) => data()}],
json_rpc_named_arguments json_rpc_named_arguments
) :: ) :: {:ok, FetchedBalances.t()} | {:error, reason :: term}
{:ok,
[
%{
required(:address_hash) => quantity,
required(:block_number) => Block.block_number(),
required(:value) => non_neg_integer()
}
]}
| {:error, reason :: term}
def fetch_balances(params_list, json_rpc_named_arguments) def fetch_balances(params_list, json_rpc_named_arguments)
when is_list(params_list) and is_list(json_rpc_named_arguments) do when is_list(params_list) and is_list(json_rpc_named_arguments) do
id_to_params = id_to_params(params_list) id_to_params = id_to_params(params_list)
with {:ok, responses} <- with {:ok, responses} <-
id_to_params id_to_params
|> get_balance_requests() |> FetchedBalances.requests()
|> json_rpc(json_rpc_named_arguments) do |> json_rpc(json_rpc_named_arguments) do
get_balance_responses_to_balances_params(responses, id_to_params) {:ok, FetchedBalances.from_responses(responses, id_to_params)}
end end
end end
@ -224,43 +208,21 @@ defmodule EthereumJSONRPC do
Transaction data is included for each block. Transaction data is included for each block.
""" """
@spec fetch_blocks_by_hash([hash()], json_rpc_named_arguments) :: {:ok, Blocks.t()} | {:error, reason :: term}
def fetch_blocks_by_hash(block_hashes, json_rpc_named_arguments) do def fetch_blocks_by_hash(block_hashes, json_rpc_named_arguments) do
id_to_params =
block_hashes block_hashes
|> Enum.map(fn block_hash -> %{hash: block_hash} end) |> Enum.map(fn block_hash -> %{hash: block_hash} end)
|> id_to_params() |> fetch_blocks_by_params(&Block.ByHash.request/1, json_rpc_named_arguments)
id_to_params
|> get_block_by_hash_requests()
|> json_rpc(json_rpc_named_arguments)
|> handle_get_blocks(id_to_params)
|> case do
{:ok, _next, results} -> {:ok, results}
{:error, reason} -> {:error, reason}
end
end end
@doc """ @doc """
Fetches blocks by block number range. Fetches blocks by block number range.
""" """
@spec fetch_blocks_by_range(Range.t(), json_rpc_named_arguments) :: @spec fetch_blocks_by_range(Range.t(), json_rpc_named_arguments) :: {:ok, Blocks.t()} | {:error, reason :: term}
{:ok, next,
%{
blocks: Blocks.params(),
block_second_degree_relations: Uncles.params(),
transactions: Transactions.params()
}}
| {:error, [reason :: term, ...]}
def fetch_blocks_by_range(_first.._last = range, json_rpc_named_arguments) do def fetch_blocks_by_range(_first.._last = range, json_rpc_named_arguments) do
id_to_params =
range range
|> Enum.map(fn number -> %{number: number} end) |> Enum.map(fn number -> %{number: number} end)
|> id_to_params() |> fetch_blocks_by_params(&Block.ByNumber.request/1, json_rpc_named_arguments)
id_to_params
|> get_block_by_number_requests()
|> json_rpc(json_rpc_named_arguments)
|> handle_get_blocks(id_to_params)
end end
@doc """ @doc """
@ -276,10 +238,10 @@ defmodule EthereumJSONRPC do
@spec fetch_block_number_by_tag(tag(), json_rpc_named_arguments) :: @spec fetch_block_number_by_tag(tag(), json_rpc_named_arguments) ::
{:ok, non_neg_integer()} | {:error, reason :: :invalid_tag | :not_found | term()} {:ok, non_neg_integer()} | {:error, reason :: :invalid_tag | :not_found | term()}
def fetch_block_number_by_tag(tag, json_rpc_named_arguments) when tag in ~w(earliest latest pending) do def fetch_block_number_by_tag(tag, json_rpc_named_arguments) when tag in ~w(earliest latest pending) do
tag %{id: 0, tag: tag}
|> get_block_by_tag_request() |> Block.ByTag.request()
|> json_rpc(json_rpc_named_arguments) |> json_rpc(json_rpc_named_arguments)
|> handle_get_block_by_tag() |> Block.ByTag.number_from_result()
end end
@doc """ @doc """
@ -312,6 +274,7 @@ defmodule EthereumJSONRPC do
@doc """ @doc """
Assigns an id to each set of params in `params_list` for batch request-response correlation Assigns an id to each set of params in `params_list` for batch request-response correlation
""" """
@spec id_to_params([params]) :: %{id => params} when id: non_neg_integer(), params: map()
def id_to_params(params_list) do def id_to_params(params_list) do
params_list params_list
|> Stream.with_index() |> Stream.with_index()
@ -420,162 +383,15 @@ defmodule EthereumJSONRPC do
|> Timex.from_unix() |> Timex.from_unix()
end end
defp get_balance_requests(id_to_params) when is_map(id_to_params) do defp fetch_blocks_by_params(params, request, json_rpc_named_arguments)
Enum.map(id_to_params, fn {id, %{block_quantity: block_quantity, hash_data: hash_data}} -> when is_list(params) and is_function(request, 1) do
get_balance_request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) id_to_params = id_to_params(params)
end)
end
defp get_balance_request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) do
request(%{id: id, method: "eth_getBalance", params: [hash_data, block_quantity]})
end
defp get_balance_responses_to_balances_params(responses, id_to_params)
when is_list(responses) and is_map(id_to_params) do
{status, reversed} =
responses
|> Enum.map(&get_balance_responses_to_balance_params(&1, id_to_params))
|> Enum.reduce(
{:ok, []},
fn
{:ok, address_params}, {:ok, address_params_list} ->
{:ok, [address_params | address_params_list]}
{:ok, _}, {:error, _} = acc_error ->
acc_error
{:error, reason}, {:ok, _} ->
{:error, [reason]}
{:error, reason}, {:error, acc_reason} ->
{:error, [reason | acc_reason]}
end
)
{status, Enum.reverse(reversed)}
end
defp get_balance_responses_to_balance_params(%{id: id, result: fetched_balance_quantity}, id_to_params)
when is_map(id_to_params) do
%{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id)
{:ok,
%{
value: quantity_to_integer(fetched_balance_quantity),
block_number: quantity_to_integer(block_quantity),
address_hash: hash_data
}}
end
defp get_balance_responses_to_balance_params(%{id: id, error: error}, id_to_params)
when is_map(id_to_params) do
%{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id)
annotated_error = Map.put(error, :data, %{"blockNumber" => block_quantity, "hash" => hash_data})
{:error, annotated_error}
end
defp get_block_by_hash_requests(id_to_params) do
Enum.map(id_to_params, fn {id, %{hash: hash}} ->
get_block_by_hash_request(%{id: id, hash: hash, transactions: :full})
end)
end
defp get_block_by_hash_request(%{id: id} = options) do
request(%{id: id, method: "eth_getBlockByHash", params: get_block_by_hash_params(options)})
end
defp get_block_by_hash_params(%{hash: hash} = options) do with {:ok, responses} <-
[hash, get_block_transactions(options)] id_to_params
end |> Blocks.requests(request)
|> json_rpc(json_rpc_named_arguments) do
defp get_block_by_number_requests(id_to_params) do {:ok, Blocks.from_responses(responses, id_to_params)}
Enum.map(id_to_params, fn {id, %{number: number}} ->
get_block_by_number_request(%{id: id, quantity: number, transactions: :full})
end)
end
defp get_block_by_number_request(%{id: id} = options) do
request(%{id: id, method: "eth_getBlockByNumber", params: get_block_by_number_params(options)})
end
defp get_block_by_tag_request(tag) do
# eth_getBlockByNumber accepts either a number OR a tag
get_block_by_number_request(%{id: 0, tag: tag, transactions: :hashes})
end
defp get_block_by_number_params(options) do
[get_block_by_number_subject(options), get_block_transactions(options)]
end
defp get_block_by_number_subject(options) do
case {Map.fetch(options, :quantity), Map.fetch(options, :tag)} do
{{:ok, integer}, :error} when is_integer(integer) ->
integer_to_quantity(integer)
{:error, {:ok, tag}} ->
tag
end
end
defp get_block_transactions(%{transactions: transactions}) do
case transactions do
:full -> true
:hashes -> false
end
end
defp handle_get_blocks({:ok, results}, id_to_params) when is_list(results) do
with {:ok, next, blocks} <- reduce_results(results, id_to_params) do
elixir_blocks = Blocks.to_elixir(blocks)
elixir_uncles = Blocks.elixir_to_uncles(elixir_blocks)
elixir_transactions = Blocks.elixir_to_transactions(elixir_blocks)
block_second_degree_relations_params = Uncles.elixir_to_params(elixir_uncles)
transactions_params = Transactions.elixir_to_params(elixir_transactions)
blocks_params = Blocks.elixir_to_params(elixir_blocks)
{:ok, next,
%{
blocks: blocks_params,
block_second_degree_relations: block_second_degree_relations_params,
transactions: transactions_params
}}
end
end
defp handle_get_blocks({:error, _} = error, _id_to_params), do: error
defp reduce_results(results, id_to_params) do
Enum.reduce(results, {:ok, :more, []}, &reduce_result(&1, &2, id_to_params))
end
defp reduce_result(%{result: nil}, {:ok, _, blocks}, _id_to_params), do: {:ok, :end_of_chain, blocks}
defp reduce_result(%{result: %{} = block}, {:ok, next, blocks}, _id_to_params), do: {:ok, next, [block | blocks]}
defp reduce_result(%{result: _}, {:error, _} = error, _id_to_params), do: error
defp reduce_result(%{error: reason, id: id}, acc, id_to_params) do
data = Map.fetch!(id_to_params, id)
annotated_reason = Map.put(reason, :data, data)
case acc do
{:ok, _, _} -> {:error, [annotated_reason]}
{:error, reasons} -> {:error, [annotated_reason | reasons]}
end end
end end
defp handle_get_block_by_tag({:ok, %{"number" => nil}}), do: {:error, :not_found}
defp handle_get_block_by_tag({:ok, %{"number" => quantity}}) when is_binary(quantity) do
{:ok, quantity_to_integer(quantity)}
end
# https://github.com/paritytech/parity-ethereum/pull/8281 fixed
# https://github.com/paritytech/parity-ethereum/issues/8028
defp handle_get_block_by_tag({:ok, nil}), do: {:error, :not_found}
defp handle_get_block_by_tag({:error, %{"code" => -32602}}), do: {:error, :invalid_tag}
defp handle_get_block_by_tag({:error, _} = error), do: error
end end

@ -68,6 +68,25 @@ defmodule EthereumJSONRPC.Block do
""" """
@type t :: %{String.t() => EthereumJSONRPC.data() | EthereumJSONRPC.hash() | EthereumJSONRPC.quantity() | nil} @type t :: %{String.t() => EthereumJSONRPC.data() | EthereumJSONRPC.hash() | EthereumJSONRPC.quantity() | nil}
def from_response(%{id: id, result: nil}, id_to_params) when is_map(id_to_params) do
params = Map.fetch!(id_to_params, id)
{:error, %{code: 404, message: "Not Found", data: params}}
end
def from_response(%{id: id, result: block}, id_to_params) when is_map(id_to_params) do
true = Map.has_key?(id_to_params, id)
{:ok, block}
end
def from_response(%{id: id, error: error}, id_to_params) when is_map(id_to_params) do
params = Map.fetch!(id_to_params, id)
annotated_error = Map.put(error, :data, params)
{:error, annotated_error}
end
@doc """ @doc """
Converts `t:elixir/0` format to params used in `Explorer.Chain`. Converts `t:elixir/0` format to params used in `Explorer.Chain`.

@ -0,0 +1,11 @@
defmodule EthereumJSONRPC.Block.ByHash do
@moduledoc """
Block format as returned by [`eth_getBlockByHash`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash)
"""
@include_transactions true
def request(%{id: id, hash: hash}) do
EthereumJSONRPC.request(%{id: id, method: "eth_getBlockByHash", params: [hash, @include_transactions]})
end
end

@ -0,0 +1,11 @@
defmodule EthereumJSONRPC.Block.ByNumber do
@moduledoc """
Block format as returned by [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash)
"""
import EthereumJSONRPC, only: [integer_to_quantity: 1]
def request(%{id: id, number: number}) do
EthereumJSONRPC.request(%{id: id, method: "eth_getBlockByNumber", params: [integer_to_quantity(number), true]})
end
end

@ -0,0 +1,25 @@
defmodule EthereumJSONRPC.Block.ByTag do
@moduledoc """
Block format returned by [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash)
when used with a semantic tag name instead of a number.
"""
import EthereumJSONRPC, only: [quantity_to_integer: 1]
def request(%{id: id, tag: tag}) when is_binary(tag) do
EthereumJSONRPC.request(%{id: id, method: "eth_getBlockByNumber", params: [tag, false]})
end
def number_from_result({:ok, %{"number" => nil}}), do: {:error, :not_found}
def number_from_result({:ok, %{"number" => quantity}}) when is_binary(quantity) do
{:ok, quantity_to_integer(quantity)}
end
# https://github.com/paritytech/parity-ethereum/pull/8281 fixed
# https://github.com/paritytech/parity-ethereum/issues/8028
def number_from_result({:ok, nil}), do: {:error, :not_found}
def number_from_result({:error, %{"code" => -32602}}), do: {:error, :invalid_tag}
def number_from_result({:error, _} = error), do: error
end

@ -4,11 +4,59 @@ defmodule EthereumJSONRPC.Blocks do
and [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbynumber) from batch requests. and [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbynumber) from batch requests.
""" """
alias EthereumJSONRPC.{Block, Transactions, Uncles} alias EthereumJSONRPC.{Block, Transactions, Transport, Uncles}
@type elixir :: [Block.elixir()] @type elixir :: [Block.elixir()]
@type params :: [Block.params()] @type params :: [Block.params()]
@type t :: [Block.t()] @type t :: %__MODULE__{
blocks_params: [map()],
block_second_degree_relations_params: [map()],
transactions_params: [map()],
errors: [Transport.error()]
}
defstruct blocks_params: [],
block_second_degree_relations_params: [],
transactions_params: [],
errors: []
def requests(id_to_params, request) when is_map(id_to_params) and is_function(request, 1) do
Enum.map(id_to_params, fn {id, params} ->
params
|> Map.put(:id, id)
|> request.()
end)
end
@spec from_responses(list(), map()) :: t()
def from_responses(responses, id_to_params) when is_list(responses) and is_map(id_to_params) do
%{errors: errors, blocks: blocks} =
responses
|> Enum.map(&Block.from_response(&1, id_to_params))
|> Enum.reduce(%{errors: [], blocks: []}, fn
{:ok, block}, %{blocks: blocks} = acc ->
%{acc | blocks: [block | blocks]}
{:error, error}, %{errors: errors} = acc ->
%{acc | errors: [error | errors]}
end)
elixir_blocks = to_elixir(blocks)
elixir_uncles = elixir_to_uncles(elixir_blocks)
elixir_transactions = elixir_to_transactions(elixir_blocks)
block_second_degree_relations_params = Uncles.elixir_to_params(elixir_uncles)
transactions_params = Transactions.elixir_to_params(elixir_transactions)
blocks_params = elixir_to_params(elixir_blocks)
%__MODULE__{
errors: errors,
blocks_params: blocks_params,
block_second_degree_relations_params: block_second_degree_relations_params,
transactions_params: transactions_params
}
end
@doc """ @doc """
Converts `t:elixir/0` elements to params used by `Explorer.Chain.Block.changeset/2`. Converts `t:elixir/0` elements to params used by `Explorer.Chain.Block.changeset/2`.
@ -282,7 +330,7 @@ defmodule EthereumJSONRPC.Blocks do
} }
] ]
""" """
@spec to_elixir(t) :: elixir @spec to_elixir([Block.t()]) :: elixir
def to_elixir(blocks) when is_list(blocks) do def to_elixir(blocks) when is_list(blocks) do
Enum.map(blocks, &Block.to_elixir/1) Enum.map(blocks, &Block.to_elixir/1)
end end

@ -0,0 +1,62 @@
defmodule EthereumJSONRPC.FetchedBalance do
@moduledoc """
A single balance fetched from `eth_getBalance`.
"""
import EthereumJSONRPC, only: [quantity_to_integer: 1]
@type params :: %{address_hash: EthereumJSONRPC.hash(), block_number: non_neg_integer(), value: non_neg_integer()}
@type error :: %{code: integer(), message: String.t(), data: %{block_quantity: String.t(), hash: String.t()}}
@doc """
Converts `response` to balance params or annotated error.
"""
@spec from_response(%{id: id, result: String.t()}, %{id => %{block_quantity: block_quantity, hash_data: hash_data}}) ::
{:ok, params()}
when id: non_neg_integer(), block_quantity: String.t(), hash_data: String.t()
def from_response(%{id: id, result: fetched_balance_quantity}, id_to_params) when is_map(id_to_params) do
%{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id)
{:ok,
%{
address_hash: hash_data,
block_number: quantity_to_integer(block_quantity),
value: quantity_to_integer(fetched_balance_quantity)
}}
end
@spec from_response(
%{
id: id,
error: %{code: code, message: message}
},
%{id => %{block_quantity: block_quantity, hash_data: hash_data}}
) :: {:error, %{code: code, message: message, data: %{block_quantity: block_quantity, hash: hash_data}}}
when id: non_neg_integer(),
code: integer(),
message: String.t(),
block_quantity: String.t(),
hash_data: String.t()
def from_response(%{id: id, error: %{code: code, message: message} = error}, id_to_params)
when is_integer(code) and is_binary(message) and is_map(id_to_params) do
%{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id)
annotated_error = Map.put(error, :data, %{block_quantity: block_quantity, hash_data: hash_data})
{:error, annotated_error}
end
@spec request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) :: %{
jsonrpc: String.t(),
id: id,
method: String.t(),
params: [hash_data | block_quantity]
}
when id: EthereumJSONRPC.request_id(),
block_quantity: EthereumJSONRPC.quantity(),
hash_data: EthereumJSONRPC.hash()
def request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) do
EthereumJSONRPC.request(%{id: id, method: "eth_getBalance", params: [hash_data, block_quantity]})
end
end

@ -0,0 +1,49 @@
defmodule EthereumJSONRPC.FetchedBalances do
@moduledoc """
Balance params and errors from a batch request from `eth_getBalance`.
"""
alias EthereumJSONRPC.FetchedBalance
defstruct params_list: [],
errors: []
@typedoc """
* `params_list` - all the balance params from requests that succeeded in the batch.
* `errors` - all the errors from requests that failed in the batch.
"""
@type t :: %__MODULE__{params_list: [FetchedBalance.params()], errors: [FetchedBalance.error()]}
@doc """
Converts `responses` to `t/0`.
"""
def from_responses(responses, id_to_params) do
responses
|> Enum.map(&FetchedBalance.from_response(&1, id_to_params))
|> Enum.reduce(
%__MODULE__{},
fn
{:ok, params}, %__MODULE__{params_list: params_list} = acc ->
%__MODULE__{acc | params_list: [params | params_list]}
{:error, reason}, %__MODULE__{errors: errors} = acc ->
%__MODULE__{acc | errors: [reason | errors]}
end
)
end
@doc """
`eth_getBalance` requests for `id_to_params`.
"""
@spec requests(%{id => %{block_quantity: block_quantity, hash_data: hash_data}}) :: [
%{jsonrpc: String.t(), id: id, method: String.t(), params: [hash_data | block_quantity]}
]
when id: EthereumJSONRPC.request_id(),
block_quantity: EthereumJSONRPC.quantity(),
hash_data: EthereumJSONRPC.hash()
def requests(id_to_params) when is_map(id_to_params) do
Enum.map(id_to_params, fn {id, %{block_quantity: block_quantity, hash_data: hash_data}} ->
FetchedBalance.request(%{id: id, block_quantity: block_quantity, hash_data: hash_data})
end)
end
end

@ -0,0 +1,16 @@
defmodule EthereumJSONRPC.FetchedBeneficiaries do
@moduledoc """
Balance params and errors from a batch request to fetch beneficiaries.
"""
alias EthereumJSONRPC.FetchedBeneficiary
defstruct params_set: MapSet.new(),
errors: []
@typedoc """
* `params_set` - all the balance request params from requests that succeeded in the batch.
* `errors` - all the errors from requests that failed in the batch.
"""
@type t :: %__MODULE__{params_set: MapSet.t(FetchedBeneficiary.params()), errors: [FetchedBeneficiary.error()]}
end

@ -0,0 +1,8 @@
defmodule EthereumJSONRPC.FetchedBeneficiary do
@moduledoc """
A single balance request params for the beneficiary of a block.
"""
@type params :: %{address_hash: EthereumJSONRPC.hash(), block_number: non_neg_integer()}
@type error :: %{code: integer(), message: String.t(), data: %{block_number: non_neg_integer()}}
end

@ -3,35 +3,26 @@ defmodule EthereumJSONRPC.Parity do
Ethereum JSONRPC methods that are only supported by [Parity](https://wiki.parity.io/). Ethereum JSONRPC methods that are only supported by [Parity](https://wiki.parity.io/).
""" """
import EthereumJSONRPC, only: [id_to_params: 1, json_rpc: 2, request: 1] import EthereumJSONRPC, only: [id_to_params: 1, integer_to_quantity: 1, json_rpc: 2, request: 1]
alias EthereumJSONRPC.Parity.Traces alias EthereumJSONRPC.Parity.{FetchedBeneficiaries, Traces}
alias EthereumJSONRPC.{Transaction, Transactions} alias EthereumJSONRPC.{Transaction, Transactions}
@behaviour EthereumJSONRPC.Variant @behaviour EthereumJSONRPC.Variant
@impl EthereumJSONRPC.Variant @impl EthereumJSONRPC.Variant
def fetch_beneficiaries(block_range, json_rpc_named_arguments) do def fetch_beneficiaries(_.._ = block_range, json_rpc_named_arguments) when is_list(json_rpc_named_arguments) do
Enum.reduce( id_to_params =
Enum.with_index(block_range), block_range
{:ok, MapSet.new()}, |> block_range_to_params_list()
fn |> id_to_params()
{block_number, index}, {:ok, beneficiaries} ->
quantity = EthereumJSONRPC.integer_to_quantity(block_number)
case trace_block(index, quantity, json_rpc_named_arguments) do
{:ok, traces} when is_list(traces) ->
new_beneficiaries = extract_beneficiaries(traces)
{:ok, MapSet.union(new_beneficiaries, beneficiaries)}
_ ->
{:error, "Error fetching block reward contract beneficiaries"}
end
_, {:error, _} = error -> with {:ok, responses} <-
error id_to_params
|> FetchedBeneficiaries.requests()
|> json_rpc(json_rpc_named_arguments) do
{:ok, FetchedBeneficiaries.from_responses(responses, id_to_params)}
end end
)
end end
@doc """ @doc """
@ -72,25 +63,8 @@ defmodule EthereumJSONRPC.Parity do
end end
end end
defp extract_beneficiaries(traces) when is_list(traces) do defp block_range_to_params_list(_.._ = block_range) do
Enum.reduce(traces, MapSet.new(), fn Enum.map(block_range, &%{block_quantity: integer_to_quantity(&1)})
%{"type" => "reward", "blockNumber" => block_number, "action" => %{"author" => author}}, beneficiaries ->
beneficiary = %{
block_number: block_number,
address_hash: author
}
MapSet.put(beneficiaries, beneficiary)
_, beneficiaries ->
beneficiaries
end)
end
defp trace_block(index, quantity, json_rpc_named_arguments) do
%{id: index, method: "trace_block", params: [quantity]}
|> request()
|> json_rpc(json_rpc_named_arguments)
end end
defp trace_replay_transaction_responses_to_internal_transactions_params(responses, id_to_params) defp trace_replay_transaction_responses_to_internal_transactions_params(responses, id_to_params)

@ -0,0 +1,94 @@
defmodule EthereumJSONRPC.Parity.FetchedBeneficiaries do
@moduledoc """
Beneficiaries and errors from batch requests to `trace_block`.
"""
import EthereumJSONRPC, only: [quantity_to_integer: 1]
@doc """
Converts `responses` to `t/0`.
"""
def from_responses(responses, id_to_params) when is_list(responses) and is_map(id_to_params) do
responses
|> Enum.map(&response_to_params_set(&1, id_to_params))
|> Enum.reduce(
%EthereumJSONRPC.FetchedBeneficiaries{},
fn
{:ok, params_set}, %EthereumJSONRPC.FetchedBeneficiaries{params_set: acc_params_set} = acc ->
%EthereumJSONRPC.FetchedBeneficiaries{acc | params_set: MapSet.union(acc_params_set, params_set)}
{:error, reason}, %EthereumJSONRPC.FetchedBeneficiaries{errors: errors} = acc ->
%EthereumJSONRPC.FetchedBeneficiaries{acc | errors: [reason | errors]}
end
)
end
@doc """
`trace_block` requests for `id_to_params`.
"""
def requests(id_to_params) when is_map(id_to_params) do
Enum.map(id_to_params, fn {id, %{block_quantity: block_quantity}} ->
request(%{id: id, block_quantity: block_quantity})
end)
end
@spec response_to_params_set(%{id: id, result: nil}, %{id => %{block_quantity: block_quantity}}) ::
{:error, %{code: 404, message: String.t(), data: %{block_quantity: block_quantity}}}
when id: non_neg_integer(), block_quantity: String.t()
defp response_to_params_set(%{id: id, result: nil}, id_to_params) when is_map(id_to_params) do
%{block_quantity: block_quantity} = Map.fetch!(id_to_params, id)
{:error, %{code: 404, message: "Not Found", data: %{block_quantity: block_quantity}}}
end
@spec response_to_params_set(%{id: id, result: list(map())}, %{id => %{block_quantity: block_quantity}}) ::
{:ok, MapSet.t(EthereumJSONRPC.FetchedBeneficiary.params())}
when id: non_neg_integer(), block_quantity: String.t()
defp response_to_params_set(%{id: id, result: traces}, id_to_params) when is_list(traces) and is_map(id_to_params) do
%{block_quantity: block_quantity} = Map.fetch!(id_to_params, id)
block_number = quantity_to_integer(block_quantity)
params_set = traces_to_params_set(traces, block_number)
{:ok, params_set}
end
@spec response_to_params_set(%{id: id, error: %{code: code, message: message}}, %{
id => %{block_quantity: block_quantity}
}) :: {:error, %{code: code, message: message, data: %{block_quantity: block_quantity}}}
when id: non_neg_integer(), code: integer(), message: String.t(), block_quantity: String.t()
defp response_to_params_set(%{id: id, error: error}, id_to_params) when is_map(id_to_params) do
%{block_quantity: block_quantity} = Map.fetch!(id_to_params, id)
annotated_error = Map.put(error, :data, %{block_quantity: block_quantity})
{:error, annotated_error}
end
defp request(%{id: id, block_quantity: block_quantity}) when is_integer(id) and is_binary(block_quantity) do
EthereumJSONRPC.request(%{id: id, method: "trace_block", params: [block_quantity]})
end
defp traces_to_params_set(traces, block_number) when is_list(traces) and is_integer(block_number) do
Enum.reduce(traces, MapSet.new(), fn trace, acc ->
MapSet.union(acc, trace_to_params_set(trace, block_number))
end)
end
defp trace_to_params_set(%{"action" => %{"callType" => _}, "blockNumber" => block_number}, block_number),
do: MapSet.new()
defp trace_to_params_set(%{"type" => type, "blockNumber" => block_number}, block_number)
when type in ~w(create suicide),
do: MapSet.new()
defp trace_to_params_set(
%{
"action" => %{"rewardType" => reward_type, "author" => address_hash_data},
"blockNumber" => block_number
},
block_number
)
when is_integer(block_number) and reward_type in ~w(block external uncle) do
MapSet.new([%{address_hash: address_hash_data, block_number: block_number}])
end
end

@ -4,7 +4,7 @@ defmodule EthereumJSONRPC.Variant do
Ethereum JSONRPC API. The variant callbacks abstract over this difference. Ethereum JSONRPC API. The variant callbacks abstract over this difference.
""" """
alias EthereumJSONRPC.Transaction alias EthereumJSONRPC.{FetchedBeneficiaries, Transaction}
@typedoc """ @typedoc """
A module that implements the `EthereumJSONRPC.Variant` behaviour callbacks. A module that implements the `EthereumJSONRPC.Variant` behaviour callbacks.
@ -22,12 +22,12 @@ defmodule EthereumJSONRPC.Variant do
## Returns ## Returns
* `{:ok, #MapSet<[%{...}]>}` - beneficiaries were successfully fetched * `{:ok, %EthereumJSONRPC.FetchedBeneficiaries{params_list: [%{address_hash: address_hash, block_number: block_number}], errors: %{code: code, message: message, data: %{block_number: block_number}}}` - some beneficiaries were successfully fetched and some may have had errors.
* `{:error, reason}` - there was one or more errors with `reason` in fetching the beneficiaries * `{:error, reason}` - there was an error at the transport level
* `:ignore` - the variant does not support fetching beneficiaries * `:ignore` - the variant does not support fetching beneficiaries
""" """
@callback fetch_beneficiaries(Range.t(), EthereumJSONRPC.json_rpc_named_arguments()) :: @callback fetch_beneficiaries(Range.t(), EthereumJSONRPC.json_rpc_named_arguments()) ::
{:ok, MapSet.t()} | {:error, reason :: term} | :ignore {:ok, FetchedBeneficiaries.t()} | {:error, reason :: term} | :ignore
@doc """ @doc """
Fetches the `t:Explorer.Chain.InternalTransaction.changeset/2` params from the variant of the Ethereum JSONRPC API. Fetches the `t:Explorer.Chain.InternalTransaction.changeset/2` params from the variant of the Ethereum JSONRPC API.

@ -5,6 +5,8 @@ defmodule EthereumJSONRPC.ParityTest do
import EthereumJSONRPC, only: [integer_to_quantity: 1] import EthereumJSONRPC, only: [integer_to_quantity: 1]
import Mox import Mox
alias EthereumJSONRPC.FetchedBeneficiaries
setup :verify_on_exit! setup :verify_on_exit!
doctest EthereumJSONRPC.Parity doctest EthereumJSONRPC.Parity
@ -245,9 +247,12 @@ defmodule EthereumJSONRPC.ParityTest do
hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c" hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c"
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
expect(EthereumJSONRPC.Mox, :json_rpc, fn %{params: [^block_quantity]}, _options -> expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, params: [^block_quantity]}], _options ->
{:ok, {:ok,
[ [
%{
id: id,
result: [
%{ %{
"action" => %{ "action" => %{
"author" => hash1, "author" => hash1,
@ -278,20 +283,18 @@ defmodule EthereumJSONRPC.ParityTest do
"transactionPosition" => nil, "transactionPosition" => nil,
"type" => "reward" "type" => "reward"
} }
]
}
]} ]}
end) end)
end end
expected_beneficiaries = assert {:ok, %FetchedBeneficiaries{params_set: params_set}} =
MapSet.new([
%{block_number: block_number, address_hash: hash2},
%{block_number: block_number, address_hash: hash1}
])
{:ok, fetched_beneficiaries} =
EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments)
assert fetched_beneficiaries == expected_beneficiaries assert Enum.count(params_set) == 2
assert %{block_number: block_number, address_hash: hash2} in params_set
assert %{block_number: block_number, address_hash: hash1} in params_set
end end
test "with 'external' 'rewardType'", %{ test "with 'external' 'rewardType'", %{
@ -303,9 +306,12 @@ defmodule EthereumJSONRPC.ParityTest do
hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c" hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c"
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
expect(EthereumJSONRPC.Mox, :json_rpc, fn %{params: [^block_quantity]}, _options -> expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, params: [^block_quantity]}], _options ->
{:ok, {:ok,
[ [
%{
id: id,
result: [
%{ %{
"action" => %{ "action" => %{
"author" => hash1, "author" => hash1,
@ -336,48 +342,33 @@ defmodule EthereumJSONRPC.ParityTest do
"transactionPosition" => nil, "transactionPosition" => nil,
"type" => "reward" "type" => "reward"
} }
]
}
]} ]}
end) end)
end end
expected_beneficiaries = assert {:ok, %FetchedBeneficiaries{params_set: params_set, errors: []}} =
MapSet.new([
%{block_number: block_number, address_hash: hash2},
%{block_number: block_number, address_hash: hash1}
])
{:ok, fetched_beneficiaries} =
EthereumJSONRPC.Parity.fetch_beneficiaries(5_609_295..5_609_295, json_rpc_named_arguments) EthereumJSONRPC.Parity.fetch_beneficiaries(5_609_295..5_609_295, json_rpc_named_arguments)
assert fetched_beneficiaries == expected_beneficiaries assert Enum.count(params_set) == 2
assert %{block_number: block_number, address_hash: hash1} in params_set
assert %{block_number: block_number, address_hash: hash2} in params_set
end end
test "with no rewards, returns {:ok, []}", %{ test "with no rewards, returns {:ok, []}", %{
json_rpc_named_arguments: json_rpc_named_arguments json_rpc_named_arguments: json_rpc_named_arguments
} do } do
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
expect(EthereumJSONRPC.Mox, :json_rpc, fn _json, _options -> expect(EthereumJSONRPC.Mox, :json_rpc, fn requests, _options when is_list(requests) ->
{:ok, []} responses = Enum.map(requests, fn %{id: id} -> %{id: id, result: []} end)
{:ok, responses}
end) end)
{:ok, fetched_beneficiaries} = assert {:ok, %FetchedBeneficiaries{params_set: params_set}} =
EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments)
assert fetched_beneficiaries == MapSet.new() assert Enum.empty?(params_set)
end
end
test "with nil rewards, returns {:error, reason}", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
expect(EthereumJSONRPC.Mox, :json_rpc, fn _json, _options ->
{:ok, nil}
end)
result = EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments)
assert result == {:error, "Error fetching block reward contract beneficiaries"}
end end
end end
@ -390,9 +381,12 @@ defmodule EthereumJSONRPC.ParityTest do
hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c" hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c"
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
expect(EthereumJSONRPC.Mox, :json_rpc, fn %{params: [^block_quantity]}, _options -> expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, params: [^block_quantity]}], _options ->
{:ok, {:ok,
[ [
%{
id: id,
result: [
%{ %{
"action" => %{ "action" => %{
"callType" => "call", "callType" => "call",
@ -441,20 +435,18 @@ defmodule EthereumJSONRPC.ParityTest do
"transactionPosition" => nil, "transactionPosition" => nil,
"type" => "reward" "type" => "reward"
} }
]
}
]} ]}
end) end)
end end
expected_beneficiaries = assert {:ok, %FetchedBeneficiaries{params_set: params_set}} =
MapSet.new([
%{block_number: block_number, address_hash: hash2},
%{block_number: block_number, address_hash: hash1}
])
{:ok, fetched_beneficiaries} =
EthereumJSONRPC.Parity.fetch_beneficiaries(5_077_429..5_077_429, json_rpc_named_arguments) EthereumJSONRPC.Parity.fetch_beneficiaries(5_077_429..5_077_429, json_rpc_named_arguments)
assert fetched_beneficiaries == expected_beneficiaries assert Enum.count(params_set) == 2
assert %{block_number: block_number, address_hash: hash2} in params_set
assert %{block_number: block_number, address_hash: hash1} in params_set
end end
test "with multiple blocks with repeat beneficiaries", %{ test "with multiple blocks with repeat beneficiaries", %{
@ -469,10 +461,13 @@ defmodule EthereumJSONRPC.ParityTest do
hash3 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c" hash3 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c"
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
expect(EthereumJSONRPC.Mox, :json_rpc, 2, fn expect(EthereumJSONRPC.Mox, :json_rpc, fn requests, _options when is_list(requests) ->
%{params: [^block_quantity1]} = _json, _options -> responses =
{:ok, Enum.map(requests, fn
[ %{id: id, params: [^block_quantity1]} ->
%{
id: id,
result: [
%{ %{
"action" => %{ "action" => %{
"author" => hash1, "author" => hash1,
@ -501,11 +496,13 @@ defmodule EthereumJSONRPC.ParityTest do
"transactionPosition" => nil, "transactionPosition" => nil,
"type" => "reward" "type" => "reward"
} }
]} ]
}
%{params: [^block_quantity2]} = _json, _options -> %{id: id, params: [^block_quantity2]} ->
{:ok, %{
[ id: id,
result: [
%{ %{
"action" => %{ "action" => %{
"author" => hash2, "author" => hash2,
@ -534,22 +531,22 @@ defmodule EthereumJSONRPC.ParityTest do
"transactionPosition" => nil, "transactionPosition" => nil,
"type" => "reward" "type" => "reward"
} }
]} ]
}
end) end)
end
expected_beneficiaries = {:ok, responses}
MapSet.new([ end)
%{block_number: block_number1, address_hash: hash3}, end
%{block_number: block_number2, address_hash: hash3},
%{block_number: block_number2, address_hash: hash2},
%{block_number: block_number1, address_hash: hash1}
])
{:ok, fetched_beneficiaries} = assert {:ok, %FetchedBeneficiaries{params_set: params_set}} =
EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_886..5_080_887, json_rpc_named_arguments) EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_886..5_080_887, json_rpc_named_arguments)
assert fetched_beneficiaries == expected_beneficiaries assert Enum.count(params_set) == 4
assert %{block_number: block_number1, address_hash: hash3} in params_set
assert %{block_number: block_number2, address_hash: hash3} in params_set
assert %{block_number: block_number2, address_hash: hash2} in params_set
assert %{block_number: block_number1, address_hash: hash1} in params_set
end end
test "with error, returns {:error, reason}", %{ test "with error, returns {:error, reason}", %{
@ -560,9 +557,8 @@ defmodule EthereumJSONRPC.ParityTest do
{:error, "oops"} {:error, "oops"}
end) end)
result = EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) assert {:error, "oops"} =
EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments)
assert result == {:error, "Error fetching block reward contract beneficiaries"}
end end
end end
end end

@ -4,7 +4,7 @@ defmodule EthereumJSONRPCTest do
import EthereumJSONRPC.Case import EthereumJSONRPC.Case
import Mox import Mox
alias EthereumJSONRPC.Subscription alias EthereumJSONRPC.{Blocks, FetchedBalances, FetchedBeneficiaries, Subscription}
alias EthereumJSONRPC.WebSocket.WebSocketClient alias EthereumJSONRPC.WebSocket.WebSocketClient
setup :verify_on_exit! setup :verify_on_exit!
@ -37,16 +37,18 @@ defmodule EthereumJSONRPCTest do
json_rpc_named_arguments json_rpc_named_arguments
) == ) ==
{:ok, {:ok,
[ %FetchedBalances{
params_list: [
%{ %{
address_hash: hash, address_hash: hash,
block_number: 1, block_number: 1,
value: expected_fetched_balance value: expected_fetched_balance
} }
]} ]
}}
end end
test "with all invalid hash_data returns {:error, reasons}", %{json_rpc_named_arguments: json_rpc_named_arguments} do test "with all invalid hash_data returns errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do
variant = Keyword.fetch!(json_rpc_named_arguments, :variant) variant = Keyword.fetch!(json_rpc_named_arguments, :variant)
expected_message = expected_message =
@ -76,18 +78,21 @@ defmodule EthereumJSONRPCTest do
end) end)
end end
assert {:error, assert {:ok,
[ %FetchedBalances{
errors: [
%{ %{
code: -32602, code: -32602,
data: %{"blockNumber" => "0x1", "hash" => "0x0"}, data: %{hash_data: "0x0", block_quantity: "0x1"},
message: ^expected_message message: ^expected_message
} }
]} = ],
params_list: []
}} =
EthereumJSONRPC.fetch_balances([%{block_quantity: "0x1", hash_data: "0x0"}], json_rpc_named_arguments) EthereumJSONRPC.fetch_balances([%{block_quantity: "0x1", hash_data: "0x0"}], json_rpc_named_arguments)
end end
test "with a mix of valid and invalid hash_data returns {:error, reasons}", %{ test "with a mix of valid and invalid hash_data returns both", %{
json_rpc_named_arguments: json_rpc_named_arguments json_rpc_named_arguments: json_rpc_named_arguments
} do } do
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
@ -128,7 +133,7 @@ defmodule EthereumJSONRPCTest do
end) end)
end end
assert {:error, reasons} = assert {:ok, %FetchedBalances{params_list: params_list, errors: errors}} =
EthereumJSONRPC.fetch_balances( EthereumJSONRPC.fetch_balances(
[ [
# start with :ok # start with :ok
@ -160,8 +165,11 @@ defmodule EthereumJSONRPCTest do
json_rpc_named_arguments json_rpc_named_arguments
) )
assert is_list(reasons) assert is_list(params_list)
assert length(reasons) > 1 assert length(params_list) > 1
assert is_list(errors)
assert length(errors) > 1
end end
end end
@ -173,7 +181,8 @@ defmodule EthereumJSONRPCTest do
{:ok, []} {:ok, []}
end) end)
assert EthereumJSONRPC.fetch_beneficiaries(1..1, json_rpc_named_arguments) == {:ok, MapSet.new()} assert EthereumJSONRPC.fetch_beneficiaries(1..1, json_rpc_named_arguments) ==
{:ok, %FetchedBeneficiaries{params_set: MapSet.new(), errors: []}}
end end
end end
end end
@ -196,12 +205,13 @@ defmodule EthereumJSONRPCTest do
end end
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
expect(EthereumJSONRPC.Mox, :json_rpc, fn _json, _options -> expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id}], _options ->
block_number = "0x0" block_number = "0x0"
{:ok, {:ok,
[ [
%{ %{
id: id,
result: %{ result: %{
"difficulty" => "0x0", "difficulty" => "0x0",
"gasLimit" => "0x0", "gasLimit" => "0x0",
@ -244,7 +254,7 @@ defmodule EthereumJSONRPCTest do
end) end)
end end
assert {:ok, %{blocks: [_ | _], transactions: [_ | _]}} = assert {:ok, %Blocks{blocks_params: [_ | _], transactions_params: [_ | _]}} =
EthereumJSONRPC.fetch_blocks_by_hash([block_hash], json_rpc_named_arguments) EthereumJSONRPC.fetch_blocks_by_hash([block_hash], json_rpc_named_arguments)
end end
@ -265,8 +275,18 @@ defmodule EthereumJSONRPCTest do
end) end)
end end
assert {:error, [%{data: %{hash: "0x0"}}]} = hash = "0x0"
EthereumJSONRPC.fetch_blocks_by_hash(["0x0"], json_rpc_named_arguments)
assert {:ok,
%Blocks{
errors: [
%{
data: %{
hash: ^hash
}
}
]
}} = EthereumJSONRPC.fetch_blocks_by_hash([hash], json_rpc_named_arguments)
end end
test "full batch errors are returned", %{json_rpc_named_arguments: json_rpc_named_arguments} do test "full batch errors are returned", %{json_rpc_named_arguments: json_rpc_named_arguments} do
@ -309,15 +329,29 @@ defmodule EthereumJSONRPCTest do
end) end)
end end
assert {:error, assert {:ok,
[%{data: %{number: 1_000_000_000_000_000_000_001}}, %{data: %{number: 1_000_000_000_000_000_000_000}}]} = %EthereumJSONRPC.Blocks{
block_second_degree_relations_params: [],
blocks_params: [],
errors: [
%{
data: %{number: 1_000_000_000_000_000_000_001}
},
%{
data: %{number: 1_000_000_000_000_000_000_000}
}
],
transactions_params: []
}} =
EthereumJSONRPC.fetch_blocks_by_range( EthereumJSONRPC.fetch_blocks_by_range(
1_000_000_000_000_000_000_000..1_000_000_000_000_000_000_001, 1_000_000_000_000_000_000_000..1_000_000_000_000_000_000_001,
json_rpc_named_arguments json_rpc_named_arguments
) )
end end
test "returns only errors if a mix of results and errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do test "returns only errors and results if a mix of results and errors", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
# Can't be faked reliably on real chain # Can't be faked reliably on real chain
moxed_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox) moxed_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox)
@ -336,30 +370,71 @@ defmodule EthereumJSONRPCTest do
id: 1, id: 1,
result: %{ result: %{
"difficulty" => "0x0", "difficulty" => "0x0",
"extraData" => "0x",
"gasLimit" => "0x0", "gasLimit" => "0x0",
"gasUsed" => "0x0", "gasUsed" => "0x0",
"hash" => "0x0", "hash" => "0x0",
"logsBloom" => "0x",
"miner" => "0x0", "miner" => "0x0",
"number" => "0x0", "number" => "0x0",
"parentHash" => "0x0", "parentHash" => "0x0",
"receiptsRoot" => "0x0",
"sha3Uncles" => "0x0",
"size" => "0x0", "size" => "0x0",
"stateRoot" => "0x0",
"timestamp" => "0x0", "timestamp" => "0x0",
"totalDifficulty" => "0x0", "totalDifficulty" => "0x0",
"transactions" => [] "transactions" => [],
"transactionsRoot" => [],
"uncles" => []
}, },
jsonrpc: "2.0" jsonrpc: "2.0"
} }
]} ]}
end) end)
assert {:error, [%{data: %{number: 1_000_000_000_000_000_000_000}}]} = assert {:ok,
%EthereumJSONRPC.Blocks{
block_second_degree_relations_params: [],
blocks_params: [
%{
difficulty: 0,
extra_data: "0x",
gas_limit: 0,
gas_used: 0,
hash: "0x0",
logs_bloom: "0x",
miner_hash: "0x0",
mix_hash: "0x0",
nonce: 0,
number: 0,
parent_hash: "0x0",
receipts_root: "0x0",
sha3_uncles: "0x0",
size: 0,
state_root: "0x0",
timestamp: _,
total_difficulty: 0,
transactions_root: [],
uncles: []
}
],
errors: [
%{
code: -32602,
data: %{number: 1_000_000_000_000_000_000_000},
message: "Invalid params: Invalid block number: number too large to fit in target type."
}
],
transactions_params: []
}} =
EthereumJSONRPC.fetch_blocks_by_range( EthereumJSONRPC.fetch_blocks_by_range(
1_000_000_000_000_000_000_000..1_000_000_000_000_000_000_001, 1_000_000_000_000_000_000_000..1_000_000_000_000_000_000_001,
moxed_json_rpc_named_arguments moxed_json_rpc_named_arguments
) )
end end
test "nil result indicated end-of-chain", %{json_rpc_named_arguments: json_rpc_named_arguments} do test "nil result indicated error code 404", %{json_rpc_named_arguments: json_rpc_named_arguments} do
# Can't be faked reliably on real chain # Can't be faked reliably on real chain
moxed_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox) moxed_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox)
@ -398,8 +473,13 @@ defmodule EthereumJSONRPCTest do
]} ]}
end) end)
assert {:ok, :end_of_chain, %{blocks: [_], transactions: []}} = assert {:ok,
EthereumJSONRPC.fetch_blocks_by_range(0..1, moxed_json_rpc_named_arguments) %EthereumJSONRPC.Blocks{
block_second_degree_relations_params: [],
blocks_params: [%{}],
errors: [%{code: 404, data: %{number: 1}, message: "Not Found"}],
transactions_params: []
}} = EthereumJSONRPC.fetch_blocks_by_range(0..1, moxed_json_rpc_named_arguments)
end end
end end

@ -166,9 +166,11 @@ defmodule Indexer.Block.Catchup.Fetcher do
sequence sequence
) do ) do
case fetch_and_import_range(block_fetcher, range) do case fetch_and_import_range(block_fetcher, range) do
{:ok, {inserted, next}} -> {:ok, %{inserted: inserted, errors: errors}} ->
cap_seq(sequence, next, range) errors = cap_seq(sequence, errors, range)
{:ok, inserted} retry(sequence, errors)
{:ok, inserted: inserted}
{:error, {step, reason}} = error -> {:error, {step, reason}} = error ->
Logger.error(fn -> Logger.error(fn ->
@ -200,19 +202,27 @@ defmodule Indexer.Block.Catchup.Fetcher do
end end
end end
defp cap_seq(seq, next, range) do defp cap_seq(seq, errors, range) do
case next do {not_founds, other_errors} =
:more -> Enum.split_with(errors, fn
%{code: 404, data: %{number: _}} -> true
_ -> false
end)
case not_founds do
[] ->
Logger.debug(fn -> Logger.debug(fn ->
first_block_number..last_block_number = range first_block_number..last_block_number = range
"got blocks #{first_block_number} - #{last_block_number}" "got blocks #{first_block_number} - #{last_block_number}"
end) end)
:end_of_chain -> other_errors
_ ->
Sequence.cap(seq) Sequence.cap(seq)
end end
:ok other_errors
end end
defp push_back(sequence, range) do defp push_back(sequence, range) do
@ -222,6 +232,41 @@ defmodule Indexer.Block.Catchup.Fetcher do
end end
end end
defp retry(sequence, errors) when is_list(errors) do
errors
|> errors_to_ranges()
|> Enum.map(&push_back(sequence, &1))
end
defp errors_to_ranges(errors) when is_list(errors) do
errors
|> Enum.flat_map(&error_to_numbers/1)
|> numbers_to_ranges()
end
defp error_to_numbers(%{data: %{number: number}}) when is_integer(number), do: [number]
defp numbers_to_ranges([]), do: []
defp numbers_to_ranges(numbers) when is_list(numbers) do
numbers
|> Enum.sort()
|> Enum.chunk_while(
nil,
fn
number, nil ->
{:cont, number..number}
number, first..last when number == last + 1 ->
{:cont, first..number}
number, range ->
{:cont, range, number..number}
end,
fn range -> {:cont, range} end
)
end
defp put_memory_monitor(sequence_options, %__MODULE__{memory_monitor: nil}) when is_list(sequence_options), defp put_memory_monitor(sequence_options, %__MODULE__{memory_monitor: nil}) when is_list(sequence_options),
do: sequence_options do: sequence_options

@ -5,6 +5,7 @@ defmodule Indexer.Block.Fetcher do
require Logger require Logger
alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries}
alias Explorer.Chain.{Address, Block, Import} alias Explorer.Chain.{Address, Block, Import}
alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, Token, TokenTransfers} alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, Token, TokenTransfers}
alias Indexer.Address.{CoinBalances, TokenBalances} alias Indexer.Address.{CoinBalances, TokenBalances}
@ -76,7 +77,7 @@ defmodule Indexer.Block.Fetcher do
end end
@spec fetch_and_import_range(t, Range.t()) :: @spec fetch_and_import_range(t, Range.t()) ::
{:ok, {inserted :: %{}, next :: :more | :end_of_chain}} {:ok, %{inserted: %{}, errors: [EthereumJSONRPC.Transport.error()]}}
| {:error, | {:error,
{step :: atom(), reason :: term()} {step :: atom(), reason :: term()}
| [%Ecto.Changeset{}] | [%Ecto.Changeset{}]
@ -90,23 +91,26 @@ defmodule Indexer.Block.Fetcher do
_.._ = range _.._ = range
) )
when callback_module != nil do when callback_module != nil do
with {:blocks, {:ok, next, result}} <- with {:blocks,
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, {:ok,
%{ %Blocks{
blocks: blocks, blocks_params: blocks_params,
transactions: transactions_without_receipts, transactions_params: transactions_params_without_receipts,
block_second_degree_relations: block_second_degree_relations block_second_degree_relations_params: block_second_degree_relations_params,
} = result, errors: blocks_errors
blocks = Transform.transform_blocks(blocks), }}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)}, blocks = Transform.transform_blocks(blocks_params),
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_params_without_receipts)},
%{logs: logs, receipts: receipts} = receipt_params, %{logs: logs, receipts: receipts} = receipt_params,
transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts), transactions_with_receipts = Receipts.put(transactions_params_without_receipts, receipts),
%{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.parse(logs), %{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.parse(logs),
%{mint_transfers: mint_transfers} = MintTransfer.parse(logs), %{mint_transfers: mint_transfers} = MintTransfer.parse(logs),
{:beneficiaries, {:ok, beneficiaries}} <- fetch_beneficiaries(range, json_rpc_named_arguments), {:beneficiaries,
{:ok, %FetchedBeneficiaries{params_set: beneficiary_params_set, errors: beneficiaries_errors}}} <-
fetch_beneficiaries(range, json_rpc_named_arguments),
addresses = addresses =
AddressExtraction.extract_addresses(%{ AddressExtraction.extract_addresses(%{
block_reward_contract_beneficiaries: MapSet.to_list(beneficiaries), block_reward_contract_beneficiaries: MapSet.to_list(beneficiary_params_set),
blocks: blocks, blocks: blocks,
logs: logs, logs: logs,
mint_transfers: mint_transfers, mint_transfers: mint_transfers,
@ -120,7 +124,7 @@ defmodule Indexer.Block.Fetcher do
transactions_params: transactions_with_receipts transactions_params: transactions_with_receipts
} }
|> CoinBalances.params_set() |> CoinBalances.params_set()
|> MapSet.union(beneficiaries), |> MapSet.union(beneficiary_params_set),
address_token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}), address_token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}),
{:ok, inserted} <- {:ok, inserted} <-
__MODULE__.import( __MODULE__.import(
@ -130,14 +134,14 @@ defmodule Indexer.Block.Fetcher do
address_coin_balances: %{params: coin_balances_params_set}, address_coin_balances: %{params: coin_balances_params_set},
address_token_balances: %{params: address_token_balances}, address_token_balances: %{params: address_token_balances},
blocks: %{params: blocks}, blocks: %{params: blocks},
block_second_degree_relations: %{params: block_second_degree_relations}, block_second_degree_relations: %{params: block_second_degree_relations_params},
logs: %{params: logs}, logs: %{params: logs},
token_transfers: %{params: token_transfers}, token_transfers: %{params: token_transfers},
tokens: %{on_conflict: :nothing, params: tokens}, tokens: %{on_conflict: :nothing, params: tokens},
transactions: %{params: transactions_with_receipts} transactions: %{params: transactions_with_receipts}
} }
) do ) do
{:ok, {inserted, next}} {:ok, %{inserted: inserted, errors: blocks_errors ++ beneficiaries_errors}}
else else
{step, {:error, reason}} -> {:error, {step, reason}} {step, {:error, reason}} -> {:error, {step, reason}}
{:error, :timeout} = error -> error {:error, :timeout} = error -> error
@ -200,9 +204,8 @@ defmodule Indexer.Block.Fetcher do
defp fetch_beneficiaries(range, json_rpc_named_arguments) do defp fetch_beneficiaries(range, json_rpc_named_arguments) do
result = result =
case EthereumJSONRPC.fetch_beneficiaries(range, json_rpc_named_arguments) do with :ignore <- EthereumJSONRPC.fetch_beneficiaries(range, json_rpc_named_arguments) do
:ignore -> {:ok, MapSet.new()} {:ok, %FetchedBeneficiaries{params_set: MapSet.new()}}
result -> result
end end
{:beneficiaries, result} {:beneficiaries, result}

@ -11,7 +11,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
import Indexer.Block.Fetcher, only: [async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2] import Indexer.Block.Fetcher, only: [async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2]
alias Ecto.Changeset alias Ecto.Changeset
alias EthereumJSONRPC.Subscription alias EthereumJSONRPC.{FetchedBalances, Subscription}
alias Explorer.Chain alias Explorer.Chain
alias Indexer.{AddressExtraction, Block, TokenBalances} alias Indexer.{AddressExtraction, Block, TokenBalances}
alias Indexer.Block.Realtime.TaskSupervisor alias Indexer.Block.Realtime.TaskSupervisor
@ -127,11 +127,22 @@ defmodule Indexer.Block.Realtime.Fetcher do
def fetch_and_import_block(block_number_to_fetch, block_fetcher, retry \\ 3) do def fetch_and_import_block(block_number_to_fetch, block_fetcher, retry \\ 3) do
case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do
{:ok, {_inserted, _next}} -> {:ok, %{inserted: _, errors: []}} ->
Logger.debug(fn -> Logger.debug(fn ->
["realtime indexer fetched and imported block ", to_string(block_number_to_fetch)] ["realtime indexer fetched and imported block ", to_string(block_number_to_fetch)]
end) end)
{:ok, %{inserted: _, errors: [_ | _] = errors}} ->
Logger.error(fn ->
[
"realtime indexer failed to fetch block",
to_string(block_number_to_fetch),
": ",
inspect(errors),
". Block will be retried by catchup indexer."
]
end)
{:error, {step, reason}} -> {:error, {step, reason}} ->
Logger.error(fn -> Logger.error(fn ->
[ [
@ -247,20 +258,27 @@ defmodule Indexer.Block.Realtime.Fetcher do
%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}, %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%{addresses_params: addresses_params} = options %{addresses_params: addresses_params} = options
) do ) do
with {:ok, fetched_balances_params} <- case options
options
|> fetch_balances_params_list() |> fetch_balances_params_list()
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) do |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) do
{:ok, %FetchedBalances{params_list: params_list, errors: []}} ->
merged_addresses_params = merged_addresses_params =
%{address_coin_balances: fetched_balances_params} %{address_coin_balances: params_list}
|> AddressExtraction.extract_addresses() |> AddressExtraction.extract_addresses()
|> Kernel.++(addresses_params) |> Kernel.++(addresses_params)
|> AddressExtraction.merge_addresses() |> AddressExtraction.merge_addresses()
value_fetched_at = DateTime.utc_now() value_fetched_at = DateTime.utc_now()
importable_balances_params = Enum.map(fetched_balances_params, &Map.put(&1, :value_fetched_at, value_fetched_at))
importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at))
{:ok, %{addresses_params: merged_addresses_params, balances_params: importable_balances_params}} {:ok, %{addresses_params: merged_addresses_params, balances_params: importable_balances_params}}
{:error, _} = error ->
error
{:ok, %FetchedBalances{errors: errors}} ->
{:error, errors}
end end
end end

@ -6,6 +6,7 @@ defmodule Indexer.Block.Uncle.Fetcher do
require Logger require Logger
alias EthereumJSONRPC.Blocks
alias Explorer.Chain alias Explorer.Chain
alias Explorer.Chain.Hash alias Explorer.Chain.Hash
alias Indexer.{AddressExtraction, Block, BufferedTask} alias Indexer.{AddressExtraction, Block, BufferedTask}
@ -72,14 +73,31 @@ defmodule Indexer.Block.Uncle.Fetcher do
Logger.debug(fn -> "fetching #{length(unique_hashes)} uncle blocks" end) Logger.debug(fn -> "fetching #{length(unique_hashes)} uncle blocks" end)
case EthereumJSONRPC.fetch_blocks_by_hash(unique_hashes, json_rpc_named_arguments) do case EthereumJSONRPC.fetch_blocks_by_hash(unique_hashes, json_rpc_named_arguments) do
{:ok, {:ok, blocks} ->
%{ run_blocks(blocks, block_fetcher, unique_hashes)
blocks: blocks_params,
transactions: transactions_params, {:error, reason} ->
block_second_degree_relations: block_second_degree_relations_params Logger.error(fn ->
}} -> ["failed to fetch ", unique_hashes |> length |> to_string(), " uncle blocks: ", inspect(reason)]
addresses_params = end)
AddressExtraction.extract_addresses(%{blocks: blocks_params, transactions: transactions_params})
{:retry, unique_hashes}
end
end
defp run_blocks(%Blocks{blocks_params: []}, _, original_entries), do: {:retry, original_entries}
defp run_blocks(
%Blocks{
blocks_params: blocks_params,
transactions_params: transactions_params,
block_second_degree_relations_params: block_second_degree_relations_params,
errors: errors
},
block_fetcher,
original_entries
) do
addresses_params = AddressExtraction.extract_addresses(%{blocks: blocks_params, transactions: transactions_params})
case Block.Fetcher.import(block_fetcher, %{ case Block.Fetcher.import(block_fetcher, %{
addresses: %{params: addresses_params}, addresses: %{params: addresses_params},
@ -88,13 +106,13 @@ defmodule Indexer.Block.Uncle.Fetcher do
transactions: %{params: transactions_params, on_conflict: :nothing} transactions: %{params: transactions_params, on_conflict: :nothing}
}) do }) do
{:ok, _} -> {:ok, _} ->
:ok retry(errors, original_entries)
{:error, step, failed_value, _changes_so_far} -> {:error, step, failed_value, _changes_so_far} ->
Logger.error(fn -> Logger.error(fn ->
[ [
"failed to import ", "failed to import ",
unique_hashes |> length() |> to_string(), original_entries |> length() |> to_string(),
"uncle blocks in step ", "uncle blocks in step ",
inspect(step), inspect(step),
": ", ": ",
@ -102,15 +120,7 @@ defmodule Indexer.Block.Uncle.Fetcher do
] ]
end) end)
{:retry, unique_hashes} {:retry, original_entries}
end
{:error, reason} ->
Logger.error(fn ->
["failed to fetch ", unique_hashes |> length |> to_string(), " uncle blocks: ", inspect(reason)]
end)
{:retry, unique_hashes}
end end
end end
@ -170,4 +180,42 @@ defmodule Indexer.Block.Uncle.Fetcher do
%{uncle_hash: uncle_hash, index: index, hash: hash} %{uncle_hash: uncle_hash, index: index, hash: hash}
end) end)
end end
defp retry([], _), do: :ok
defp retry(errors, original_entries) when is_list(errors) do
retried_entries = errors_to_entries(errors)
Logger.error(fn ->
[
"failed to fetch ",
retried_entries |> length() |> to_string(),
"/",
original_entries |> length() |> to_string(),
" uncles: ",
errors_to_iodata(errors)
]
end)
end
defp errors_to_entries(errors) when is_list(errors) do
Enum.map(errors, &error_to_entry/1)
end
defp error_to_entry(%{data: %{hash: hash}}) when is_binary(hash), do: hash
defp errors_to_iodata(errors) when is_list(errors) do
errors_to_iodata(errors, [])
end
defp errors_to_iodata([], iodata), do: iodata
defp errors_to_iodata([error | errors], iodata) do
errors_to_iodata(errors, [iodata | error_to_iodata(error)])
end
defp error_to_iodata(%{code: code, message: message, data: %{hash: hash}})
when is_integer(code) and is_binary(message) and is_binary(hash) do
[hash, ": (", to_string(code), ") ", message, ?\n]
end
end end

@ -6,8 +6,9 @@ defmodule Indexer.CoinBalance.Fetcher do
require Logger require Logger
import EthereumJSONRPC, only: [integer_to_quantity: 1] import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1]
alias EthereumJSONRPC.FetchedBalances
alias Explorer.Chain alias Explorer.Chain
alias Explorer.Chain.{Block, Hash} alias Explorer.Chain.{Block, Hash}
alias Indexer.BufferedTask alias Indexer.BufferedTask
@ -75,23 +76,14 @@ defmodule Indexer.CoinBalance.Fetcher do
|> Enum.map(&entry_to_params/1) |> Enum.map(&entry_to_params/1)
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments)
|> case do |> case do
{:ok, balances_params} -> {:ok, fetched_balances} ->
value_fetched_at = DateTime.utc_now() run_fetched_balances(fetched_balances, unique_entries)
importable_balances_params = Enum.map(balances_params, &Map.put(&1, :value_fetched_at, value_fetched_at))
addresses_params = balances_params_to_address_params(importable_balances_params)
{:ok, _} =
Chain.import(%{
addresses: %{params: addresses_params, with: :balance_changeset},
address_coin_balances: %{params: importable_balances_params}
})
:ok
{:error, reason} -> {:error, reason} ->
Logger.debug(fn -> "failed to fetch #{length(unique_entries)} balances, #{inspect(reason)}" end) Logger.error(fn ->
["failed to fetch ", unique_entries |> length() |> to_string(), " balances, ", inspect(reason)]
end)
{:retry, unique_entries} {:retry, unique_entries}
end end
end end
@ -116,4 +108,71 @@ defmodule Indexer.CoinBalance.Fetcher do
%{hash: address_hash, fetched_coin_balance_block_number: block_number, fetched_coin_balance: value} %{hash: address_hash, fetched_coin_balance_block_number: block_number, fetched_coin_balance: value}
end) end)
end end
defp run_fetched_balances(%FetchedBalances{params_list: []}, original_entries), do: {:retry, original_entries}
defp run_fetched_balances(%FetchedBalances{params_list: params_list, errors: errors}, original_entries) do
value_fetched_at = DateTime.utc_now()
importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at))
addresses_params = balances_params_to_address_params(importable_balances_params)
{:ok, _} =
Chain.import(%{
addresses: %{params: addresses_params, with: :balance_changeset},
address_coin_balances: %{params: importable_balances_params}
})
retry(errors, original_entries)
end
defp retry([], _), do: :ok
defp retry(errors, original_entries) when is_list(errors) do
retried_entries = fetched_balances_errors_to_entries(errors)
Logger.error(fn ->
[
"failed to fetch ",
retried_entries |> length() |> to_string(),
"/",
original_entries |> length() |> to_string(),
" balances: ",
fetched_balance_errors_to_iodata(errors)
]
end)
{:retry, retried_entries}
end
defp fetched_balances_errors_to_entries(errors) when is_list(errors) do
Enum.map(errors, &fetched_balance_error_to_entry/1)
end
defp fetched_balance_error_to_entry(%{data: %{block_quantity: block_quantity, hash_data: hash_data}})
when is_binary(block_quantity) and is_binary(hash_data) do
{:ok, %Hash{bytes: address_hash_bytes}} = Hash.Address.cast(hash_data)
block_number = quantity_to_integer(block_quantity)
{address_hash_bytes, block_number}
end
defp fetched_balance_errors_to_iodata(errors) when is_list(errors) do
fetched_balance_errors_to_iodata(errors, [])
end
defp fetched_balance_errors_to_iodata([], iodata), do: iodata
defp fetched_balance_errors_to_iodata([error | errors], iodata) do
fetched_balance_errors_to_iodata(errors, [iodata | fetched_balance_error_to_iodata(error)])
end
defp fetched_balance_error_to_iodata(%{
code: code,
message: message,
data: %{block_quantity: block_quantity, hash_data: hash_data}
})
when is_integer(code) and is_binary(message) and is_binary(block_quantity) and is_binary(hash_data) do
[hash_data, "@", quantity_to_integer(block_quantity), ": (", to_string(code), ") ", message, ?\n]
end
end end

@ -22,6 +22,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
# See https://github.com/poanetwork/blockscout/issues/597 # See https://github.com/poanetwork/blockscout/issues/597
@tag :no_geth @tag :no_geth
test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do
Logger.configure(truncate: :infinity)
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
case Keyword.fetch!(json_rpc_named_arguments, :variant) do case Keyword.fetch!(json_rpc_named_arguments, :variant) do
EthereumJSONRPC.Parity -> EthereumJSONRPC.Parity ->
@ -63,8 +65,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
"uncles" => [] "uncles" => []
}} }}
%{method: "trace_block"}, _options -> [%{method: "trace_block"} | _] = requests, _options ->
{:ok, []} {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, result: []} end)}
[%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options -> [%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options ->
{:ok, {:ok,
@ -476,8 +478,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do
|> (fn mock -> |> (fn mock ->
case Keyword.fetch!(json_rpc_named_arguments, :variant) do case Keyword.fetch!(json_rpc_named_arguments, :variant) do
EthereumJSONRPC.Parity -> EthereumJSONRPC.Parity ->
expect(mock, :json_rpc, fn %{method: "trace_block"}, _options -> expect(mock, :json_rpc, fn [%{method: "trace_block"} | _] = requests, _options ->
{:ok, []} {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, result: []} end)}
end) end)
_ -> _ ->

@ -109,8 +109,8 @@ defmodule Indexer.Block.FetcherTest do
} }
]} ]}
end) end)
|> expect(:json_rpc, fn %{id: _id, method: "trace_block", params: [^block_quantity]}, _options -> |> expect(:json_rpc, fn [%{id: id, method: "trace_block", params: [^block_quantity]}], _options ->
{:ok, []} {:ok, [%{id: id, result: []}]}
end) end)
|> expect(:json_rpc, fn [ |> expect(:json_rpc, fn [
%{ %{
@ -214,10 +214,13 @@ defmodule Indexer.Block.FetcherTest do
fn -> Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number) end, fn -> Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number) end,
fn result -> fn result ->
assert {:ok, assert {:ok,
{%{ %{
inserted: %{
addresses: [%Address{hash: ^address_hash}], addresses: [%Address{hash: ^address_hash}],
blocks: [%Chain.Block{hash: ^block_hash}] blocks: [%Chain.Block{hash: ^block_hash}]
}, :more}} = result },
errors: []
}} = result
wait_for_tasks(InternalTransaction.Fetcher) wait_for_tasks(InternalTransaction.Fetcher)
wait_for_tasks(CoinBalance.Fetcher) wait_for_tasks(CoinBalance.Fetcher)
@ -359,9 +362,8 @@ defmodule Indexer.Block.FetcherTest do
} }
]} ]}
end) end)
|> expect(:json_rpc, fn json, _options -> |> expect(:json_rpc, fn [%{id: id, method: "trace_block", params: [^block_quantity]}], _options ->
assert %{id: _id, method: "trace_block", params: [^block_quantity]} = json {:ok, [%{id: id, result: []}]}
{:ok, []}
end) end)
# async requests need to be grouped in one expect because the order is non-deterministic while multiple expect # async requests need to be grouped in one expect because the order is non-deterministic while multiple expect
# calls on the same name/arity are used in order # calls on the same name/arity are used in order
@ -507,15 +509,16 @@ defmodule Indexer.Block.FetcherTest do
EthereumJSONRPC.Parity -> EthereumJSONRPC.Parity ->
assert {:ok, assert {:ok,
{%{ %{
inserted: %{
addresses: [ addresses: [
%Address{ %Address{
hash: hash:
%Explorer.Chain.Hash{ %Explorer.Chain.Hash{
byte_count: 20, byte_count: 20,
bytes: bytes:
<<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179, 223, <<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179,
65, 91>> 223, 65, 91>>
} = first_address_hash } = first_address_hash
}, },
%Address{ %Address{
@ -557,7 +560,9 @@ defmodule Indexer.Block.FetcherTest do
57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>>
} }
] ]
}, :more}} = Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number) },
errors: []
}} = Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number)
wait_for_tasks(InternalTransaction.Fetcher) wait_for_tasks(InternalTransaction.Fetcher)
wait_for_tasks(CoinBalance.Fetcher) wait_for_tasks(CoinBalance.Fetcher)

@ -198,8 +198,9 @@ defmodule Indexer.Block.Realtime.FetcherTest do
} }
]} ]}
end) end)
|> expect(:json_rpc, 2, fn %{method: "trace_block"}, _options -> |> expect(:json_rpc, fn [%{method: "trace_block"}, %{method: "trace_block"}] = requests, _options ->
{:ok, []} responses = Enum.map(requests, fn %{id: id} -> %{id: id, result: []} end)
{:ok, responses}
end) end)
|> expect(:json_rpc, fn [ |> expect(:json_rpc, fn [
%{ %{
@ -369,7 +370,8 @@ defmodule Indexer.Block.Realtime.FetcherTest do
end end
assert {:ok, assert {:ok,
{%{ %{
inserted: %{
addresses: [ addresses: [
%Address{hash: first_address_hash, fetched_coin_balance_block_number: 3_946_079}, %Address{hash: first_address_hash, fetched_coin_balance_block_number: 3_946_079},
%Address{hash: second_address_hash, fetched_coin_balance_block_number: 3_946_079}, %Address{hash: second_address_hash, fetched_coin_balance_block_number: 3_946_079},
@ -409,7 +411,9 @@ defmodule Indexer.Block.Realtime.FetcherTest do
%{index: 5, transaction_hash: transaction_hash} %{index: 5, transaction_hash: transaction_hash}
], ],
transactions: [transaction_hash] transactions: [transaction_hash]
}, :more}} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080) },
errors: []
}} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080)
end end
end end
end end

@ -51,12 +51,13 @@ defmodule Indexer.Block.Uncle.FetcherTest do
uncle_uncle_hash_data = to_string(block_hash()) uncle_uncle_hash_data = to_string(block_hash())
EthereumJSONRPC.Mox EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [%{method: "eth_getBlockByHash", params: [^uncle_hash_data, true]}], _ -> |> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByHash", params: [^uncle_hash_data, true]}], _ ->
number_quantity = "0x0" number_quantity = "0x0"
{:ok, {:ok,
[ [
%{ %{
id: id,
result: %{ result: %{
"author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381",
"difficulty" => "0xfffffffffffffffffffffffffffffffe", "difficulty" => "0xfffffffffffffffffffffffffffffffe",

@ -289,6 +289,84 @@ defmodule Indexer.CoinBalance.FetcherTest do
end end
end end
describe "run/2 partial batch" do
setup do
%{
json_rpc_named_arguments: [
transport: EthereumJSONRPC.Mox,
transport_options: [],
# Which one does not matter, so pick one
variant: EthereumJSONRPC.Parity
]
}
end
test "retries all if no successes", %{json_rpc_named_arguments: json_rpc_named_arguments} do
%Hash{bytes: address_hash_bytes} = address_hash()
entries = [{address_hash_bytes, block_number()}]
expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "eth_getBalance", params: [_, _]}], _ ->
{:ok, [%{id: id, error: %{code: 1, message: "Bad"}}]}
end)
assert {:retry, ^entries} = CoinBalance.Fetcher.run(entries, json_rpc_named_arguments)
end
test "retries none if all imported and no fetch errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do
%Hash{bytes: address_hash_bytes} = address_hash()
entries = [{address_hash_bytes, block_number()}]
expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "eth_getBalance", params: [_, _]}], _ ->
{:ok, [%{id: id, result: "0x1"}]}
end)
assert :ok = CoinBalance.Fetcher.run(entries, json_rpc_named_arguments)
end
test "retries retries fetch errors if all imported", %{json_rpc_named_arguments: json_rpc_named_arguments} do
%Hash{bytes: address_hash_bytes} = address_hash()
bad_block_number = block_number()
good_block_number = block_number()
expect(EthereumJSONRPC.Mox, :json_rpc, fn [
%{
id: first_id,
method: "eth_getBalance",
params: [_, first_block_quantity]
},
%{
id: second_id,
method: "eth_getBalance",
params: [_, _]
}
],
_ ->
responses =
case quantity_to_integer(first_block_quantity) do
^good_block_number ->
[
%{id: first_id, result: "0x1"},
%{id: second_id, error: %{code: 2, message: "Bad"}}
]
^bad_block_number ->
[
%{id: first_id, error: %{code: 1, message: "Bad"}},
%{id: second_id, result: "0x2"}
]
end
{:ok, responses}
end)
assert {:retry, [{^address_hash_bytes, ^bad_block_number}]} =
CoinBalance.Fetcher.run(
[{address_hash_bytes, good_block_number}, {address_hash_bytes, bad_block_number}],
json_rpc_named_arguments
)
end
end
defp wait(producer) do defp wait(producer) do
producer.() producer.()
rescue rescue

Loading…
Cancel
Save