Partial retries for CoinBalance.Fetcher

pull/1135/head
Luke Imhoff 6 years ago
parent 6a9e4a6895
commit 3efa2d9996
  1. 75
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  2. 62
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balance.ex
  3. 49
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex
  4. 50
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs
  5. 37
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  6. 91
      apps/indexer/lib/indexer/coin_balance/fetcher.ex
  7. 78
      apps/indexer/test/indexer/coin_balance/fetcher_test.exs

@ -25,10 +25,9 @@ defmodule EthereumJSONRPC do
documentation for `EthereumJSONRPC.RequestCoordinator`. documentation for `EthereumJSONRPC.RequestCoordinator`.
""" """
alias Explorer.Chain.Block
alias EthereumJSONRPC.{ alias EthereumJSONRPC.{
Blocks, Blocks,
FetchedBalances,
Receipts, Receipts,
RequestCoordinator, RequestCoordinator,
Subscription, Subscription,
@ -190,25 +189,16 @@ defmodule EthereumJSONRPC do
@spec fetch_balances( @spec fetch_balances(
[%{required(:block_quantity) => quantity, required(:hash_data) => data()}], [%{required(:block_quantity) => quantity, required(:hash_data) => data()}],
json_rpc_named_arguments json_rpc_named_arguments
) :: ) :: {:ok, FetchedBalances.t()} | {:error, reason :: term}
{:ok,
[
%{
required(:address_hash) => quantity,
required(:block_number) => Block.block_number(),
required(:value) => non_neg_integer()
}
]}
| {:error, reason :: term}
def fetch_balances(params_list, json_rpc_named_arguments) def fetch_balances(params_list, json_rpc_named_arguments)
when is_list(params_list) and is_list(json_rpc_named_arguments) do when is_list(params_list) and is_list(json_rpc_named_arguments) do
id_to_params = id_to_params(params_list) id_to_params = id_to_params(params_list)
with {:ok, responses} <- with {:ok, responses} <-
id_to_params id_to_params
|> get_balance_requests() |> FetchedBalances.requests()
|> json_rpc(json_rpc_named_arguments) do |> json_rpc(json_rpc_named_arguments) do
get_balance_responses_to_balances_params(responses, id_to_params) {:ok, FetchedBalances.from_responses(responses, id_to_params)}
end end
end end
@ -312,6 +302,7 @@ defmodule EthereumJSONRPC do
@doc """ @doc """
Assigns an id to each set of params in `params_list` for batch request-response correlation Assigns an id to each set of params in `params_list` for batch request-response correlation
""" """
@spec id_to_params([params]) :: %{id => params} when id: non_neg_integer(), params: map()
def id_to_params(params_list) do def id_to_params(params_list) do
params_list params_list
|> Stream.with_index() |> Stream.with_index()
@ -420,62 +411,6 @@ defmodule EthereumJSONRPC do
|> Timex.from_unix() |> Timex.from_unix()
end end
defp get_balance_requests(id_to_params) when is_map(id_to_params) do
Enum.map(id_to_params, fn {id, %{block_quantity: block_quantity, hash_data: hash_data}} ->
get_balance_request(%{id: id, block_quantity: block_quantity, hash_data: hash_data})
end)
end
defp get_balance_request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) do
request(%{id: id, method: "eth_getBalance", params: [hash_data, block_quantity]})
end
defp get_balance_responses_to_balances_params(responses, id_to_params)
when is_list(responses) and is_map(id_to_params) do
{status, reversed} =
responses
|> Enum.map(&get_balance_responses_to_balance_params(&1, id_to_params))
|> Enum.reduce(
{:ok, []},
fn
{:ok, address_params}, {:ok, address_params_list} ->
{:ok, [address_params | address_params_list]}
{:ok, _}, {:error, _} = acc_error ->
acc_error
{:error, reason}, {:ok, _} ->
{:error, [reason]}
{:error, reason}, {:error, acc_reason} ->
{:error, [reason | acc_reason]}
end
)
{status, Enum.reverse(reversed)}
end
defp get_balance_responses_to_balance_params(%{id: id, result: fetched_balance_quantity}, id_to_params)
when is_map(id_to_params) do
%{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id)
{:ok,
%{
value: quantity_to_integer(fetched_balance_quantity),
block_number: quantity_to_integer(block_quantity),
address_hash: hash_data
}}
end
defp get_balance_responses_to_balance_params(%{id: id, error: error}, id_to_params)
when is_map(id_to_params) do
%{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id)
annotated_error = Map.put(error, :data, %{"blockNumber" => block_quantity, "hash" => hash_data})
{:error, annotated_error}
end
defp get_block_by_hash_requests(id_to_params) do defp get_block_by_hash_requests(id_to_params) do
Enum.map(id_to_params, fn {id, %{hash: hash}} -> Enum.map(id_to_params, fn {id, %{hash: hash}} ->
get_block_by_hash_request(%{id: id, hash: hash, transactions: :full}) get_block_by_hash_request(%{id: id, hash: hash, transactions: :full})

@ -0,0 +1,62 @@
defmodule EthereumJSONRPC.FetchedBalance do
@moduledoc """
A single balance fetched from `eth_getBalance`.
"""
import EthereumJSONRPC, only: [quantity_to_integer: 1]
@type params :: %{address_hash: EthereumJSONRPC.hash(), block_number: non_neg_integer(), value: non_neg_integer()}
@type error :: %{code: integer(), message: String.t(), data: %{block_quantity: String.t(), hash: String.t()}}
@doc """
Converts `response` to balance params or annotated error.
"""
@spec from_response(%{id: id, result: String.t()}, %{id => %{block_quantity: block_quantity, hash_data: hash_data}}) ::
{:ok, params()}
when id: non_neg_integer(), block_quantity: String.t(), hash_data: String.t()
def from_response(%{id: id, result: fetched_balance_quantity}, id_to_params) when is_map(id_to_params) do
%{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id)
{:ok,
%{
address_hash: hash_data,
block_number: quantity_to_integer(block_quantity),
value: quantity_to_integer(fetched_balance_quantity)
}}
end
@spec from_response(
%{
id: id,
error: %{code: code, message: message}
},
%{id => %{block_quantity: block_quantity, hash_data: hash_data}}
) :: {:error, %{code: code, message: message, data: %{block_quantity: block_quantity, hash: hash_data}}}
when id: non_neg_integer(),
code: integer(),
message: String.t(),
block_quantity: String.t(),
hash_data: String.t()
def from_response(%{id: id, error: %{code: code, message: message} = error}, id_to_params)
when is_integer(code) and is_binary(message) and is_map(id_to_params) do
%{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id)
annotated_error = Map.put(error, :data, %{block_quantity: block_quantity, hash_data: hash_data})
{:error, annotated_error}
end
@spec request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) :: %{
jsonrpc: String.t(),
id: id,
method: String.t(),
params: [hash_data | block_quantity]
}
when id: EthereumJSONRPC.request_id(),
block_quantity: EthereumJSONRPC.quantity(),
hash_data: EthereumJSONRPC.hash()
def request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) do
EthereumJSONRPC.request(%{id: id, method: "eth_getBalance", params: [hash_data, block_quantity]})
end
end

@ -0,0 +1,49 @@
defmodule EthereumJSONRPC.FetchedBalances do
@moduledoc """
Balance params and errors from a batch request from `eth_getBalance`.
"""
alias EthereumJSONRPC.FetchedBalance
defstruct errors: [],
params_list: []
@typedoc """
* `params_list` - all the balance params from requests that succeeded in the batch.
* `errors` - all the error from requests that failed in the batch.
"""
@type t :: %__MODULE__{params_list: [FetchedBalance.params()], errors: [FetchedBalance.error()]}
@doc """
Converts `responses` to `t/0`.
"""
def from_responses(responses, id_to_params) do
responses
|> Enum.map(&FetchedBalance.from_response(&1, id_to_params))
|> Enum.reduce(
%__MODULE__{},
fn
{:ok, params}, %__MODULE__{params_list: params_list} = acc ->
%__MODULE__{acc | params_list: [params | params_list]}
{:error, reason}, %__MODULE__{errors: errors} = acc ->
%__MODULE__{acc | errors: [reason | errors]}
end
)
end
@doc """
`eth_getBalance` requests for `id_to_params`.
"""
@spec requests(%{id => %{block_quantity: block_quantity, hash_data: hash_data}}) :: [
%{jsonrpc: String.t(), id: id, method: String.t(), params: [hash_data | block_quantity]}
]
when id: EthereumJSONRPC.request_id(),
block_quantity: EthereumJSONRPC.quantity(),
hash_data: EthereumJSONRPC.hash()
def requests(id_to_params) when is_map(id_to_params) do
Enum.map(id_to_params, fn {id, %{block_quantity: block_quantity, hash_data: hash_data}} ->
FetchedBalance.request(%{id: id, block_quantity: block_quantity, hash_data: hash_data})
end)
end
end

@ -4,7 +4,7 @@ defmodule EthereumJSONRPCTest do
import EthereumJSONRPC.Case import EthereumJSONRPC.Case
import Mox import Mox
alias EthereumJSONRPC.Subscription alias EthereumJSONRPC.{FetchedBalances, Subscription}
alias EthereumJSONRPC.WebSocket.WebSocketClient alias EthereumJSONRPC.WebSocket.WebSocketClient
setup :verify_on_exit! setup :verify_on_exit!
@ -37,16 +37,18 @@ defmodule EthereumJSONRPCTest do
json_rpc_named_arguments json_rpc_named_arguments
) == ) ==
{:ok, {:ok,
[ %FetchedBalances{
%{ params_list: [
address_hash: hash, %{
block_number: 1, address_hash: hash,
value: expected_fetched_balance block_number: 1,
} value: expected_fetched_balance
]} }
]
}}
end end
test "with all invalid hash_data returns {:error, reasons}", %{json_rpc_named_arguments: json_rpc_named_arguments} do test "with all invalid hash_data returns errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do
variant = Keyword.fetch!(json_rpc_named_arguments, :variant) variant = Keyword.fetch!(json_rpc_named_arguments, :variant)
expected_message = expected_message =
@ -76,18 +78,21 @@ defmodule EthereumJSONRPCTest do
end) end)
end end
assert {:error, assert {:ok,
[ %FetchedBalances{
%{ errors: [
code: -32602, %{
data: %{"blockNumber" => "0x1", "hash" => "0x0"}, code: -32602,
message: ^expected_message data: %{hash_data: "0x0", block_quantity: "0x1"},
} message: ^expected_message
]} = }
],
params_list: []
}} =
EthereumJSONRPC.fetch_balances([%{block_quantity: "0x1", hash_data: "0x0"}], json_rpc_named_arguments) EthereumJSONRPC.fetch_balances([%{block_quantity: "0x1", hash_data: "0x0"}], json_rpc_named_arguments)
end end
test "with a mix of valid and invalid hash_data returns {:error, reasons}", %{ test "with a mix of valid and invalid hash_data returns both", %{
json_rpc_named_arguments: json_rpc_named_arguments json_rpc_named_arguments: json_rpc_named_arguments
} do } do
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
@ -128,7 +133,7 @@ defmodule EthereumJSONRPCTest do
end) end)
end end
assert {:error, reasons} = assert {:ok, %FetchedBalances{params_list: params_list, errors: errors}} =
EthereumJSONRPC.fetch_balances( EthereumJSONRPC.fetch_balances(
[ [
# start with :ok # start with :ok
@ -160,8 +165,11 @@ defmodule EthereumJSONRPCTest do
json_rpc_named_arguments json_rpc_named_arguments
) )
assert is_list(reasons) assert is_list(params_list)
assert length(reasons) > 1 assert length(params_list) > 1
assert is_list(errors)
assert length(errors) > 1
end end
end end

@ -11,7 +11,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
import Indexer.Block.Fetcher, only: [async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2] import Indexer.Block.Fetcher, only: [async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2]
alias Ecto.Changeset alias Ecto.Changeset
alias EthereumJSONRPC.Subscription alias EthereumJSONRPC.{FetchedBalances, Subscription}
alias Explorer.Chain alias Explorer.Chain
alias Indexer.{AddressExtraction, Block, TokenBalances} alias Indexer.{AddressExtraction, Block, TokenBalances}
alias Indexer.Block.Realtime.TaskSupervisor alias Indexer.Block.Realtime.TaskSupervisor
@ -247,20 +247,27 @@ defmodule Indexer.Block.Realtime.Fetcher do
%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}, %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%{addresses_params: addresses_params} = options %{addresses_params: addresses_params} = options
) do ) do
with {:ok, fetched_balances_params} <- case options
options |> fetch_balances_params_list()
|> fetch_balances_params_list() |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) do
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) do {:ok, %FetchedBalances{params_list: params_list, errors: []}} ->
merged_addresses_params = merged_addresses_params =
%{address_coin_balances: fetched_balances_params} %{address_coin_balances: params_list}
|> AddressExtraction.extract_addresses() |> AddressExtraction.extract_addresses()
|> Kernel.++(addresses_params) |> Kernel.++(addresses_params)
|> AddressExtraction.merge_addresses() |> AddressExtraction.merge_addresses()
value_fetched_at = DateTime.utc_now() value_fetched_at = DateTime.utc_now()
importable_balances_params = Enum.map(fetched_balances_params, &Map.put(&1, :value_fetched_at, value_fetched_at))
importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at))
{:ok, %{addresses_params: merged_addresses_params, balances_params: importable_balances_params}}
{:ok, %{addresses_params: merged_addresses_params, balances_params: importable_balances_params}}
{:error, _} = error ->
error
{:ok, %FetchedBalances{errors: errors}} ->
{:error, errors}
end end
end end

