From 3efa2d9996227acc9cfdb9422321c2b2812f45fb Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 14 Nov 2018 14:07:14 -0600 Subject: [PATCH 1/5] Partial retries for CoinBalance.Fetcher --- apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex | 75 +-------------- .../lib/ethereum_jsonrpc/fetched_balance.ex | 62 +++++++++++++ .../lib/ethereum_jsonrpc/fetched_balances.ex | 49 ++++++++++ .../test/ethereum_jsonrpc_test.exs | 50 +++++----- .../lib/indexer/block/realtime/fetcher.ex | 37 +++++--- .../lib/indexer/coin_balance/fetcher.ex | 91 +++++++++++++++---- .../indexer/coin_balance/fetcher_test.exs | 78 ++++++++++++++++ 7 files changed, 320 insertions(+), 122 deletions(-) create mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balance.ex create mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex index bd6cbefc0d..c1e6f88354 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex @@ -25,10 +25,9 @@ defmodule EthereumJSONRPC do documentation for `EthereumJSONRPC.RequestCoordinator`. """ - alias Explorer.Chain.Block - alias EthereumJSONRPC.{ Blocks, + FetchedBalances, Receipts, RequestCoordinator, Subscription, @@ -190,25 +189,16 @@ defmodule EthereumJSONRPC do @spec fetch_balances( [%{required(:block_quantity) => quantity, required(:hash_data) => data()}], json_rpc_named_arguments - ) :: - {:ok, - [ - %{ - required(:address_hash) => quantity, - required(:block_number) => Block.block_number(), - required(:value) => non_neg_integer() - } - ]} - | {:error, reason :: term} + ) :: {:ok, FetchedBalances.t()} | {:error, reason :: term} def fetch_balances(params_list, json_rpc_named_arguments) when is_list(params_list) and is_list(json_rpc_named_arguments) do id_to_params = id_to_params(params_list) with {:ok, responses} <- id_to_params - |> get_balance_requests() + |> FetchedBalances.requests() |> json_rpc(json_rpc_named_arguments) do - get_balance_responses_to_balances_params(responses, id_to_params) + {:ok, FetchedBalances.from_responses(responses, id_to_params)} end end @@ -312,6 +302,7 @@ defmodule EthereumJSONRPC do @doc """ Assigns an id to each set of params in `params_list` for batch request-response correlation """ + @spec id_to_params([params]) :: %{id => params} when id: non_neg_integer(), params: map() def id_to_params(params_list) do params_list |> Stream.with_index() @@ -420,62 +411,6 @@ defmodule EthereumJSONRPC do |> Timex.from_unix() end - defp get_balance_requests(id_to_params) when is_map(id_to_params) do - Enum.map(id_to_params, fn {id, %{block_quantity: block_quantity, hash_data: hash_data}} -> - get_balance_request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) - end) - end - - defp get_balance_request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) do - request(%{id: id, method: "eth_getBalance", params: [hash_data, block_quantity]}) - end - - defp get_balance_responses_to_balances_params(responses, id_to_params) - when is_list(responses) and is_map(id_to_params) do - {status, reversed} = - responses - |> Enum.map(&get_balance_responses_to_balance_params(&1, id_to_params)) - |> Enum.reduce( - {:ok, []}, - fn - {:ok, address_params}, {:ok, address_params_list} -> - {:ok, [address_params | address_params_list]} - - {:ok, _}, {:error, _} = acc_error -> - acc_error - - {:error, reason}, {:ok, _} -> - {:error, [reason]} - - {:error, reason}, {:error, acc_reason} -> - {:error, [reason | acc_reason]} - end - ) - - {status, Enum.reverse(reversed)} - end - - defp get_balance_responses_to_balance_params(%{id: id, result: fetched_balance_quantity}, id_to_params) - when is_map(id_to_params) do - %{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id) - - {:ok, - %{ - value: quantity_to_integer(fetched_balance_quantity), - block_number: quantity_to_integer(block_quantity), - address_hash: hash_data - }} - end - - defp get_balance_responses_to_balance_params(%{id: id, error: error}, id_to_params) - when is_map(id_to_params) do - %{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id) - - annotated_error = Map.put(error, :data, %{"blockNumber" => block_quantity, "hash" => hash_data}) - - {:error, annotated_error} - end - defp get_block_by_hash_requests(id_to_params) do Enum.map(id_to_params, fn {id, %{hash: hash}} -> get_block_by_hash_request(%{id: id, hash: hash, transactions: :full}) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balance.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balance.ex new file mode 100644 index 0000000000..682614feec --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balance.ex @@ -0,0 +1,62 @@ +defmodule EthereumJSONRPC.FetchedBalance do + @moduledoc """ + A single balance fetched from `eth_getBalance`. + """ + + import EthereumJSONRPC, only: [quantity_to_integer: 1] + + @type params :: %{address_hash: EthereumJSONRPC.hash(), block_number: non_neg_integer(), value: non_neg_integer()} + @type error :: %{code: integer(), message: String.t(), data: %{block_quantity: String.t(), hash: String.t()}} + + @doc """ + Converts `response` to balance params or annotated error. + """ + + @spec from_response(%{id: id, result: String.t()}, %{id => %{block_quantity: block_quantity, hash_data: hash_data}}) :: + {:ok, params()} + when id: non_neg_integer(), block_quantity: String.t(), hash_data: String.t() + def from_response(%{id: id, result: fetched_balance_quantity}, id_to_params) when is_map(id_to_params) do + %{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id) + + {:ok, + %{ + address_hash: hash_data, + block_number: quantity_to_integer(block_quantity), + value: quantity_to_integer(fetched_balance_quantity) + }} + end + + @spec from_response( + %{ + id: id, + error: %{code: code, message: message} + }, + %{id => %{block_quantity: block_quantity, hash_data: hash_data}} + ) :: {:error, %{code: code, message: message, data: %{block_quantity: block_quantity, hash: hash_data}}} + when id: non_neg_integer(), + code: integer(), + message: String.t(), + block_quantity: String.t(), + hash_data: String.t() + def from_response(%{id: id, error: %{code: code, message: message} = error}, id_to_params) + when is_integer(code) and is_binary(message) and is_map(id_to_params) do + %{block_quantity: block_quantity, hash_data: hash_data} = Map.fetch!(id_to_params, id) + + annotated_error = Map.put(error, :data, %{block_quantity: block_quantity, hash_data: hash_data}) + + {:error, annotated_error} + end + + @spec request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) :: %{ + jsonrpc: String.t(), + id: id, + method: String.t(), + params: [hash_data | block_quantity] + } + when id: EthereumJSONRPC.request_id(), + block_quantity: EthereumJSONRPC.quantity(), + hash_data: EthereumJSONRPC.hash() + def request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) do + EthereumJSONRPC.request(%{id: id, method: "eth_getBalance", params: [hash_data, block_quantity]}) + end +end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex new file mode 100644 index 0000000000..4be25fd441 --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex @@ -0,0 +1,49 @@ +defmodule EthereumJSONRPC.FetchedBalances do + @moduledoc """ + Balance params and errors from a batch request from `eth_getBalance`. + """ + + alias EthereumJSONRPC.FetchedBalance + + defstruct errors: [], + params_list: [] + + @typedoc """ + * `params_list` - all the balance params from requests that succeeded in the batch. + * `errors` - all the error from requests that failed in the batch. + """ + @type t :: %__MODULE__{params_list: [FetchedBalance.params()], errors: [FetchedBalance.error()]} + + @doc """ + Converts `responses` to `t/0`. + """ + def from_responses(responses, id_to_params) do + responses + |> Enum.map(&FetchedBalance.from_response(&1, id_to_params)) + |> Enum.reduce( + %__MODULE__{}, + fn + {:ok, params}, %__MODULE__{params_list: params_list} = acc -> + %__MODULE__{acc | params_list: [params | params_list]} + + {:error, reason}, %__MODULE__{errors: errors} = acc -> + %__MODULE__{acc | errors: [reason | errors]} + end + ) + end + + @doc """ + `eth_getBalance` requests for `id_to_params`. + """ + @spec requests(%{id => %{block_quantity: block_quantity, hash_data: hash_data}}) :: [ + %{jsonrpc: String.t(), id: id, method: String.t(), params: [hash_data | block_quantity]} + ] + when id: EthereumJSONRPC.request_id(), + block_quantity: EthereumJSONRPC.quantity(), + hash_data: EthereumJSONRPC.hash() + def requests(id_to_params) when is_map(id_to_params) do + Enum.map(id_to_params, fn {id, %{block_quantity: block_quantity, hash_data: hash_data}} -> + FetchedBalance.request(%{id: id, block_quantity: block_quantity, hash_data: hash_data}) + end) + end +end diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs index 1c19a1fa97..3868d79497 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs @@ -4,7 +4,7 @@ defmodule EthereumJSONRPCTest do import EthereumJSONRPC.Case import Mox - alias EthereumJSONRPC.Subscription + alias EthereumJSONRPC.{FetchedBalances, Subscription} alias EthereumJSONRPC.WebSocket.WebSocketClient setup :verify_on_exit! @@ -37,16 +37,18 @@ defmodule EthereumJSONRPCTest do json_rpc_named_arguments ) == {:ok, - [ - %{ - address_hash: hash, - block_number: 1, - value: expected_fetched_balance - } - ]} + %FetchedBalances{ + params_list: [ + %{ + address_hash: hash, + block_number: 1, + value: expected_fetched_balance + } + ] + }} end - test "with all invalid hash_data returns {:error, reasons}", %{json_rpc_named_arguments: json_rpc_named_arguments} do + test "with all invalid hash_data returns errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do variant = Keyword.fetch!(json_rpc_named_arguments, :variant) expected_message = @@ -76,18 +78,21 @@ defmodule EthereumJSONRPCTest do end) end - assert {:error, - [ - %{ - code: -32602, - data: %{"blockNumber" => "0x1", "hash" => "0x0"}, - message: ^expected_message - } - ]} = + assert {:ok, + %FetchedBalances{ + errors: [ + %{ + code: -32602, + data: %{hash_data: "0x0", block_quantity: "0x1"}, + message: ^expected_message + } + ], + params_list: [] + }} = EthereumJSONRPC.fetch_balances([%{block_quantity: "0x1", hash_data: "0x0"}], json_rpc_named_arguments) end - test "with a mix of valid and invalid hash_data returns {:error, reasons}", %{ + test "with a mix of valid and invalid hash_data returns both", %{ json_rpc_named_arguments: json_rpc_named_arguments } do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do @@ -128,7 +133,7 @@ defmodule EthereumJSONRPCTest do end) end - assert {:error, reasons} = + assert {:ok, %FetchedBalances{params_list: params_list, errors: errors}} = EthereumJSONRPC.fetch_balances( [ # start with :ok @@ -160,8 +165,11 @@ defmodule EthereumJSONRPCTest do json_rpc_named_arguments ) - assert is_list(reasons) - assert length(reasons) > 1 + assert is_list(params_list) + assert length(params_list) > 1 + + assert is_list(errors) + assert length(errors) > 1 end end diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index aa4e7c9d7f..1b3151006b 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -11,7 +11,7 @@ defmodule Indexer.Block.Realtime.Fetcher do import Indexer.Block.Fetcher, only: [async_import_tokens: 1, async_import_uncles: 1, fetch_and_import_range: 2] alias Ecto.Changeset - alias EthereumJSONRPC.Subscription + alias EthereumJSONRPC.{FetchedBalances, Subscription} alias Explorer.Chain alias Indexer.{AddressExtraction, Block, TokenBalances} alias Indexer.Block.Realtime.TaskSupervisor @@ -247,20 +247,27 @@ defmodule Indexer.Block.Realtime.Fetcher do %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}, %{addresses_params: addresses_params} = options ) do - with {:ok, fetched_balances_params} <- - options - |> fetch_balances_params_list() - |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) do - merged_addresses_params = - %{address_coin_balances: fetched_balances_params} - |> AddressExtraction.extract_addresses() - |> Kernel.++(addresses_params) - |> AddressExtraction.merge_addresses() - - value_fetched_at = DateTime.utc_now() - importable_balances_params = Enum.map(fetched_balances_params, &Map.put(&1, :value_fetched_at, value_fetched_at)) - - {:ok, %{addresses_params: merged_addresses_params, balances_params: importable_balances_params}} + case options + |> fetch_balances_params_list() + |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) do + {:ok, %FetchedBalances{params_list: params_list, errors: []}} -> + merged_addresses_params = + %{address_coin_balances: params_list} + |> AddressExtraction.extract_addresses() + |> Kernel.++(addresses_params) + |> AddressExtraction.merge_addresses() + + value_fetched_at = DateTime.utc_now() + + importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at)) + + {:ok, %{addresses_params: merged_addresses_params, balances_params: importable_balances_params}} + + {:error, _} = error -> + error + + {:ok, %FetchedBalances{errors: errors}} -> + {:error, errors} end end diff --git a/apps/indexer/lib/indexer/coin_balance/fetcher.ex b/apps/indexer/lib/indexer/coin_balance/fetcher.ex index 860edbd7d7..8b867def35 100644 --- a/apps/indexer/lib/indexer/coin_balance/fetcher.ex +++ b/apps/indexer/lib/indexer/coin_balance/fetcher.ex @@ -6,8 +6,9 @@ defmodule Indexer.CoinBalance.Fetcher do require Logger - import EthereumJSONRPC, only: [integer_to_quantity: 1] + import EthereumJSONRPC, only: [integer_to_quantity: 1, quantity_to_integer: 1] + alias EthereumJSONRPC.FetchedBalances alias Explorer.Chain alias Explorer.Chain.{Block, Hash} alias Indexer.BufferedTask @@ -75,23 +76,14 @@ defmodule Indexer.CoinBalance.Fetcher do |> Enum.map(&entry_to_params/1) |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) |> case do - {:ok, balances_params} -> - value_fetched_at = DateTime.utc_now() - - importable_balances_params = Enum.map(balances_params, &Map.put(&1, :value_fetched_at, value_fetched_at)) - - addresses_params = balances_params_to_address_params(importable_balances_params) - - {:ok, _} = - Chain.import(%{ - addresses: %{params: addresses_params, with: :balance_changeset}, - address_coin_balances: %{params: importable_balances_params} - }) - - :ok + {:ok, fetched_balances} -> + run_fetched_balances(fetched_balances, unique_entries) {:error, reason} -> - Logger.debug(fn -> "failed to fetch #{length(unique_entries)} balances, #{inspect(reason)}" end) + Logger.error(fn -> + ["failed to fetch ", unique_entries |> length() |> to_string(), " balances, ", inspect(reason)] + end) + {:retry, unique_entries} end end @@ -116,4 +108,71 @@ defmodule Indexer.CoinBalance.Fetcher do %{hash: address_hash, fetched_coin_balance_block_number: block_number, fetched_coin_balance: value} end) end + + defp run_fetched_balances(%FetchedBalances{params_list: []}, original_entries), do: {:retry, original_entries} + + defp run_fetched_balances(%FetchedBalances{params_list: params_list, errors: errors}, original_entries) do + value_fetched_at = DateTime.utc_now() + + importable_balances_params = Enum.map(params_list, &Map.put(&1, :value_fetched_at, value_fetched_at)) + + addresses_params = balances_params_to_address_params(importable_balances_params) + + {:ok, _} = + Chain.import(%{ + addresses: %{params: addresses_params, with: :balance_changeset}, + address_coin_balances: %{params: importable_balances_params} + }) + + retry(errors, original_entries) + end + + defp retry([], _), do: :ok + + defp retry(errors, original_entries) when is_list(errors) do + retried_entries = fetched_balances_errors_to_entries(errors) + + Logger.error(fn -> + [ + "failed to fetch ", + retried_entries |> length() |> to_string(), + "/", + original_entries |> length() |> to_string(), + " balances: ", + fetched_balance_errors_to_iodata(errors) + ] + end) + + {:retry, retried_entries} + end + + defp fetched_balances_errors_to_entries(errors) when is_list(errors) do + Enum.map(errors, &fetched_balance_error_to_entry/1) + end + + defp fetched_balance_error_to_entry(%{data: %{block_quantity: block_quantity, hash_data: hash_data}}) + when is_binary(block_quantity) and is_binary(hash_data) do + {:ok, %Hash{bytes: address_hash_bytes}} = Hash.Address.cast(hash_data) + block_number = quantity_to_integer(block_quantity) + {address_hash_bytes, block_number} + end + + defp fetched_balance_errors_to_iodata(errors) when is_list(errors) do + fetched_balance_errors_to_iodata(errors, []) + end + + defp fetched_balance_errors_to_iodata([], iodata), do: iodata + + defp fetched_balance_errors_to_iodata([error | errors], iodata) do + fetched_balance_errors_to_iodata(errors, [iodata | fetched_balance_error_to_iodata(error)]) + end + + defp fetched_balance_error_to_iodata(%{ + code: code, + message: message, + data: %{block_quantity: block_quantity, hash_data: hash_data} + }) + when is_integer(code) and is_binary(message) and is_binary(block_quantity) and is_binary(hash_data) do + [hash_data, "@", quantity_to_integer(block_quantity), ": (", to_string(code), ") ", message] + end end diff --git a/apps/indexer/test/indexer/coin_balance/fetcher_test.exs b/apps/indexer/test/indexer/coin_balance/fetcher_test.exs index 045c1e4930..155e77abb2 100644 --- a/apps/indexer/test/indexer/coin_balance/fetcher_test.exs +++ b/apps/indexer/test/indexer/coin_balance/fetcher_test.exs @@ -289,6 +289,84 @@ defmodule Indexer.CoinBalance.FetcherTest do end end + describe "run/2 partial batch" do + setup do + %{ + json_rpc_named_arguments: [ + transport: EthereumJSONRPC.Mox, + transport_options: [], + # Which one does not matter, so pick one + variant: EthereumJSONRPC.Parity + ] + } + end + + test "retries all if no successes", %{json_rpc_named_arguments: json_rpc_named_arguments} do + %Hash{bytes: address_hash_bytes} = address_hash() + entries = [{address_hash_bytes, block_number()}] + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "eth_getBalance", params: [_, _]}], _ -> + {:ok, [%{id: id, error: %{code: 1, message: "Bad"}}]} + end) + + assert {:retry, ^entries} = CoinBalance.Fetcher.run(entries, json_rpc_named_arguments) + end + + test "retries none if all imported and no fetch errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do + %Hash{bytes: address_hash_bytes} = address_hash() + entries = [{address_hash_bytes, block_number()}] + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, method: "eth_getBalance", params: [_, _]}], _ -> + {:ok, [%{id: id, result: "0x1"}]} + end) + + assert :ok = CoinBalance.Fetcher.run(entries, json_rpc_named_arguments) + end + + test "retries retries fetch errors if all imported", %{json_rpc_named_arguments: json_rpc_named_arguments} do + %Hash{bytes: address_hash_bytes} = address_hash() + bad_block_number = block_number() + good_block_number = block_number() + + expect(EthereumJSONRPC.Mox, :json_rpc, fn [ + %{ + id: first_id, + method: "eth_getBalance", + params: [_, first_block_quantity] + }, + %{ + id: second_id, + method: "eth_getBalance", + params: [_, _] + } + ], + _ -> + responses = + case quantity_to_integer(first_block_quantity) do + ^good_block_number -> + [ + %{id: first_id, result: "0x1"}, + %{id: second_id, error: %{code: 2, message: "Bad"}} + ] + + ^bad_block_number -> + [ + %{id: first_id, error: %{code: 1, message: "Bad"}}, + %{id: second_id, result: "0x2"} + ] + end + + {:ok, responses} + end) + + assert {:retry, [{^address_hash_bytes, ^bad_block_number}]} = + CoinBalance.Fetcher.run( + [{address_hash_bytes, good_block_number}, {address_hash_bytes, bad_block_number}], + json_rpc_named_arguments + ) + end + end + defp wait(producer) do producer.() rescue From ad1119eb4c383015ee820fe982ef6bfded6fba37 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 15 Nov 2018 13:12:40 -0600 Subject: [PATCH 2/5] Fetched Beneficiaries partial batch retries --- .../lib/ethereum_jsonrpc/fetched_balances.ex | 6 +- .../ethereum_jsonrpc/fetched_beneficiaries.ex | 16 + .../ethereum_jsonrpc/fetched_beneficiary.ex | 8 + .../lib/ethereum_jsonrpc/parity.ex | 56 +-- .../parity/fetched_beneficiaries.ex | 94 ++++ .../lib/ethereum_jsonrpc/variant.ex | 8 +- .../test/ethereum_jsonrpc/parity_test.exs | 458 +++++++++--------- .../test/ethereum_jsonrpc_test.exs | 5 +- apps/indexer/lib/indexer/block/fetcher.ex | 14 +- .../bound_interval_supervisor_test.exs | 8 +- .../test/indexer/block/fetcher_test.exs | 9 +- .../indexer/block/realtime/fetcher_test.exs | 5 +- 12 files changed, 389 insertions(+), 298 deletions(-) create mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_beneficiaries.ex create mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_beneficiary.ex create mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity/fetched_beneficiaries.ex diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex index 4be25fd441..dc629c720a 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex @@ -5,12 +5,12 @@ defmodule EthereumJSONRPC.FetchedBalances do alias EthereumJSONRPC.FetchedBalance - defstruct errors: [], - params_list: [] + defstruct params_list: [], + errors: [] @typedoc """ * `params_list` - all the balance params from requests that succeeded in the batch. - * `errors` - all the error from requests that failed in the batch. + * `errors` - all the errors from requests that failed in the batch. """ @type t :: %__MODULE__{params_list: [FetchedBalance.params()], errors: [FetchedBalance.error()]} diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_beneficiaries.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_beneficiaries.ex new file mode 100644 index 0000000000..8dcf2cd8b6 --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_beneficiaries.ex @@ -0,0 +1,16 @@ +defmodule EthereumJSONRPC.FetchedBeneficiaries do + @moduledoc """ + Balance params and errors from a batch request to fetch beneficiaries. + """ + + alias EthereumJSONRPC.FetchedBeneficiary + + defstruct params_set: MapSet.new(), + errors: [] + + @typedoc """ + * `params_set` - all the balance request params from requests that succeeded in the batch. + * `errors` - all the errors from requests that failed in the batch. + """ + @type t :: %__MODULE__{params_set: MapSet.t(FetchedBeneficiary.params()), errors: [FetchedBeneficiary.error()]} +end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_beneficiary.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_beneficiary.ex new file mode 100644 index 0000000000..3abe09a58f --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_beneficiary.ex @@ -0,0 +1,8 @@ +defmodule EthereumJSONRPC.FetchedBeneficiary do + @moduledoc """ + A single balance request params for the beneficiary of a block. + """ + + @type params :: %{address_hash: EthereumJSONRPC.hash(), block_number: non_neg_integer()} + @type error :: %{code: integer(), message: String.t(), data: %{block_number: non_neg_integer()}} +end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex index 8771cd3b50..6407b2b0ee 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex @@ -3,35 +3,26 @@ defmodule EthereumJSONRPC.Parity do Ethereum JSONRPC methods that are only supported by [Parity](https://wiki.parity.io/). """ - import EthereumJSONRPC, only: [id_to_params: 1, json_rpc: 2, request: 1] + import EthereumJSONRPC, only: [id_to_params: 1, integer_to_quantity: 1, json_rpc: 2, request: 1] - alias EthereumJSONRPC.Parity.Traces + alias EthereumJSONRPC.Parity.{FetchedBeneficiaries, Traces} alias EthereumJSONRPC.{Transaction, Transactions} @behaviour EthereumJSONRPC.Variant @impl EthereumJSONRPC.Variant - def fetch_beneficiaries(block_range, json_rpc_named_arguments) do - Enum.reduce( - Enum.with_index(block_range), - {:ok, MapSet.new()}, - fn - {block_number, index}, {:ok, beneficiaries} -> - quantity = EthereumJSONRPC.integer_to_quantity(block_number) - - case trace_block(index, quantity, json_rpc_named_arguments) do - {:ok, traces} when is_list(traces) -> - new_beneficiaries = extract_beneficiaries(traces) - {:ok, MapSet.union(new_beneficiaries, beneficiaries)} + def fetch_beneficiaries(_.._ = block_range, json_rpc_named_arguments) when is_list(json_rpc_named_arguments) do + id_to_params = + block_range + |> block_range_to_params_list() + |> id_to_params() - _ -> - {:error, "Error fetching block reward contract beneficiaries"} - end - - _, {:error, _} = error -> - error - end - ) + with {:ok, responses} <- + id_to_params + |> FetchedBeneficiaries.requests() + |> json_rpc(json_rpc_named_arguments) do + {:ok, FetchedBeneficiaries.from_responses(responses, id_to_params)} + end end @doc """ @@ -72,25 +63,8 @@ defmodule EthereumJSONRPC.Parity do end end - defp extract_beneficiaries(traces) when is_list(traces) do - Enum.reduce(traces, MapSet.new(), fn - %{"type" => "reward", "blockNumber" => block_number, "action" => %{"author" => author}}, beneficiaries -> - beneficiary = %{ - block_number: block_number, - address_hash: author - } - - MapSet.put(beneficiaries, beneficiary) - - _, beneficiaries -> - beneficiaries - end) - end - - defp trace_block(index, quantity, json_rpc_named_arguments) do - %{id: index, method: "trace_block", params: [quantity]} - |> request() - |> json_rpc(json_rpc_named_arguments) + defp block_range_to_params_list(_.._ = block_range) do + Enum.map(block_range, &%{block_quantity: integer_to_quantity(&1)}) end defp trace_replay_transaction_responses_to_internal_transactions_params(responses, id_to_params) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity/fetched_beneficiaries.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity/fetched_beneficiaries.ex new file mode 100644 index 0000000000..e207d29fd4 --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity/fetched_beneficiaries.ex @@ -0,0 +1,94 @@ +defmodule EthereumJSONRPC.Parity.FetchedBeneficiaries do + @moduledoc """ + Beneficiaries and errors from batch requests to `trace_block`. + """ + + import EthereumJSONRPC, only: [quantity_to_integer: 1] + + @doc """ + Converts `responses` to `t/0`. + """ + def from_responses(responses, id_to_params) when is_list(responses) and is_map(id_to_params) do + responses + |> Enum.map(&response_to_params_set(&1, id_to_params)) + |> Enum.reduce( + %EthereumJSONRPC.FetchedBeneficiaries{}, + fn + {:ok, params_set}, %EthereumJSONRPC.FetchedBeneficiaries{params_set: acc_params_set} = acc -> + %EthereumJSONRPC.FetchedBeneficiaries{acc | params_set: MapSet.union(acc_params_set, params_set)} + + {:error, reason}, %EthereumJSONRPC.FetchedBeneficiaries{errors: errors} = acc -> + %EthereumJSONRPC.FetchedBeneficiaries{acc | errors: [reason | errors]} + end + ) + end + + @doc """ + `trace_block` requests for `id_to_params`. + """ + def requests(id_to_params) when is_map(id_to_params) do + Enum.map(id_to_params, fn {id, %{block_quantity: block_quantity}} -> + request(%{id: id, block_quantity: block_quantity}) + end) + end + + @spec response_to_params_set(%{id: id, result: nil}, %{id => %{block_quantity: block_quantity}}) :: + {:error, %{code: 404, message: String.t(), data: %{block_quantity: block_quantity}}} + when id: non_neg_integer(), block_quantity: String.t() + defp response_to_params_set(%{id: id, result: nil}, id_to_params) when is_map(id_to_params) do + %{block_quantity: block_quantity} = Map.fetch!(id_to_params, id) + + {:error, %{code: 404, message: "Not Found", data: %{block_quantity: block_quantity}}} + end + + @spec response_to_params_set(%{id: id, result: list(map())}, %{id => %{block_quantity: block_quantity}}) :: + {:ok, MapSet.t(EthereumJSONRPC.FetchedBeneficiary.params())} + when id: non_neg_integer(), block_quantity: String.t() + defp response_to_params_set(%{id: id, result: traces}, id_to_params) when is_list(traces) and is_map(id_to_params) do + %{block_quantity: block_quantity} = Map.fetch!(id_to_params, id) + block_number = quantity_to_integer(block_quantity) + params_set = traces_to_params_set(traces, block_number) + + {:ok, params_set} + end + + @spec response_to_params_set(%{id: id, error: %{code: code, message: message}}, %{ + id => %{block_quantity: block_quantity} + }) :: {:error, %{code: code, message: message, data: %{block_quantity: block_quantity}}} + when id: non_neg_integer(), code: integer(), message: String.t(), block_quantity: String.t() + defp response_to_params_set(%{id: id, error: error}, id_to_params) when is_map(id_to_params) do + %{block_quantity: block_quantity} = Map.fetch!(id_to_params, id) + + annotated_error = Map.put(error, :data, %{block_quantity: block_quantity}) + + {:error, annotated_error} + end + + defp request(%{id: id, block_quantity: block_quantity}) when is_integer(id) and is_binary(block_quantity) do + EthereumJSONRPC.request(%{id: id, method: "trace_block", params: [block_quantity]}) + end + + defp traces_to_params_set(traces, block_number) when is_list(traces) and is_integer(block_number) do + Enum.reduce(traces, MapSet.new(), fn trace, acc -> + MapSet.union(acc, trace_to_params_set(trace, block_number)) + end) + end + + defp trace_to_params_set(%{"action" => %{"callType" => _}, "blockNumber" => block_number}, block_number), + do: MapSet.new() + + defp trace_to_params_set(%{"type" => type, "blockNumber" => block_number}, block_number) + when type in ~w(create suicide), + do: MapSet.new() + + defp trace_to_params_set( + %{ + "action" => %{"rewardType" => reward_type, "author" => address_hash_data}, + "blockNumber" => block_number + }, + block_number + ) + when is_integer(block_number) and reward_type in ~w(block external uncle) do + MapSet.new([%{address_hash: address_hash_data, block_number: block_number}]) + end +end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/variant.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/variant.ex index d6b4608a16..1355118a1f 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/variant.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/variant.ex @@ -4,7 +4,7 @@ defmodule EthereumJSONRPC.Variant do Ethereum JSONRPC API. The variant callbacks abstract over this difference. """ - alias EthereumJSONRPC.Transaction + alias EthereumJSONRPC.{FetchedBeneficiaries, Transaction} @typedoc """ A module that implements the `EthereumJSONRPC.Variant` behaviour callbacks. @@ -22,12 +22,12 @@ defmodule EthereumJSONRPC.Variant do ## Returns - * `{:ok, #MapSet<[%{...}]>}` - beneficiaries were successfully fetched - * `{:error, reason}` - there was one or more errors with `reason` in fetching the beneficiaries + * `{:ok, %EthereumJSONRPC.FetchedBeneficiaries{params_list: [%{address_hash: address_hash, block_number: block_number}], errors: %{code: code, message: message, data: %{block_number: block_number}}}` - some beneficiaries were successfully fetched and some may have had errors. + * `{:error, reason}` - there was an error at the transport level * `:ignore` - the variant does not support fetching beneficiaries """ @callback fetch_beneficiaries(Range.t(), EthereumJSONRPC.json_rpc_named_arguments()) :: - {:ok, MapSet.t()} | {:error, reason :: term} | :ignore + {:ok, FetchedBeneficiaries.t()} | {:error, reason :: term} | :ignore @doc """ Fetches the `t:Explorer.Chain.InternalTransaction.changeset/2` params from the variant of the Ethereum JSONRPC API. diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity_test.exs index 689369a63f..756d8e406e 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc/parity_test.exs @@ -5,6 +5,8 @@ defmodule EthereumJSONRPC.ParityTest do import EthereumJSONRPC, only: [integer_to_quantity: 1] import Mox + alias EthereumJSONRPC.FetchedBeneficiaries + setup :verify_on_exit! doctest EthereumJSONRPC.Parity @@ -245,53 +247,54 @@ defmodule EthereumJSONRPC.ParityTest do hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c" if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do - expect(EthereumJSONRPC.Mox, :json_rpc, fn %{params: [^block_quantity]}, _options -> + expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, params: [^block_quantity]}], _options -> {:ok, [ %{ - "action" => %{ - "author" => hash1, - "rewardType" => "block", - "value" => "0xde0b6b3a7640000" - }, - "blockHash" => "0x52a8d2185282506ce681364d2aa0c085ba45fdeb5d6c0ddec1131617a71ee2ca", - "blockNumber" => block_number, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" - }, - %{ - "action" => %{ - "author" => hash2, - "rewardType" => "block", - "value" => "0xde0b6b3a7640000" - }, - "blockHash" => "0x52a8d2185282506ce681364d2aa0c085ba45fdeb5d6c0ddec1131617a71ee2ca", - "blockNumber" => block_number, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" + id: id, + result: [ + %{ + "action" => %{ + "author" => hash1, + "rewardType" => "block", + "value" => "0xde0b6b3a7640000" + }, + "blockHash" => "0x52a8d2185282506ce681364d2aa0c085ba45fdeb5d6c0ddec1131617a71ee2ca", + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + }, + %{ + "action" => %{ + "author" => hash2, + "rewardType" => "block", + "value" => "0xde0b6b3a7640000" + }, + "blockHash" => "0x52a8d2185282506ce681364d2aa0c085ba45fdeb5d6c0ddec1131617a71ee2ca", + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] } ]} end) end - expected_beneficiaries = - MapSet.new([ - %{block_number: block_number, address_hash: hash2}, - %{block_number: block_number, address_hash: hash1} - ]) - - {:ok, fetched_beneficiaries} = - EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) + assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = + EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) - assert fetched_beneficiaries == expected_beneficiaries + assert Enum.count(params_set) == 2 + assert %{block_number: block_number, address_hash: hash2} in params_set + assert %{block_number: block_number, address_hash: hash1} in params_set end test "with 'external' 'rewardType'", %{ @@ -303,81 +306,69 @@ defmodule EthereumJSONRPC.ParityTest do hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c" if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do - expect(EthereumJSONRPC.Mox, :json_rpc, fn %{params: [^block_quantity]}, _options -> + expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, params: [^block_quantity]}], _options -> {:ok, [ %{ - "action" => %{ - "author" => hash1, - "rewardType" => "external", - "value" => "0xde0b6b3a7640000" - }, - "blockHash" => "0xf19a4ea2bb4f2d8839f4c3ec11e0e86c29d57799d7073713958fe1990e197cf5", - "blockNumber" => 5_609_295, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" - }, - %{ - "action" => %{ - "author" => hash2, - "rewardType" => "external", - "value" => "0xde0b6b3a7640000" - }, - "blockHash" => "0xf19a4ea2bb4f2d8839f4c3ec11e0e86c29d57799d7073713958fe1990e197cf5", - "blockNumber" => 5_609_295, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" + id: id, + result: [ + %{ + "action" => %{ + "author" => hash1, + "rewardType" => "external", + "value" => "0xde0b6b3a7640000" + }, + "blockHash" => "0xf19a4ea2bb4f2d8839f4c3ec11e0e86c29d57799d7073713958fe1990e197cf5", + "blockNumber" => 5_609_295, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + }, + %{ + "action" => %{ + "author" => hash2, + "rewardType" => "external", + "value" => "0xde0b6b3a7640000" + }, + "blockHash" => "0xf19a4ea2bb4f2d8839f4c3ec11e0e86c29d57799d7073713958fe1990e197cf5", + "blockNumber" => 5_609_295, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] } ]} end) end - expected_beneficiaries = - MapSet.new([ - %{block_number: block_number, address_hash: hash2}, - %{block_number: block_number, address_hash: hash1} - ]) + assert {:ok, %FetchedBeneficiaries{params_set: params_set, errors: []}} = + EthereumJSONRPC.Parity.fetch_beneficiaries(5_609_295..5_609_295, json_rpc_named_arguments) - {:ok, fetched_beneficiaries} = - EthereumJSONRPC.Parity.fetch_beneficiaries(5_609_295..5_609_295, json_rpc_named_arguments) - - assert fetched_beneficiaries == expected_beneficiaries + assert Enum.count(params_set) == 2 + assert %{block_number: block_number, address_hash: hash1} in params_set + assert %{block_number: block_number, address_hash: hash2} in params_set end test "with no rewards, returns {:ok, []}", %{ json_rpc_named_arguments: json_rpc_named_arguments } do if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do - expect(EthereumJSONRPC.Mox, :json_rpc, fn _json, _options -> - {:ok, []} - end) - - {:ok, fetched_beneficiaries} = - EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) - - assert fetched_beneficiaries == MapSet.new() - end - end - - test "with nil rewards, returns {:error, reason}", %{ - json_rpc_named_arguments: json_rpc_named_arguments - } do - if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do - expect(EthereumJSONRPC.Mox, :json_rpc, fn _json, _options -> - {:ok, nil} + expect(EthereumJSONRPC.Mox, :json_rpc, fn requests, _options when is_list(requests) -> + responses = Enum.map(requests, fn %{id: id} -> %{id: id, result: []} end) + {:ok, responses} end) - result = EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) + assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = + EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) - assert result == {:error, "Error fetching block reward contract beneficiaries"} + assert Enum.empty?(params_set) end end @@ -390,71 +381,72 @@ defmodule EthereumJSONRPC.ParityTest do hash2 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c" if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do - expect(EthereumJSONRPC.Mox, :json_rpc, fn %{params: [^block_quantity]}, _options -> + expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id, params: [^block_quantity]}], _options -> {:ok, [ %{ - "action" => %{ - "callType" => "call", - "from" => "0x95426f2bc716022fcf1def006dbc4bb81f5b5164", - "gas" => "0x0", - "input" => "0x", - "to" => "0xe797a1da01eb0f951e0e400f9343de9d17a06bac", - "value" => "0x4a817c800" - }, - "blockHash" => "0x6659a4926d833a7eab74379fa647ec74c9f5e65f8029552a35264126560f300a", - "blockNumber" => block_number, - "result" => %{"gasUsed" => "0x0", "output" => "0x"}, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => "0x5acf90f846b8216bdbc309cf4eb24adc69d730bf29304dc0e740cf6df850666e", - "transactionPosition" => 0, - "type" => "call" - }, - %{ - "action" => %{ - "author" => hash1, - "rewardType" => "block", - "value" => "0xde0b6b3a7640000" - }, - "blockHash" => "0x6659a4926d833a7eab74379fa647ec74c9f5e65f8029552a35264126560f300a", - "blockNumber" => block_number, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" - }, - %{ - "action" => %{ - "author" => hash2, - "rewardType" => "block", - "value" => "0xde0b6b3a7640000" - }, - "blockHash" => "0x6659a4926d833a7eab74379fa647ec74c9f5e65f8029552a35264126560f300a", - "blockNumber" => block_number, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" + id: id, + result: [ + %{ + "action" => %{ + "callType" => "call", + "from" => "0x95426f2bc716022fcf1def006dbc4bb81f5b5164", + "gas" => "0x0", + "input" => "0x", + "to" => "0xe797a1da01eb0f951e0e400f9343de9d17a06bac", + "value" => "0x4a817c800" + }, + "blockHash" => "0x6659a4926d833a7eab74379fa647ec74c9f5e65f8029552a35264126560f300a", + "blockNumber" => block_number, + "result" => %{"gasUsed" => "0x0", "output" => "0x"}, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => "0x5acf90f846b8216bdbc309cf4eb24adc69d730bf29304dc0e740cf6df850666e", + "transactionPosition" => 0, + "type" => "call" + }, + %{ + "action" => %{ + "author" => hash1, + "rewardType" => "block", + "value" => "0xde0b6b3a7640000" + }, + "blockHash" => "0x6659a4926d833a7eab74379fa647ec74c9f5e65f8029552a35264126560f300a", + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + }, + %{ + "action" => %{ + "author" => hash2, + "rewardType" => "block", + "value" => "0xde0b6b3a7640000" + }, + "blockHash" => "0x6659a4926d833a7eab74379fa647ec74c9f5e65f8029552a35264126560f300a", + "blockNumber" => block_number, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] } ]} end) end - expected_beneficiaries = - MapSet.new([ - %{block_number: block_number, address_hash: hash2}, - %{block_number: block_number, address_hash: hash1} - ]) - - {:ok, fetched_beneficiaries} = - EthereumJSONRPC.Parity.fetch_beneficiaries(5_077_429..5_077_429, json_rpc_named_arguments) + assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = + EthereumJSONRPC.Parity.fetch_beneficiaries(5_077_429..5_077_429, json_rpc_named_arguments) - assert fetched_beneficiaries == expected_beneficiaries + assert Enum.count(params_set) == 2 + assert %{block_number: block_number, address_hash: hash2} in params_set + assert %{block_number: block_number, address_hash: hash1} in params_set end test "with multiple blocks with repeat beneficiaries", %{ @@ -469,87 +461,92 @@ defmodule EthereumJSONRPC.ParityTest do hash3 = "0x523b6539ff08d72a6c8bb598af95bf50c1ea839c" if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do - expect(EthereumJSONRPC.Mox, :json_rpc, 2, fn - %{params: [^block_quantity1]} = _json, _options -> - {:ok, - [ - %{ - "action" => %{ - "author" => hash1, - "rewardType" => "block", - "value" => "0xde0b6b3a7640000" - }, - "blockNumber" => block_number1, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" - }, - %{ - "action" => %{ - "author" => hash3, - "rewardType" => "block", - "value" => "0xde0b6b3a7640000" - }, - "blockNumber" => block_number1, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" - } - ]} - - %{params: [^block_quantity2]} = _json, _options -> - {:ok, - [ - %{ - "action" => %{ - "author" => hash2, - "rewardType" => "block", - "value" => "0xde0b6b3a7640000" - }, - "blockNumber" => block_number2, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" - }, - %{ - "action" => %{ - "author" => hash3, - "rewardType" => "block", - "value" => "0xde0b6b3a7640000" - }, - "blockNumber" => block_number2, - "result" => nil, - "subtraces" => 0, - "traceAddress" => [], - "transactionHash" => nil, - "transactionPosition" => nil, - "type" => "reward" - } - ]} + expect(EthereumJSONRPC.Mox, :json_rpc, fn requests, _options when is_list(requests) -> + responses = + Enum.map(requests, fn + %{id: id, params: [^block_quantity1]} -> + %{ + id: id, + result: [ + %{ + "action" => %{ + "author" => hash1, + "rewardType" => "block", + "value" => "0xde0b6b3a7640000" + }, + "blockNumber" => block_number1, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + }, + %{ + "action" => %{ + "author" => hash3, + "rewardType" => "block", + "value" => "0xde0b6b3a7640000" + }, + "blockNumber" => block_number1, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + + %{id: id, params: [^block_quantity2]} -> + %{ + id: id, + result: [ + %{ + "action" => %{ + "author" => hash2, + "rewardType" => "block", + "value" => "0xde0b6b3a7640000" + }, + "blockNumber" => block_number2, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + }, + %{ + "action" => %{ + "author" => hash3, + "rewardType" => "block", + "value" => "0xde0b6b3a7640000" + }, + "blockNumber" => block_number2, + "result" => nil, + "subtraces" => 0, + "traceAddress" => [], + "transactionHash" => nil, + "transactionPosition" => nil, + "type" => "reward" + } + ] + } + end) + + {:ok, responses} end) end - expected_beneficiaries = - MapSet.new([ - %{block_number: block_number1, address_hash: hash3}, - %{block_number: block_number2, address_hash: hash3}, - %{block_number: block_number2, address_hash: hash2}, - %{block_number: block_number1, address_hash: hash1} - ]) - - {:ok, fetched_beneficiaries} = - EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_886..5_080_887, json_rpc_named_arguments) + assert {:ok, %FetchedBeneficiaries{params_set: params_set}} = + EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_886..5_080_887, json_rpc_named_arguments) - assert fetched_beneficiaries == expected_beneficiaries + assert Enum.count(params_set) == 4 + assert %{block_number: block_number1, address_hash: hash3} in params_set + assert %{block_number: block_number2, address_hash: hash3} in params_set + assert %{block_number: block_number2, address_hash: hash2} in params_set + assert %{block_number: block_number1, address_hash: hash1} in params_set end test "with error, returns {:error, reason}", %{ @@ -560,9 +557,8 @@ defmodule EthereumJSONRPC.ParityTest do {:error, "oops"} end) - result = EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) - - assert result == {:error, "Error fetching block reward contract beneficiaries"} + assert {:error, "oops"} = + EthereumJSONRPC.Parity.fetch_beneficiaries(5_080_887..5_080_887, json_rpc_named_arguments) end end end diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs index 3868d79497..336d0016db 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs @@ -4,7 +4,7 @@ defmodule EthereumJSONRPCTest do import EthereumJSONRPC.Case import Mox - alias EthereumJSONRPC.{FetchedBalances, Subscription} + alias EthereumJSONRPC.{FetchedBalances, FetchedBeneficiaries, Subscription} alias EthereumJSONRPC.WebSocket.WebSocketClient setup :verify_on_exit! @@ -181,7 +181,8 @@ defmodule EthereumJSONRPCTest do {:ok, []} end) - assert EthereumJSONRPC.fetch_beneficiaries(1..1, json_rpc_named_arguments) == {:ok, MapSet.new()} + assert EthereumJSONRPC.fetch_beneficiaries(1..1, json_rpc_named_arguments) == + {:ok, %FetchedBeneficiaries{params_set: MapSet.new(), errors: []}} end end end diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index bb35eb8036..8450d809b6 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -5,6 +5,7 @@ defmodule Indexer.Block.Fetcher do require Logger + alias EthereumJSONRPC.FetchedBeneficiaries alias Explorer.Chain.{Address, Block, Import} alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, Token, TokenTransfers} alias Indexer.Address.{CoinBalances, TokenBalances} @@ -103,10 +104,11 @@ defmodule Indexer.Block.Fetcher do transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts), %{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.parse(logs), %{mint_transfers: mint_transfers} = MintTransfer.parse(logs), - {:beneficiaries, {:ok, beneficiaries}} <- fetch_beneficiaries(range, json_rpc_named_arguments), + {:beneficiaries, {:ok, %FetchedBeneficiaries{params_set: beneficiary_params_set}}} <- + fetch_beneficiaries(range, json_rpc_named_arguments), addresses = AddressExtraction.extract_addresses(%{ - block_reward_contract_beneficiaries: MapSet.to_list(beneficiaries), + block_reward_contract_beneficiaries: MapSet.to_list(beneficiary_params_set), blocks: blocks, logs: logs, mint_transfers: mint_transfers, @@ -120,7 +122,7 @@ defmodule Indexer.Block.Fetcher do transactions_params: transactions_with_receipts } |> CoinBalances.params_set() - |> MapSet.union(beneficiaries), + |> MapSet.union(beneficiary_params_set), address_token_balances = TokenBalances.params_set(%{token_transfers_params: token_transfers}), {:ok, inserted} <- __MODULE__.import( @@ -139,6 +141,7 @@ defmodule Indexer.Block.Fetcher do ) do {:ok, {inserted, next}} else + {:beneficiaries = step, {:ok, %FetchedBeneficiaries{errors: [_ | _] = errors}}} -> {:error, {step, errors}} {step, {:error, reason}} -> {:error, {step, reason}} {:error, :timeout} = error -> error {:error, changesets} = error when is_list(changesets) -> error @@ -200,9 +203,8 @@ defmodule Indexer.Block.Fetcher do defp fetch_beneficiaries(range, json_rpc_named_arguments) do result = - case EthereumJSONRPC.fetch_beneficiaries(range, json_rpc_named_arguments) do - :ignore -> {:ok, MapSet.new()} - result -> result + with :ignore <- EthereumJSONRPC.fetch_beneficiaries(range, json_rpc_named_arguments) do + {:ok, %FetchedBeneficiaries{params_set: MapSet.new()}} end {:beneficiaries, result} diff --git a/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs b/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs index 07d97fc87b..b94b5f1c7d 100644 --- a/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs +++ b/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs @@ -63,8 +63,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do "uncles" => [] }} - %{method: "trace_block"}, _options -> - {:ok, []} + [%{method: "trace_block"} | _] = requests, _options -> + {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, result: []} end)} [%{method: "eth_getBlockByNumber", params: [_, true]} | _] = requests, _options -> {:ok, @@ -476,8 +476,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do |> (fn mock -> case Keyword.fetch!(json_rpc_named_arguments, :variant) do EthereumJSONRPC.Parity -> - expect(mock, :json_rpc, fn %{method: "trace_block"}, _options -> - {:ok, []} + expect(mock, :json_rpc, fn [%{method: "trace_block"} | _] = requests, _options -> + {:ok, Enum.map(requests, fn %{id: id} -> %{id: id, result: []} end)} end) _ -> diff --git a/apps/indexer/test/indexer/block/fetcher_test.exs b/apps/indexer/test/indexer/block/fetcher_test.exs index 60526035f8..8c8a959b35 100644 --- a/apps/indexer/test/indexer/block/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/fetcher_test.exs @@ -109,8 +109,8 @@ defmodule Indexer.Block.FetcherTest do } ]} end) - |> expect(:json_rpc, fn %{id: _id, method: "trace_block", params: [^block_quantity]}, _options -> - {:ok, []} + |> expect(:json_rpc, fn [%{id: id, method: "trace_block", params: [^block_quantity]}], _options -> + {:ok, [%{id: id, result: []}]} end) |> expect(:json_rpc, fn [ %{ @@ -359,9 +359,8 @@ defmodule Indexer.Block.FetcherTest do } ]} end) - |> expect(:json_rpc, fn json, _options -> - assert %{id: _id, method: "trace_block", params: [^block_quantity]} = json - {:ok, []} + |> expect(:json_rpc, fn [%{id: id, method: "trace_block", params: [^block_quantity]}], _options -> + {:ok, [%{id: id, result: []}]} end) # async requests need to be grouped in one expect because the order is non-deterministic while multiple expect # calls on the same name/arity are used in order diff --git a/apps/indexer/test/indexer/block/realtime/fetcher_test.exs b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs index 48ff3b9cc7..4e2a1ed692 100644 --- a/apps/indexer/test/indexer/block/realtime/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs @@ -198,8 +198,9 @@ defmodule Indexer.Block.Realtime.FetcherTest do } ]} end) - |> expect(:json_rpc, 2, fn %{method: "trace_block"}, _options -> - {:ok, []} + |> expect(:json_rpc, fn [%{method: "trace_block"}, %{method: "trace_block"}] = requests, _options -> + responses = Enum.map(requests, fn %{id: id} -> %{id: id, result: []} end) + {:ok, responses} end) |> expect(:json_rpc, fn [ %{ From d43a8c61333781185c4f5b349980626bd3e17cbe Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Fri, 16 Nov 2018 08:43:26 -0600 Subject: [PATCH 3/5] Partial retry of batches of blocks by hash --- .credo.exs | 2 +- apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex | 26 +--- .../lib/ethereum_jsonrpc/block.ex | 15 +++ .../lib/ethereum_jsonrpc/block/by_hash.ex | 11 ++ .../lib/ethereum_jsonrpc/blocks.ex | 46 ++++++- .../lib/ethereum_jsonrpc/blocks/by_hash.ex | 14 +++ .../test/ethereum_jsonrpc_test.exs | 21 +++- .../lib/indexer/block/uncle/fetcher.ex | 112 +++++++++++++----- .../lib/indexer/coin_balance/fetcher.ex | 2 +- .../test/indexer/block/uncle/fetcher_test.exs | 3 +- 10 files changed, 188 insertions(+), 64 deletions(-) create mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_hash.ex create mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex diff --git a/.credo.exs b/.credo.exs index e4efa6521c..ceb9421811 100644 --- a/.credo.exs +++ b/.credo.exs @@ -75,7 +75,7 @@ # Priority values are: `low, normal, high, higher` # {Credo.Check.Design.AliasUsage, - excluded_namespaces: ~w(Import Socket Task), + excluded_namespaces: ~w(Block Blocks Import Socket Task), excluded_lastnames: ~w(Address DateTime Exporter Fetcher Full Instrumenter Monitor Name Number Repo Time Unit), priority: :low}, diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex index c1e6f88354..b764c4d406 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex @@ -220,13 +220,11 @@ defmodule EthereumJSONRPC do |> Enum.map(fn block_hash -> %{hash: block_hash} end) |> id_to_params() - id_to_params - |> get_block_by_hash_requests() - |> json_rpc(json_rpc_named_arguments) - |> handle_get_blocks(id_to_params) - |> case do - {:ok, _next, results} -> {:ok, results} - {:error, reason} -> {:error, reason} + with {:ok, responses} <- + id_to_params + |> Blocks.ByHash.requests() + |> json_rpc(json_rpc_named_arguments) do + {:ok, Blocks.from_responses(responses, id_to_params)} end end @@ -411,20 +409,6 @@ defmodule EthereumJSONRPC do |> Timex.from_unix() end - defp get_block_by_hash_requests(id_to_params) do - Enum.map(id_to_params, fn {id, %{hash: hash}} -> - get_block_by_hash_request(%{id: id, hash: hash, transactions: :full}) - end) - end - - defp get_block_by_hash_request(%{id: id} = options) do - request(%{id: id, method: "eth_getBlockByHash", params: get_block_by_hash_params(options)}) - end - - defp get_block_by_hash_params(%{hash: hash} = options) do - [hash, get_block_transactions(options)] - end - defp get_block_by_number_requests(id_to_params) do Enum.map(id_to_params, fn {id, %{number: number}} -> get_block_by_number_request(%{id: id, quantity: number, transactions: :full}) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex index b202d8a64d..58c42b6272 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex @@ -68,6 +68,21 @@ defmodule EthereumJSONRPC.Block do """ @type t :: %{String.t() => EthereumJSONRPC.data() | EthereumJSONRPC.hash() | EthereumJSONRPC.quantity() | nil} + def from_response(%{id: id, result: %{"hash" => hash} = block}, id_to_params) when is_map(id_to_params) do + # `^` verifies returned hash matches sent hash + %{hash: ^hash} = Map.fetch!(id_to_params, id) + + {:ok, block} + end + + def from_response(%{id: id, error: error}, id_to_params) when is_map(id_to_params) do + %{hash: hash} = Map.fetch!(id_to_params, id) + + annotated_error = Map.put(error, :data, %{hash: hash}) + + {:error, annotated_error} + end + @doc """ Converts `t:elixir/0` format to params used in `Explorer.Chain`. diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_hash.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_hash.ex new file mode 100644 index 0000000000..07d1e48b4d --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_hash.ex @@ -0,0 +1,11 @@ +defmodule EthereumJSONRPC.Block.ByHash do + @moduledoc """ + Block format as returned by [`eth_getBlockByHash`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash) + """ + + @include_transactions true + + def request(%{id: id, hash: hash}) do + EthereumJSONRPC.request(%{id: id, method: "eth_getBlockByHash", params: [hash, @include_transactions]}) + end +end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex index 1eef103513..9c56c4a538 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex @@ -4,11 +4,51 @@ defmodule EthereumJSONRPC.Blocks do and [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbynumber) from batch requests. """ - alias EthereumJSONRPC.{Block, Transactions, Uncles} + alias EthereumJSONRPC.{Block, Transactions, Transport, Uncles} @type elixir :: [Block.elixir()] @type params :: [Block.params()] - @type t :: [Block.t()] + @type t :: %__MODULE__{ + blocks_params: [map()], + block_second_degree_relations_params: [map()], + transactions_params: [map()], + errors: [Transport.error()] + } + + defstruct blocks_params: [], + block_second_degree_relations_params: [], + transactions_params: [], + errors: [] + + @spec from_responses(list(), map()) :: t() + def from_responses(responses, id_to_params) when is_list(responses) and is_map(id_to_params) do + %{errors: errors, blocks: blocks} = + responses + |> Enum.map(&Block.from_response(&1, id_to_params)) + |> Enum.reduce(%{errors: [], blocks: []}, fn + {:ok, block}, %{blocks: blocks} = acc -> + %{acc | blocks: [block | blocks]} + + {:error, error}, %{errors: errors} = acc -> + %{acc | errors: [error | errors]} + end) + + elixir_blocks = to_elixir(blocks) + + elixir_uncles = elixir_to_uncles(elixir_blocks) + elixir_transactions = elixir_to_transactions(elixir_blocks) + + block_second_degree_relations_params = Uncles.elixir_to_params(elixir_uncles) + transactions_params = Transactions.elixir_to_params(elixir_transactions) + blocks_params = elixir_to_params(elixir_blocks) + + %__MODULE__{ + errors: errors, + blocks_params: blocks_params, + block_second_degree_relations_params: block_second_degree_relations_params, + transactions_params: transactions_params + } + end @doc """ Converts `t:elixir/0` elements to params used by `Explorer.Chain.Block.changeset/2`. @@ -282,7 +322,7 @@ defmodule EthereumJSONRPC.Blocks do } ] """ - @spec to_elixir(t) :: elixir + @spec to_elixir([Block.t()]) :: elixir def to_elixir(blocks) when is_list(blocks) do Enum.map(blocks, &Block.to_elixir/1) end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex new file mode 100644 index 0000000000..97b455840e --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex @@ -0,0 +1,14 @@ +defmodule EthereumJSONRPC.Blocks.ByHash do + @moduledoc """ + Blocks format as returned by [`eth_getBlockByHash`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash) + from batch requests. + """ + + alias EthereumJSONRPC.Block + + def requests(id_to_params) when is_map(id_to_params) do + Enum.map(id_to_params, fn {id, %{hash: hash}} -> + Block.ByHash.request(%{id: id, hash: hash}) + end) + end +end diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs index 336d0016db..f5747cf38b 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs @@ -4,7 +4,7 @@ defmodule EthereumJSONRPCTest do import EthereumJSONRPC.Case import Mox - alias EthereumJSONRPC.{FetchedBalances, FetchedBeneficiaries, Subscription} + alias EthereumJSONRPC.{Blocks, FetchedBalances, FetchedBeneficiaries, Subscription} alias EthereumJSONRPC.WebSocket.WebSocketClient setup :verify_on_exit! @@ -205,12 +205,13 @@ defmodule EthereumJSONRPCTest do end if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do - expect(EthereumJSONRPC.Mox, :json_rpc, fn _json, _options -> + expect(EthereumJSONRPC.Mox, :json_rpc, fn [%{id: id}], _options -> block_number = "0x0" {:ok, [ %{ + id: id, result: %{ "difficulty" => "0x0", "gasLimit" => "0x0", @@ -253,7 +254,7 @@ defmodule EthereumJSONRPCTest do end) end - assert {:ok, %{blocks: [_ | _], transactions: [_ | _]}} = + assert {:ok, %Blocks{blocks_params: [_ | _], transactions_params: [_ | _]}} = EthereumJSONRPC.fetch_blocks_by_hash([block_hash], json_rpc_named_arguments) end @@ -274,8 +275,18 @@ defmodule EthereumJSONRPCTest do end) end - assert {:error, [%{data: %{hash: "0x0"}}]} = - EthereumJSONRPC.fetch_blocks_by_hash(["0x0"], json_rpc_named_arguments) + hash = "0x0" + + assert {:ok, + %Blocks{ + errors: [ + %{ + data: %{ + hash: ^hash + } + } + ] + }} = EthereumJSONRPC.fetch_blocks_by_hash([hash], json_rpc_named_arguments) end test "full batch errors are returned", %{json_rpc_named_arguments: json_rpc_named_arguments} do diff --git a/apps/indexer/lib/indexer/block/uncle/fetcher.ex b/apps/indexer/lib/indexer/block/uncle/fetcher.ex index b57f4bd061..24e5499a29 100644 --- a/apps/indexer/lib/indexer/block/uncle/fetcher.ex +++ b/apps/indexer/lib/indexer/block/uncle/fetcher.ex @@ -6,6 +6,7 @@ defmodule Indexer.Block.Uncle.Fetcher do require Logger + alias EthereumJSONRPC.Blocks alias Explorer.Chain alias Explorer.Chain.Hash alias Indexer.{AddressExtraction, Block, BufferedTask} @@ -72,38 +73,8 @@ defmodule Indexer.Block.Uncle.Fetcher do Logger.debug(fn -> "fetching #{length(unique_hashes)} uncle blocks" end) case EthereumJSONRPC.fetch_blocks_by_hash(unique_hashes, json_rpc_named_arguments) do - {:ok, - %{ - blocks: blocks_params, - transactions: transactions_params, - block_second_degree_relations: block_second_degree_relations_params - }} -> - addresses_params = - AddressExtraction.extract_addresses(%{blocks: blocks_params, transactions: transactions_params}) - - case Block.Fetcher.import(block_fetcher, %{ - addresses: %{params: addresses_params}, - blocks: %{params: blocks_params}, - block_second_degree_relations: %{params: block_second_degree_relations_params}, - transactions: %{params: transactions_params, on_conflict: :nothing} - }) do - {:ok, _} -> - :ok - - {:error, step, failed_value, _changes_so_far} -> - Logger.error(fn -> - [ - "failed to import ", - unique_hashes |> length() |> to_string(), - "uncle blocks in step ", - inspect(step), - ": ", - inspect(failed_value) - ] - end) - - {:retry, unique_hashes} - end + {:ok, blocks} -> + run_blocks(blocks, block_fetcher, unique_hashes) {:error, reason} -> Logger.error(fn -> @@ -114,6 +85,45 @@ defmodule Indexer.Block.Uncle.Fetcher do end end + defp run_blocks(%Blocks{blocks_params: []}, _, original_entries), do: {:retry, original_entries} + + defp run_blocks( + %Blocks{ + blocks_params: blocks_params, + transactions_params: transactions_params, + block_second_degree_relations_params: block_second_degree_relations_params, + errors: errors + }, + block_fetcher, + original_entries + ) do + addresses_params = AddressExtraction.extract_addresses(%{blocks: blocks_params, transactions: transactions_params}) + + case Block.Fetcher.import(block_fetcher, %{ + addresses: %{params: addresses_params}, + blocks: %{params: blocks_params}, + block_second_degree_relations: %{params: block_second_degree_relations_params}, + transactions: %{params: transactions_params, on_conflict: :nothing} + }) do + {:ok, _} -> + retry(errors, original_entries) + + {:error, step, failed_value, _changes_so_far} -> + Logger.error(fn -> + [ + "failed to import ", + original_entries |> length() |> to_string(), + "uncle blocks in step ", + inspect(step), + ": ", + inspect(failed_value) + ] + end) + + {:retry, original_entries} + end + end + @ignored_options ~w(address_hash_to_fetched_balance_block_number transaction_hash_to_block_number)a @impl Block.Fetcher @@ -170,4 +180,42 @@ defmodule Indexer.Block.Uncle.Fetcher do %{uncle_hash: uncle_hash, index: index, hash: hash} end) end + + defp retry([], _), do: :ok + + defp retry(errors, original_entries) when is_list(errors) do + retried_entries = errors_to_entries(errors) + + Logger.error(fn -> + [ + "failed to fetch ", + retried_entries |> length() |> to_string(), + "/", + original_entries |> length() |> to_string(), + " uncles: ", + errors_to_iodata(errors) + ] + end) + end + + defp errors_to_entries(errors) when is_list(errors) do + Enum.map(errors, &error_to_entry/1) + end + + defp error_to_entry(%{data: %{hash: hash}}) when is_binary(hash), do: hash + + defp errors_to_iodata(errors) when is_list(errors) do + errors_to_iodata(errors, []) + end + + defp errors_to_iodata([], iodata), do: iodata + + defp errors_to_iodata([error | errors], iodata) do + errors_to_iodata(errors, [iodata | error_to_iodata(error)]) + end + + defp error_to_iodata(%{code: code, message: message, data: %{hash: hash}}) + when is_integer(code) and is_binary(message) and is_binary(hash) do + [hash, ": (", to_string(code), ") ", message, ?\n] + end end diff --git a/apps/indexer/lib/indexer/coin_balance/fetcher.ex b/apps/indexer/lib/indexer/coin_balance/fetcher.ex index 8b867def35..27240c5aad 100644 --- a/apps/indexer/lib/indexer/coin_balance/fetcher.ex +++ b/apps/indexer/lib/indexer/coin_balance/fetcher.ex @@ -173,6 +173,6 @@ defmodule Indexer.CoinBalance.Fetcher do data: %{block_quantity: block_quantity, hash_data: hash_data} }) when is_integer(code) and is_binary(message) and is_binary(block_quantity) and is_binary(hash_data) do - [hash_data, "@", quantity_to_integer(block_quantity), ": (", to_string(code), ") ", message] + [hash_data, "@", quantity_to_integer(block_quantity), ": (", to_string(code), ") ", message, ?\n] end end diff --git a/apps/indexer/test/indexer/block/uncle/fetcher_test.exs b/apps/indexer/test/indexer/block/uncle/fetcher_test.exs index 5c70683aa7..0bb2b0a915 100644 --- a/apps/indexer/test/indexer/block/uncle/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/uncle/fetcher_test.exs @@ -51,12 +51,13 @@ defmodule Indexer.Block.Uncle.FetcherTest do uncle_uncle_hash_data = to_string(block_hash()) EthereumJSONRPC.Mox - |> expect(:json_rpc, fn [%{method: "eth_getBlockByHash", params: [^uncle_hash_data, true]}], _ -> + |> expect(:json_rpc, fn [%{id: id, method: "eth_getBlockByHash", params: [^uncle_hash_data, true]}], _ -> number_quantity = "0x0" {:ok, [ %{ + id: id, result: %{ "author" => "0xe2ac1c6843a33f81ae4935e5ef1277a392990381", "difficulty" => "0xfffffffffffffffffffffffffffffffe", From 3499155cdba64241f6a8c72b16f4cd06d81afe0a Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 26 Nov 2018 11:15:54 -0600 Subject: [PATCH 4/5] Partial retries for fetch_blocks_by_range Means that both Block.Fetchers, realtime and catchup, now handle fetch errors. The Block.Catchup.Fetcher will retry those blocks that failed to fetch while importing the rest that did fetch instead of refetching the whole batch. --- apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex | 98 +++------------ .../lib/ethereum_jsonrpc/block.ex | 16 ++- .../lib/ethereum_jsonrpc/block/by_number.ex | 11 ++ .../lib/ethereum_jsonrpc/blocks.ex | 8 ++ .../lib/ethereum_jsonrpc/blocks/by_hash.ex | 14 --- .../test/ethereum_jsonrpc_test.exs | 76 ++++++++++-- .../lib/indexer/block/catchup/fetcher.ex | 61 +++++++-- apps/indexer/lib/indexer/block/fetcher.ex | 33 ++--- .../lib/indexer/block/realtime/fetcher.ex | 13 +- .../bound_interval_supervisor_test.exs | 2 + .../test/indexer/block/fetcher_test.exs | 116 +++++++++--------- .../indexer/block/realtime/fetcher_test.exs | 85 ++++++------- 12 files changed, 305 insertions(+), 228 deletions(-) create mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_number.ex delete mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex index b764c4d406..6b7493ec9a 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex @@ -26,14 +26,13 @@ defmodule EthereumJSONRPC do """ alias EthereumJSONRPC.{ + Block, Blocks, FetchedBalances, Receipts, RequestCoordinator, Subscription, - Transactions, Transport, - Uncles, Variant } @@ -87,11 +86,6 @@ defmodule EthereumJSONRPC do {:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()} ] - @typedoc """ - If there are more blocks. - """ - @type next :: :end_of_chain | :more - @typedoc """ 8 byte [KECCAK-256](https://en.wikipedia.org/wiki/SHA-3) hash of the proof-of-work. """ @@ -214,41 +208,21 @@ defmodule EthereumJSONRPC do Transaction data is included for each block. """ + @spec fetch_blocks_by_hash([hash()], json_rpc_named_arguments) :: {:ok, Blocks.t()} | {:error, reason :: term} def fetch_blocks_by_hash(block_hashes, json_rpc_named_arguments) do - id_to_params = - block_hashes - |> Enum.map(fn block_hash -> %{hash: block_hash} end) - |> id_to_params() - - with {:ok, responses} <- - id_to_params - |> Blocks.ByHash.requests() - |> json_rpc(json_rpc_named_arguments) do - {:ok, Blocks.from_responses(responses, id_to_params)} - end + block_hashes + |> Enum.map(fn block_hash -> %{hash: block_hash} end) + |> fetch_blocks_by_params(&Block.ByHash.request/1, json_rpc_named_arguments) end @doc """ Fetches blocks by block number range. """ - @spec fetch_blocks_by_range(Range.t(), json_rpc_named_arguments) :: - {:ok, next, - %{ - blocks: Blocks.params(), - block_second_degree_relations: Uncles.params(), - transactions: Transactions.params() - }} - | {:error, [reason :: term, ...]} + @spec fetch_blocks_by_range(Range.t(), json_rpc_named_arguments) :: {:ok, Blocks.t()} | {:error, reason :: term} def fetch_blocks_by_range(_first.._last = range, json_rpc_named_arguments) do - id_to_params = - range - |> Enum.map(fn number -> %{number: number} end) - |> id_to_params() - - id_to_params - |> get_block_by_number_requests() - |> json_rpc(json_rpc_named_arguments) - |> handle_get_blocks(id_to_params) + range + |> Enum.map(fn number -> %{number: number} end) + |> fetch_blocks_by_params(&Block.ByNumber.request/1, json_rpc_named_arguments) end @doc """ @@ -409,10 +383,16 @@ defmodule EthereumJSONRPC do |> Timex.from_unix() end - defp get_block_by_number_requests(id_to_params) do - Enum.map(id_to_params, fn {id, %{number: number}} -> - get_block_by_number_request(%{id: id, quantity: number, transactions: :full}) - end) + defp fetch_blocks_by_params(params, request, json_rpc_named_arguments) + when is_list(params) and is_function(request, 1) do + id_to_params = id_to_params(params) + + with {:ok, responses} <- + id_to_params + |> Blocks.requests(request) + |> json_rpc(json_rpc_named_arguments) do + {:ok, Blocks.from_responses(responses, id_to_params)} + end end defp get_block_by_number_request(%{id: id} = options) do @@ -445,46 +425,6 @@ defmodule EthereumJSONRPC do end end - defp handle_get_blocks({:ok, results}, id_to_params) when is_list(results) do - with {:ok, next, blocks} <- reduce_results(results, id_to_params) do - elixir_blocks = Blocks.to_elixir(blocks) - - elixir_uncles = Blocks.elixir_to_uncles(elixir_blocks) - elixir_transactions = Blocks.elixir_to_transactions(elixir_blocks) - - block_second_degree_relations_params = Uncles.elixir_to_params(elixir_uncles) - transactions_params = Transactions.elixir_to_params(elixir_transactions) - blocks_params = Blocks.elixir_to_params(elixir_blocks) - - {:ok, next, - %{ - blocks: blocks_params, - block_second_degree_relations: block_second_degree_relations_params, - transactions: transactions_params - }} - end - end - - defp handle_get_blocks({:error, _} = error, _id_to_params), do: error - - defp reduce_results(results, id_to_params) do - Enum.reduce(results, {:ok, :more, []}, &reduce_result(&1, &2, id_to_params)) - end - - defp reduce_result(%{result: nil}, {:ok, _, blocks}, _id_to_params), do: {:ok, :end_of_chain, blocks} - defp reduce_result(%{result: %{} = block}, {:ok, next, blocks}, _id_to_params), do: {:ok, next, [block | blocks]} - defp reduce_result(%{result: _}, {:error, _} = error, _id_to_params), do: error - - defp reduce_result(%{error: reason, id: id}, acc, id_to_params) do - data = Map.fetch!(id_to_params, id) - annotated_reason = Map.put(reason, :data, data) - - case acc do - {:ok, _, _} -> {:error, [annotated_reason]} - {:error, reasons} -> {:error, [annotated_reason | reasons]} - end - end - defp handle_get_block_by_tag({:ok, %{"number" => nil}}), do: {:error, :not_found} defp handle_get_block_by_tag({:ok, %{"number" => quantity}}) when is_binary(quantity) do diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex index 58c42b6272..06ebe479df 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex @@ -68,17 +68,21 @@ defmodule EthereumJSONRPC.Block do """ @type t :: %{String.t() => EthereumJSONRPC.data() | EthereumJSONRPC.hash() | EthereumJSONRPC.quantity() | nil} - def from_response(%{id: id, result: %{"hash" => hash} = block}, id_to_params) when is_map(id_to_params) do - # `^` verifies returned hash matches sent hash - %{hash: ^hash} = Map.fetch!(id_to_params, id) + def from_response(%{id: id, result: nil}, id_to_params) when is_map(id_to_params) do + params = Map.fetch!(id_to_params, id) + + {:error, %{code: 404, message: "Not Found", data: params}} + end + + def from_response(%{id: id, result: block}, id_to_params) when is_map(id_to_params) do + true = Map.has_key?(id_to_params, id) {:ok, block} end def from_response(%{id: id, error: error}, id_to_params) when is_map(id_to_params) do - %{hash: hash} = Map.fetch!(id_to_params, id) - - annotated_error = Map.put(error, :data, %{hash: hash}) + params = Map.fetch!(id_to_params, id) + annotated_error = Map.put(error, :data, params) {:error, annotated_error} end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_number.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_number.ex new file mode 100644 index 0000000000..80ec9b1cd1 --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_number.ex @@ -0,0 +1,11 @@ +defmodule EthereumJSONRPC.Block.ByNumber do + @moduledoc """ + Block format as returned by [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash) + """ + + import EthereumJSONRPC, only: [integer_to_quantity: 1] + + def request(%{id: id, number: number}) do + EthereumJSONRPC.request(%{id: id, method: "eth_getBlockByNumber", params: [integer_to_quantity(number), true]}) + end +end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex index 9c56c4a538..22bb74faf6 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex @@ -20,6 +20,14 @@ defmodule EthereumJSONRPC.Blocks do transactions_params: [], errors: [] + def requests(id_to_params, request) when is_map(id_to_params) and is_function(request, 1) do + Enum.map(id_to_params, fn {id, params} -> + params + |> Map.put(:id, id) + |> request.() + end) + end + @spec from_responses(list(), map()) :: t() def from_responses(responses, id_to_params) when is_list(responses) and is_map(id_to_params) do %{errors: errors, blocks: blocks} = diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex deleted file mode 100644 index 97b455840e..0000000000 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex +++ /dev/null @@ -1,14 +0,0 @@ -defmodule EthereumJSONRPC.Blocks.ByHash do - @moduledoc """ - Blocks format as returned by [`eth_getBlockByHash`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash) - from batch requests. - """ - - alias EthereumJSONRPC.Block - - def requests(id_to_params) when is_map(id_to_params) do - Enum.map(id_to_params, fn {id, %{hash: hash}} -> - Block.ByHash.request(%{id: id, hash: hash}) - end) - end -end diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs index f5747cf38b..1f99bd8697 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs @@ -329,15 +329,29 @@ defmodule EthereumJSONRPCTest do end) end - assert {:error, - [%{data: %{number: 1_000_000_000_000_000_000_001}}, %{data: %{number: 1_000_000_000_000_000_000_000}}]} = + assert {:ok, + %EthereumJSONRPC.Blocks{ + block_second_degree_relations_params: [], + blocks_params: [], + errors: [ + %{ + data: %{number: 1_000_000_000_000_000_000_001} + }, + %{ + data: %{number: 1_000_000_000_000_000_000_000} + } + ], + transactions_params: [] + }} = EthereumJSONRPC.fetch_blocks_by_range( 1_000_000_000_000_000_000_000..1_000_000_000_000_000_000_001, json_rpc_named_arguments ) end - test "returns only errors if a mix of results and errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do + test "returns only errors and results if a mix of results and errors", %{ + json_rpc_named_arguments: json_rpc_named_arguments + } do # Can't be faked reliably on real chain moxed_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox) @@ -356,30 +370,71 @@ defmodule EthereumJSONRPCTest do id: 1, result: %{ "difficulty" => "0x0", + "extraData" => "0x", "gasLimit" => "0x0", "gasUsed" => "0x0", "hash" => "0x0", + "logsBloom" => "0x", "miner" => "0x0", "number" => "0x0", "parentHash" => "0x0", + "receiptsRoot" => "0x0", + "sha3Uncles" => "0x0", "size" => "0x0", + "stateRoot" => "0x0", "timestamp" => "0x0", "totalDifficulty" => "0x0", - "transactions" => [] + "transactions" => [], + "transactionsRoot" => [], + "uncles" => [] }, jsonrpc: "2.0" } ]} end) - assert {:error, [%{data: %{number: 1_000_000_000_000_000_000_000}}]} = + assert {:ok, + %EthereumJSONRPC.Blocks{ + block_second_degree_relations_params: [], + blocks_params: [ + %{ + difficulty: 0, + extra_data: "0x", + gas_limit: 0, + gas_used: 0, + hash: "0x0", + logs_bloom: "0x", + miner_hash: "0x0", + mix_hash: "0x0", + nonce: 0, + number: 0, + parent_hash: "0x0", + receipts_root: "0x0", + sha3_uncles: "0x0", + size: 0, + state_root: "0x0", + timestamp: _, + total_difficulty: 0, + transactions_root: [], + uncles: [] + } + ], + errors: [ + %{ + code: -32602, + data: %{number: 1_000_000_000_000_000_000_000}, + message: "Invalid params: Invalid block number: number too large to fit in target type." + } + ], + transactions_params: [] + }} = EthereumJSONRPC.fetch_blocks_by_range( 1_000_000_000_000_000_000_000..1_000_000_000_000_000_000_001, moxed_json_rpc_named_arguments ) end - test "nil result indicated end-of-chain", %{json_rpc_named_arguments: json_rpc_named_arguments} do + test "nil result indicated error code 404", %{json_rpc_named_arguments: json_rpc_named_arguments} do # Can't be faked reliably on real chain moxed_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox) @@ -418,8 +473,13 @@ defmodule EthereumJSONRPCTest do ]} end) - assert {:ok, :end_of_chain, %{blocks: [_], transactions: []}} = - EthereumJSONRPC.fetch_blocks_by_range(0..1, moxed_json_rpc_named_arguments) + assert {:ok, + %EthereumJSONRPC.Blocks{ + block_second_degree_relations_params: [], + blocks_params: [%{}], + errors: [%{code: 404, data: %{number: 1}, message: "Not Found"}], + transactions_params: [] + }} = EthereumJSONRPC.fetch_blocks_by_range(0..1, moxed_json_rpc_named_arguments) end end diff --git a/apps/indexer/lib/indexer/block/catchup/fetcher.ex b/apps/indexer/lib/indexer/block/catchup/fetcher.ex index c35d610ef8..79611270aa 100644 --- a/apps/indexer/lib/indexer/block/catchup/fetcher.ex +++ b/apps/indexer/lib/indexer/block/catchup/fetcher.ex @@ -166,9 +166,11 @@ defmodule Indexer.Block.Catchup.Fetcher do sequence ) do case fetch_and_import_range(block_fetcher, range) do - {:ok, {inserted, next}} -> - cap_seq(sequence, next, range) - {:ok, inserted} + {:ok, %{inserted: inserted, errors: errors}} -> + errors = cap_seq(sequence, errors, range) + retry(sequence, errors) + + {:ok, inserted: inserted} {:error, {step, reason}} = error -> Logger.error(fn -> @@ -200,19 +202,27 @@ defmodule Indexer.Block.Catchup.Fetcher do end end - defp cap_seq(seq, next, range) do - case next do - :more -> + defp cap_seq(seq, errors, range) do + {not_founds, other_errors} = + Enum.split_with(errors, fn + %{code: 404, data: %{number: _}} -> true + _ -> false + end) + + case not_founds do + [] -> Logger.debug(fn -> first_block_number..last_block_number = range "got blocks #{first_block_number} - #{last_block_number}" end) - :end_of_chain -> + other_errors + + _ -> Sequence.cap(seq) end - :ok + other_errors end defp push_back(sequence, range) do @@ -222,6 +232,41 @@ defmodule Indexer.Block.Catchup.Fetcher do end end + defp retry(sequence, errors) when is_list(errors) do + errors + |> errors_to_ranges() + |> Enum.map(&push_back(sequence, &1)) + end + + defp errors_to_ranges(errors) when is_list(errors) do + errors + |> Enum.flat_map(&error_to_numbers/1) + |> numbers_to_ranges() + end + + defp error_to_numbers(%{data: %{number: number}}) when is_integer(number), do: [number] + + defp numbers_to_ranges([]), do: [] + + defp numbers_to_ranges(numbers) when is_list(numbers) do + numbers + |> Enum.sort() + |> Enum.chunk_while( + nil, + fn + number, nil -> + {:cont, number..number} + + number, first..last when number == last + 1 -> + {:cont, first..number} + + number, range -> + {:cont, range, number..number} + end, + fn range -> {:cont, range} end + ) + end + defp put_memory_monitor(sequence_options, %__MODULE__{memory_monitor: nil}) when is_list(sequence_options), do: sequence_options diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 8450d809b6..5e7585b59c 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -5,7 +5,7 @@ defmodule Indexer.Block.Fetcher do require Logger - alias EthereumJSONRPC.FetchedBeneficiaries + alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries} alias Explorer.Chain.{Address, Block, Import} alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, Token, TokenTransfers} alias Indexer.Address.{CoinBalances, TokenBalances} @@ -77,7 +77,7 @@ defmodule Indexer.Block.Fetcher do end @spec fetch_and_import_range(t, Range.t()) :: - {:ok, {inserted :: %{}, next :: :more | :end_of_chain}} + {:ok, %{inserted: %{}, errors: [EthereumJSONRPC.Transport.error()]}} | {:error, {step :: atom(), reason :: term()} | [%Ecto.Changeset{}] @@ -91,20 +91,22 @@ defmodule Indexer.Block.Fetcher do _.._ = range ) when callback_module != nil do - with {:blocks, {:ok, next, result}} <- - {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, - %{ - blocks: blocks, - transactions: transactions_without_receipts, - block_second_degree_relations: block_second_degree_relations - } = result, - blocks = Transform.transform_blocks(blocks), - {:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)}, + with {:blocks, + {:ok, + %Blocks{ + blocks_params: blocks_params, + transactions_params: transactions_params_without_receipts, + block_second_degree_relations_params: block_second_degree_relations_params, + errors: blocks_errors + }}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, + blocks = Transform.transform_blocks(blocks_params), + {:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_params_without_receipts)}, %{logs: logs, receipts: receipts} = receipt_params, - transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts), + transactions_with_receipts = Receipts.put(transactions_params_without_receipts, receipts), %{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.parse(logs), %{mint_transfers: mint_transfers} = MintTransfer.parse(logs), - {:beneficiaries, {:ok, %FetchedBeneficiaries{params_set: beneficiary_params_set}}} <- + {:beneficiaries, + {:ok, %FetchedBeneficiaries{params_set: beneficiary_params_set, errors: beneficiaries_errors}}} <- fetch_beneficiaries(range, json_rpc_named_arguments), addresses = AddressExtraction.extract_addresses(%{ @@ -132,16 +134,15 @@ defmodule Indexer.Block.Fetcher do address_coin_balances: %{params: coin_balances_params_set}, address_token_balances: %{params: address_token_balances}, blocks: %{params: blocks}, - block_second_degree_relations: %{params: block_second_degree_relations}, + block_second_degree_relations: %{params: block_second_degree_relations_params}, logs: %{params: logs}, token_transfers: %{params: token_transfers}, tokens: %{on_conflict: :nothing, params: tokens}, transactions: %{params: transactions_with_receipts} } ) do - {:ok, {inserted, next}} + {:ok, %{inserted: inserted, errors: blocks_errors ++ beneficiaries_errors}} else - {:beneficiaries = step, {:ok, %FetchedBeneficiaries{errors: [_ | _] = errors}}} -> {:error, {step, errors}} {step, {:error, reason}} -> {:error, {step, reason}} {:error, :timeout} = error -> error {:error, changesets} = error when is_list(changesets) -> error diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 1b3151006b..e1667679af 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -127,11 +127,22 @@ defmodule Indexer.Block.Realtime.Fetcher do def fetch_and_import_block(block_number_to_fetch, block_fetcher, retry \\ 3) do case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do - {:ok, {_inserted, _next}} -> + {:ok, %{inserted: _, errors: []}} -> Logger.debug(fn -> ["realtime indexer fetched and imported block ", to_string(block_number_to_fetch)] end) + {:ok, %{inserted: _, errors: [_ | _] = errors}} -> + Logger.error(fn -> + [ + "realtime indexer failed to fetch block", + to_string(block_number_to_fetch), + ": ", + inspect(errors), + ". Block will be retried by catchup indexer." + ] + end) + {:error, {step, reason}} -> Logger.error(fn -> [ diff --git a/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs b/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs index b94b5f1c7d..631f74c87d 100644 --- a/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs +++ b/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs @@ -22,6 +22,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do # See https://github.com/poanetwork/blockscout/issues/597 @tag :no_geth test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do + Logger.configure(truncate: :infinity) + if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do case Keyword.fetch!(json_rpc_named_arguments, :variant) do EthereumJSONRPC.Parity -> diff --git a/apps/indexer/test/indexer/block/fetcher_test.exs b/apps/indexer/test/indexer/block/fetcher_test.exs index 8c8a959b35..0328abf028 100644 --- a/apps/indexer/test/indexer/block/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/fetcher_test.exs @@ -214,10 +214,13 @@ defmodule Indexer.Block.FetcherTest do fn -> Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number) end, fn result -> assert {:ok, - {%{ - addresses: [%Address{hash: ^address_hash}], - blocks: [%Chain.Block{hash: ^block_hash}] - }, :more}} = result + %{ + inserted: %{ + addresses: [%Address{hash: ^address_hash}], + blocks: [%Chain.Block{hash: ^block_hash}] + }, + errors: [] + }} = result wait_for_tasks(InternalTransaction.Fetcher) wait_for_tasks(CoinBalance.Fetcher) @@ -506,57 +509,60 @@ defmodule Indexer.Block.FetcherTest do EthereumJSONRPC.Parity -> assert {:ok, - {%{ - addresses: [ - %Address{ - hash: - %Explorer.Chain.Hash{ - byte_count: 20, - bytes: - <<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179, 223, - 65, 91>> - } = first_address_hash - }, - %Address{ - hash: - %Explorer.Chain.Hash{ - byte_count: 20, - bytes: - <<232, 221, 197, 199, 162, 210, 240, 215, 169, 121, 132, 89, 192, 16, 79, 223, 94, 152, - 122, 202>> - } = second_address_hash - } - ], - blocks: [ - %Chain.Block{ - hash: %Explorer.Chain.Hash{ - byte_count: 32, - bytes: - <<246, 180, 184, 200, 141, 243, 235, 210, 82, 236, 71, 99, 40, 51, 77, 192, 38, 207, 102, - 96, 106, 132, 251, 118, 155, 61, 60, 188, 204, 132, 113, 189>> - } - } - ], - logs: [ - %Log{ - index: 0, - transaction_hash: %Explorer.Chain.Hash{ - byte_count: 32, - bytes: - <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, - 77, 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> - } - } - ], - transactions: [ - %Explorer.Chain.Hash{ - byte_count: 32, - bytes: - <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, 77, - 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> - } - ] - }, :more}} = Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number) + %{ + inserted: %{ + addresses: [ + %Address{ + hash: + %Explorer.Chain.Hash{ + byte_count: 20, + bytes: + <<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179, + 223, 65, 91>> + } = first_address_hash + }, + %Address{ + hash: + %Explorer.Chain.Hash{ + byte_count: 20, + bytes: + <<232, 221, 197, 199, 162, 210, 240, 215, 169, 121, 132, 89, 192, 16, 79, 223, 94, 152, + 122, 202>> + } = second_address_hash + } + ], + blocks: [ + %Chain.Block{ + hash: %Explorer.Chain.Hash{ + byte_count: 32, + bytes: + <<246, 180, 184, 200, 141, 243, 235, 210, 82, 236, 71, 99, 40, 51, 77, 192, 38, 207, 102, + 96, 106, 132, 251, 118, 155, 61, 60, 188, 204, 132, 113, 189>> + } + } + ], + logs: [ + %Log{ + index: 0, + transaction_hash: %Explorer.Chain.Hash{ + byte_count: 32, + bytes: + <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, + 77, 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> + } + } + ], + transactions: [ + %Explorer.Chain.Hash{ + byte_count: 32, + bytes: + <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, 77, + 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> + } + ] + }, + errors: [] + }} = Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number) wait_for_tasks(InternalTransaction.Fetcher) wait_for_tasks(CoinBalance.Fetcher) diff --git a/apps/indexer/test/indexer/block/realtime/fetcher_test.exs b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs index 4e2a1ed692..f16a1d9124 100644 --- a/apps/indexer/test/indexer/block/realtime/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs @@ -370,47 +370,50 @@ defmodule Indexer.Block.Realtime.FetcherTest do end assert {:ok, - {%{ - addresses: [ - %Address{hash: first_address_hash, fetched_coin_balance_block_number: 3_946_079}, - %Address{hash: second_address_hash, fetched_coin_balance_block_number: 3_946_079}, - %Address{hash: third_address_hash, fetched_coin_balance_block_number: 3_946_079}, - %Address{hash: fourth_address_hash, fetched_coin_balance_block_number: 3_946_080}, - %Address{hash: fifth_address_hash, fetched_coin_balance_block_number: 3_946_079} - ], - address_coin_balances: [ - %{ - address_hash: first_address_hash, - block_number: 3_946_079 - }, - %{ - address_hash: second_address_hash, - block_number: 3_946_079 - }, - %{ - address_hash: third_address_hash, - block_number: 3_946_079 - }, - %{ - address_hash: fourth_address_hash, - block_number: 3_946_080 - }, - %{ - address_hash: fifth_address_hash, - block_number: 3_946_079 - } - ], - blocks: [%Chain.Block{number: 3_946_079}, %Chain.Block{number: 3_946_080}], - internal_transactions: [ - %{index: 0, transaction_hash: transaction_hash}, - %{index: 1, transaction_hash: transaction_hash}, - %{index: 2, transaction_hash: transaction_hash}, - %{index: 3, transaction_hash: transaction_hash}, - %{index: 4, transaction_hash: transaction_hash}, - %{index: 5, transaction_hash: transaction_hash} - ], - transactions: [transaction_hash] - }, :more}} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080) + %{ + inserted: %{ + addresses: [ + %Address{hash: first_address_hash, fetched_coin_balance_block_number: 3_946_079}, + %Address{hash: second_address_hash, fetched_coin_balance_block_number: 3_946_079}, + %Address{hash: third_address_hash, fetched_coin_balance_block_number: 3_946_079}, + %Address{hash: fourth_address_hash, fetched_coin_balance_block_number: 3_946_080}, + %Address{hash: fifth_address_hash, fetched_coin_balance_block_number: 3_946_079} + ], + address_coin_balances: [ + %{ + address_hash: first_address_hash, + block_number: 3_946_079 + }, + %{ + address_hash: second_address_hash, + block_number: 3_946_079 + }, + %{ + address_hash: third_address_hash, + block_number: 3_946_079 + }, + %{ + address_hash: fourth_address_hash, + block_number: 3_946_080 + }, + %{ + address_hash: fifth_address_hash, + block_number: 3_946_079 + } + ], + blocks: [%Chain.Block{number: 3_946_079}, %Chain.Block{number: 3_946_080}], + internal_transactions: [ + %{index: 0, transaction_hash: transaction_hash}, + %{index: 1, transaction_hash: transaction_hash}, + %{index: 2, transaction_hash: transaction_hash}, + %{index: 3, transaction_hash: transaction_hash}, + %{index: 4, transaction_hash: transaction_hash}, + %{index: 5, transaction_hash: transaction_hash} + ], + transactions: [transaction_hash] + }, + errors: [] + }} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080) end end end From a599aaff063024d8841215284ce5ac199d785206 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 26 Nov 2018 11:42:12 -0600 Subject: [PATCH 5/5] Extract EthereumJSONRPC.Block.ByTag --- apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex | 49 ++----------------- .../lib/ethereum_jsonrpc/block/by_tag.ex | 25 ++++++++++ 2 files changed, 28 insertions(+), 46 deletions(-) create mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_tag.ex diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex index 6b7493ec9a..ca7af965da 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex @@ -238,10 +238,10 @@ defmodule EthereumJSONRPC do @spec fetch_block_number_by_tag(tag(), json_rpc_named_arguments) :: {:ok, non_neg_integer()} | {:error, reason :: :invalid_tag | :not_found | term()} def fetch_block_number_by_tag(tag, json_rpc_named_arguments) when tag in ~w(earliest latest pending) do - tag - |> get_block_by_tag_request() + %{id: 0, tag: tag} + |> Block.ByTag.request() |> json_rpc(json_rpc_named_arguments) - |> handle_get_block_by_tag() + |> Block.ByTag.number_from_result() end @doc """ @@ -394,47 +394,4 @@ defmodule EthereumJSONRPC do {:ok, Blocks.from_responses(responses, id_to_params)} end end - - defp get_block_by_number_request(%{id: id} = options) do - request(%{id: id, method: "eth_getBlockByNumber", params: get_block_by_number_params(options)}) - end - - defp get_block_by_tag_request(tag) do - # eth_getBlockByNumber accepts either a number OR a tag - get_block_by_number_request(%{id: 0, tag: tag, transactions: :hashes}) - end - - defp get_block_by_number_params(options) do - [get_block_by_number_subject(options), get_block_transactions(options)] - end - - defp get_block_by_number_subject(options) do - case {Map.fetch(options, :quantity), Map.fetch(options, :tag)} do - {{:ok, integer}, :error} when is_integer(integer) -> - integer_to_quantity(integer) - - {:error, {:ok, tag}} -> - tag - end - end - - defp get_block_transactions(%{transactions: transactions}) do - case transactions do - :full -> true - :hashes -> false - end - end - - defp handle_get_block_by_tag({:ok, %{"number" => nil}}), do: {:error, :not_found} - - defp handle_get_block_by_tag({:ok, %{"number" => quantity}}) when is_binary(quantity) do - {:ok, quantity_to_integer(quantity)} - end - - # https://github.com/paritytech/parity-ethereum/pull/8281 fixed - # https://github.com/paritytech/parity-ethereum/issues/8028 - defp handle_get_block_by_tag({:ok, nil}), do: {:error, :not_found} - - defp handle_get_block_by_tag({:error, %{"code" => -32602}}), do: {:error, :invalid_tag} - defp handle_get_block_by_tag({:error, _} = error), do: error end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_tag.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_tag.ex new file mode 100644 index 0000000000..1c9d57c493 --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_tag.ex @@ -0,0 +1,25 @@ +defmodule EthereumJSONRPC.Block.ByTag do + @moduledoc """ + Block format returned by [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash) + when used with a semantic tag name instead of a number. + """ + + import EthereumJSONRPC, only: [quantity_to_integer: 1] + + def request(%{id: id, tag: tag}) when is_binary(tag) do + EthereumJSONRPC.request(%{id: id, method: "eth_getBlockByNumber", params: [tag, false]}) + end + + def number_from_result({:ok, %{"number" => nil}}), do: {:error, :not_found} + + def number_from_result({:ok, %{"number" => quantity}}) when is_binary(quantity) do + {:ok, quantity_to_integer(quantity)} + end + + # https://github.com/paritytech/parity-ethereum/pull/8281 fixed + # https://github.com/paritytech/parity-ethereum/issues/8028 + def number_from_result({:ok, nil}), do: {:error, :not_found} + + def number_from_result({:error, %{"code" => -32602}}), do: {:error, :invalid_tag} + def number_from_result({:error, _} = error), do: error +end