@ -6,8 +6,9 @@ defmodule Indexer.CoinBalance.Fetcher do
require Logger require Logger
import EthereumJSONRPC, only: [integer_to_quantity: 1] import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1]
alias EthereumJSONRPC.FetchedBalances
alias Explorer.Chain alias Explorer.Chain
alias Explorer.Chain.{Block, Hash} alias Explorer.Chain.{Block, Hash}
alias Indexer.BufferedTask alias Indexer.BufferedTask
@ -75,23 +76,14 @@ defmodule Indexer.CoinBalance.Fetcher do
|> Enum.map(&entry_to_params/1) |> Enum.map(&entry_to_params/1)
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments)
|> case do |> case do
{:ok, balances_params} -> {:ok, fetched_balances} ->
value_fetched_at = DateTime.utc_now() run_fetched_balances(fetched_balances, unique_entries)
importable_balances_params = Enum.map(balances_params, &Map.put(&1, :value_fetched_at, value_fetched_at))
addresses_params = balances_params_to_address_params(importable_balances_params)
{:ok, _} =
Chain.import(%{
addresses: %{params: addresses_params, with: :balance_changeset},
address_coin_balances: %{params: importable_balances_params}
})
:ok
{:error, reason} -> {:error, reason} ->
Logger.debug(fn -> "failed to fetch #{length(unique_entries)} balances, #{inspect(reason)}" end) Logger.error(fn ->
["failed to fetch ", unique_entries |> length() |> to_string(), " balances, ", inspect(reason)]
end)
{:retry, unique_entries} {:retry, unique_entries}
end end
end end
@ -116,4 +108,71 @@ defmodule Indexer.CoinBalance.Fetcher do
%{hash: address_hash, fetched_coin_balance_block_number: block_number, fetched_coin_balance: value} %{hash: address_hash, fetched_coin_balance_block_number: block_number, fetched_coin_balance: value}
end) end)
end end
defp run_fetched_balances(%FetchedBalances{params_list: []}, original_entries), do: {:retry, original_entries}
defp run_fetched_balances(%FetchedBalances{params_list: params_list, errors: errors}, original_entries) do
value_fetched_at = DateTime.utc_now()
importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at))
addresses_params = balances_params_to_address_params(importable_balances_params)
{:ok, _} =
Chain.import(%{
addresses: %{params: addresses_params, with: :balance_changeset},
address_coin_balances: %{params: importable_balances_params}
})
retry(errors, original_entries)
end
defp retry([], _), do: :ok
defp retry(errors, original_entries) when is_list(errors) do
retried_entries = fetched_balances_errors_to_entries(errors)
Logger.error(fn ->
[
"failed to fetch ",
retried_entries |> length() |> to_string(),
"/",
original_entries |> length() |> to_string(),
" balances: ",
fetched_balance_errors_to_iodata(errors)
]
end)
{:retry, retried_entries}
end
defp fetched_balances_errors_to_entries(errors) when is_list(errors) do
Enum.map(errors, &fetched_balance_error_to_entry/1)
end
defp fetched_balance_error_to_entry(%{data: %{block_quantity: block_quantity, hash_data: hash_data}})
when is_binary(block_quantity) and is_binary(hash_data) do
{:ok, %Hash{bytes: address_hash_bytes}} = Hash.Address.cast(hash_data)
block_number = quantity_to_integer(block_quantity)
{address_hash_bytes, block_number}
end
defp fetched_balance_errors_to_iodata(errors) when is_list(errors) do
fetched_balance_errors_to_iodata(errors, [])
end
defp fetched_balance_errors_to_iodata([], iodata), do: iodata
defp fetched_balance_errors_to_iodata([error | errors], iodata) do
fetched_balance_errors_to_iodata(errors, [iodata | fetched_balance_error_to_iodata(error)])
end
defp fetched_balance_error_to_iodata(%{
code: code,
message: message,
data: %{block_quantity: block_quantity, hash_data: hash_data}
})
when is_integer(code) and is_binary(message) and is_binary(block_quantity) and is_binary(hash_data) do
[hash_data, "@", quantity_to_integer(block_quantity), ": (", to_string(code), ") ", message]
end
end end

@ -289,6 +289,84 @@ defmodule Indexer.CoinBalance.FetcherTest do
end end
end end
describe "run/2 partial batch" do
setup do
%{
json_rpc_named_arguments: [
transport: EthereumJSONRPC.Mox,
transport_options: [],
# Which one does not matter, so pick one
variant: EthereumJSONRPC.Parity
]
}
end
test "retries all if no successes", %{json_rpc_named_arguments: json_rpc_named_arguments} do
%Hash{bytes: address_hash_bytes} = address_hash()
entries = [{address_hash_bytes, block_number()}]
expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "eth_getBalance", params: [_, _]}], _ ->
{:ok, [%{id: id, error: %{code: 1, message: "Bad"}}]}
end)
assert {:retry, ^entries} = CoinBalance.Fetcher.run(entries, json_rpc_named_arguments)
end
test "retries none if all imported and no fetch errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do
%Hash{bytes: address_hash_bytes} = address_hash()
entries = [{address_hash_bytes, block_number()}]
expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "eth_getBalance", params: [_, _]}], _ ->
{:ok, [%{id: id, result: "0x1"}]}
end)
assert :ok = CoinBalance.Fetcher.run(entries, json_rpc_named_arguments)
end
test "retries retries fetch errors if all imported", %{json_rpc_named_arguments: json_rpc_named_arguments} do
%Hash{bytes: address_hash_bytes} = address_hash()
bad_block_number = block_number()
good_block_number = block_number()
expect(EthereumJSONRPC.Mox, :json_rpc, fn [
%{
id: first_id,
method: "eth_getBalance",
params: [_, first_block_quantity]
},
%{
id: second_id,
method: "eth_getBalance",
params: [_, _]
}
],
_ ->
responses =
case quantity_to_integer(first_block_quantity) do
^good_block_number ->
[
%{id: first_id, result: "0x1"},
%{id: second_id, error: %{code: 2, message: "Bad"}}
]
^bad_block_number ->
[
%{id: first_id, error: %{code: 1, message: "Bad"}},
%{id: second_id, result: "0x2"}
]
end
{:ok, responses}
end)
assert {:retry, [{^address_hash_bytes, ^bad_block_number}]} =
CoinBalance.Fetcher.run(
[{address_hash_bytes, good_block_number}, {address_hash_bytes, bad_block_number}],
json_rpc_named_arguments
)
end
end
defp wait(producer) do defp wait(producer) do
producer.() producer.()
rescue rescue

Loading…
Cancel
Save