From 3499155cdba64241f6a8c72b16f4cd06d81afe0a Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 26 Nov 2018 11:15:54 -0600 Subject: [PATCH] Partial retries for fetch_blocks_by_range Means that both Block.Fetchers, realtime and catchup, now handle fetch errors. The Block.Catchup.Fetcher will retry those blocks that failed to fetch while importing the rest that did fetch instead of refetching the whole batch. --- apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex | 98 +++------------ .../lib/ethereum_jsonrpc/block.ex | 16 ++- .../lib/ethereum_jsonrpc/block/by_number.ex | 11 ++ .../lib/ethereum_jsonrpc/blocks.ex | 8 ++ .../lib/ethereum_jsonrpc/blocks/by_hash.ex | 14 --- .../test/ethereum_jsonrpc_test.exs | 76 ++++++++++-- .../lib/indexer/block/catchup/fetcher.ex | 61 +++++++-- apps/indexer/lib/indexer/block/fetcher.ex | 33 ++--- .../lib/indexer/block/realtime/fetcher.ex | 13 +- .../bound_interval_supervisor_test.exs | 2 + .../test/indexer/block/fetcher_test.exs | 116 +++++++++--------- .../indexer/block/realtime/fetcher_test.exs | 85 ++++++------- 12 files changed, 305 insertions(+), 228 deletions(-) create mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_number.ex delete mode 100644 apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex index b764c4d406..6b7493ec9a 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex @@ -26,14 +26,13 @@ defmodule EthereumJSONRPC do """ alias EthereumJSONRPC.{ + Block, Blocks, FetchedBalances, Receipts, RequestCoordinator, Subscription, - Transactions, Transport, - Uncles, Variant } @@ -87,11 +86,6 @@ defmodule EthereumJSONRPC do {:transport, Transport.t()} | {:transport_options, Transport.options()} | {:variant, Variant.t()} ] - @typedoc """ - If there are more blocks. - """ - @type next :: :end_of_chain | :more - @typedoc """ 8 byte [KECCAK-256](https://en.wikipedia.org/wiki/SHA-3) hash of the proof-of-work. """ @@ -214,41 +208,21 @@ defmodule EthereumJSONRPC do Transaction data is included for each block. """ + @spec fetch_blocks_by_hash([hash()], json_rpc_named_arguments) :: {:ok, Blocks.t()} | {:error, reason :: term} def fetch_blocks_by_hash(block_hashes, json_rpc_named_arguments) do - id_to_params = - block_hashes - |> Enum.map(fn block_hash -> %{hash: block_hash} end) - |> id_to_params() - - with {:ok, responses} <- - id_to_params - |> Blocks.ByHash.requests() - |> json_rpc(json_rpc_named_arguments) do - {:ok, Blocks.from_responses(responses, id_to_params)} - end + block_hashes + |> Enum.map(fn block_hash -> %{hash: block_hash} end) + |> fetch_blocks_by_params(&Block.ByHash.request/1, json_rpc_named_arguments) end @doc """ Fetches blocks by block number range. """ - @spec fetch_blocks_by_range(Range.t(), json_rpc_named_arguments) :: - {:ok, next, - %{ - blocks: Blocks.params(), - block_second_degree_relations: Uncles.params(), - transactions: Transactions.params() - }} - | {:error, [reason :: term, ...]} + @spec fetch_blocks_by_range(Range.t(), json_rpc_named_arguments) :: {:ok, Blocks.t()} | {:error, reason :: term} def fetch_blocks_by_range(_first.._last = range, json_rpc_named_arguments) do - id_to_params = - range - |> Enum.map(fn number -> %{number: number} end) - |> id_to_params() - - id_to_params - |> get_block_by_number_requests() - |> json_rpc(json_rpc_named_arguments) - |> handle_get_blocks(id_to_params) + range + |> Enum.map(fn number -> %{number: number} end) + |> fetch_blocks_by_params(&Block.ByNumber.request/1, json_rpc_named_arguments) end @doc """ @@ -409,10 +383,16 @@ defmodule EthereumJSONRPC do |> Timex.from_unix() end - defp get_block_by_number_requests(id_to_params) do - Enum.map(id_to_params, fn {id, %{number: number}} -> - get_block_by_number_request(%{id: id, quantity: number, transactions: :full}) - end) + defp fetch_blocks_by_params(params, request, json_rpc_named_arguments) + when is_list(params) and is_function(request, 1) do + id_to_params = id_to_params(params) + + with {:ok, responses} <- + id_to_params + |> Blocks.requests(request) + |> json_rpc(json_rpc_named_arguments) do + {:ok, Blocks.from_responses(responses, id_to_params)} + end end defp get_block_by_number_request(%{id: id} = options) do @@ -445,46 +425,6 @@ defmodule EthereumJSONRPC do end end - defp handle_get_blocks({:ok, results}, id_to_params) when is_list(results) do - with {:ok, next, blocks} <- reduce_results(results, id_to_params) do - elixir_blocks = Blocks.to_elixir(blocks) - - elixir_uncles = Blocks.elixir_to_uncles(elixir_blocks) - elixir_transactions = Blocks.elixir_to_transactions(elixir_blocks) - - block_second_degree_relations_params = Uncles.elixir_to_params(elixir_uncles) - transactions_params = Transactions.elixir_to_params(elixir_transactions) - blocks_params = Blocks.elixir_to_params(elixir_blocks) - - {:ok, next, - %{ - blocks: blocks_params, - block_second_degree_relations: block_second_degree_relations_params, - transactions: transactions_params - }} - end - end - - defp handle_get_blocks({:error, _} = error, _id_to_params), do: error - - defp reduce_results(results, id_to_params) do - Enum.reduce(results, {:ok, :more, []}, &reduce_result(&1, &2, id_to_params)) - end - - defp reduce_result(%{result: nil}, {:ok, _, blocks}, _id_to_params), do: {:ok, :end_of_chain, blocks} - defp reduce_result(%{result: %{} = block}, {:ok, next, blocks}, _id_to_params), do: {:ok, next, [block | blocks]} - defp reduce_result(%{result: _}, {:error, _} = error, _id_to_params), do: error - - defp reduce_result(%{error: reason, id: id}, acc, id_to_params) do - data = Map.fetch!(id_to_params, id) - annotated_reason = Map.put(reason, :data, data) - - case acc do - {:ok, _, _} -> {:error, [annotated_reason]} - {:error, reasons} -> {:error, [annotated_reason | reasons]} - end - end - defp handle_get_block_by_tag({:ok, %{"number" => nil}}), do: {:error, :not_found} defp handle_get_block_by_tag({:ok, %{"number" => quantity}}) when is_binary(quantity) do diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex index 58c42b6272..06ebe479df 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block.ex @@ -68,17 +68,21 @@ defmodule EthereumJSONRPC.Block do """ @type t :: %{String.t() => EthereumJSONRPC.data() | EthereumJSONRPC.hash() | EthereumJSONRPC.quantity() | nil} - def from_response(%{id: id, result: %{"hash" => hash} = block}, id_to_params) when is_map(id_to_params) do - # `^` verifies returned hash matches sent hash - %{hash: ^hash} = Map.fetch!(id_to_params, id) + def from_response(%{id: id, result: nil}, id_to_params) when is_map(id_to_params) do + params = Map.fetch!(id_to_params, id) + + {:error, %{code: 404, message: "Not Found", data: params}} + end + + def from_response(%{id: id, result: block}, id_to_params) when is_map(id_to_params) do + true = Map.has_key?(id_to_params, id) {:ok, block} end def from_response(%{id: id, error: error}, id_to_params) when is_map(id_to_params) do - %{hash: hash} = Map.fetch!(id_to_params, id) - - annotated_error = Map.put(error, :data, %{hash: hash}) + params = Map.fetch!(id_to_params, id) + annotated_error = Map.put(error, :data, params) {:error, annotated_error} end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_number.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_number.ex new file mode 100644 index 0000000000..80ec9b1cd1 --- /dev/null +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/block/by_number.ex @@ -0,0 +1,11 @@ +defmodule EthereumJSONRPC.Block.ByNumber do + @moduledoc """ + Block format as returned by [`eth_getBlockByNumber`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash) + """ + + import EthereumJSONRPC, only: [integer_to_quantity: 1] + + def request(%{id: id, number: number}) do + EthereumJSONRPC.request(%{id: id, method: "eth_getBlockByNumber", params: [integer_to_quantity(number), true]}) + end +end diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex index 9c56c4a538..22bb74faf6 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks.ex @@ -20,6 +20,14 @@ defmodule EthereumJSONRPC.Blocks do transactions_params: [], errors: [] + def requests(id_to_params, request) when is_map(id_to_params) and is_function(request, 1) do + Enum.map(id_to_params, fn {id, params} -> + params + |> Map.put(:id, id) + |> request.() + end) + end + @spec from_responses(list(), map()) :: t() def from_responses(responses, id_to_params) when is_list(responses) and is_map(id_to_params) do %{errors: errors, blocks: blocks} = diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex deleted file mode 100644 index 97b455840e..0000000000 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/blocks/by_hash.ex +++ /dev/null @@ -1,14 +0,0 @@ -defmodule EthereumJSONRPC.Blocks.ByHash do - @moduledoc """ - Blocks format as returned by [`eth_getBlockByHash`](https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbyhash) - from batch requests. - """ - - alias EthereumJSONRPC.Block - - def requests(id_to_params) when is_map(id_to_params) do - Enum.map(id_to_params, fn {id, %{hash: hash}} -> - Block.ByHash.request(%{id: id, hash: hash}) - end) - end -end diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs index f5747cf38b..1f99bd8697 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs @@ -329,15 +329,29 @@ defmodule EthereumJSONRPCTest do end) end - assert {:error, - [%{data: %{number: 1_000_000_000_000_000_000_001}}, %{data: %{number: 1_000_000_000_000_000_000_000}}]} = + assert {:ok, + %EthereumJSONRPC.Blocks{ + block_second_degree_relations_params: [], + blocks_params: [], + errors: [ + %{ + data: %{number: 1_000_000_000_000_000_000_001} + }, + %{ + data: %{number: 1_000_000_000_000_000_000_000} + } + ], + transactions_params: [] + }} = EthereumJSONRPC.fetch_blocks_by_range( 1_000_000_000_000_000_000_000..1_000_000_000_000_000_000_001, json_rpc_named_arguments ) end - test "returns only errors if a mix of results and errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do + test "returns only errors and results if a mix of results and errors", %{ + json_rpc_named_arguments: json_rpc_named_arguments + } do # Can't be faked reliably on real chain moxed_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox) @@ -356,30 +370,71 @@ defmodule EthereumJSONRPCTest do id: 1, result: %{ "difficulty" => "0x0", + "extraData" => "0x", "gasLimit" => "0x0", "gasUsed" => "0x0", "hash" => "0x0", + "logsBloom" => "0x", "miner" => "0x0", "number" => "0x0", "parentHash" => "0x0", + "receiptsRoot" => "0x0", + "sha3Uncles" => "0x0", "size" => "0x0", + "stateRoot" => "0x0", "timestamp" => "0x0", "totalDifficulty" => "0x0", - "transactions" => [] + "transactions" => [], + "transactionsRoot" => [], + "uncles" => [] }, jsonrpc: "2.0" } ]} end) - assert {:error, [%{data: %{number: 1_000_000_000_000_000_000_000}}]} = + assert {:ok, + %EthereumJSONRPC.Blocks{ + block_second_degree_relations_params: [], + blocks_params: [ + %{ + difficulty: 0, + extra_data: "0x", + gas_limit: 0, + gas_used: 0, + hash: "0x0", + logs_bloom: "0x", + miner_hash: "0x0", + mix_hash: "0x0", + nonce: 0, + number: 0, + parent_hash: "0x0", + receipts_root: "0x0", + sha3_uncles: "0x0", + size: 0, + state_root: "0x0", + timestamp: _, + total_difficulty: 0, + transactions_root: [], + uncles: [] + } + ], + errors: [ + %{ + code: -32602, + data: %{number: 1_000_000_000_000_000_000_000}, + message: "Invalid params: Invalid block number: number too large to fit in target type." + } + ], + transactions_params: [] + }} = EthereumJSONRPC.fetch_blocks_by_range( 1_000_000_000_000_000_000_000..1_000_000_000_000_000_000_001, moxed_json_rpc_named_arguments ) end - test "nil result indicated end-of-chain", %{json_rpc_named_arguments: json_rpc_named_arguments} do + test "nil result indicated error code 404", %{json_rpc_named_arguments: json_rpc_named_arguments} do # Can't be faked reliably on real chain moxed_json_rpc_named_arguments = Keyword.put(json_rpc_named_arguments, :transport, EthereumJSONRPC.Mox) @@ -418,8 +473,13 @@ defmodule EthereumJSONRPCTest do ]} end) - assert {:ok, :end_of_chain, %{blocks: [_], transactions: []}} = - EthereumJSONRPC.fetch_blocks_by_range(0..1, moxed_json_rpc_named_arguments) + assert {:ok, + %EthereumJSONRPC.Blocks{ + block_second_degree_relations_params: [], + blocks_params: [%{}], + errors: [%{code: 404, data: %{number: 1}, message: "Not Found"}], + transactions_params: [] + }} = EthereumJSONRPC.fetch_blocks_by_range(0..1, moxed_json_rpc_named_arguments) end end diff --git a/apps/indexer/lib/indexer/block/catchup/fetcher.ex b/apps/indexer/lib/indexer/block/catchup/fetcher.ex index c35d610ef8..79611270aa 100644 --- a/apps/indexer/lib/indexer/block/catchup/fetcher.ex +++ b/apps/indexer/lib/indexer/block/catchup/fetcher.ex @@ -166,9 +166,11 @@ defmodule Indexer.Block.Catchup.Fetcher do sequence ) do case fetch_and_import_range(block_fetcher, range) do - {:ok, {inserted, next}} -> - cap_seq(sequence, next, range) - {:ok, inserted} + {:ok, %{inserted: inserted, errors: errors}} -> + errors = cap_seq(sequence, errors, range) + retry(sequence, errors) + + {:ok, inserted: inserted} {:error, {step, reason}} = error -> Logger.error(fn -> @@ -200,19 +202,27 @@ defmodule Indexer.Block.Catchup.Fetcher do end end - defp cap_seq(seq, next, range) do - case next do - :more -> + defp cap_seq(seq, errors, range) do + {not_founds, other_errors} = + Enum.split_with(errors, fn + %{code: 404, data: %{number: _}} -> true + _ -> false + end) + + case not_founds do + [] -> Logger.debug(fn -> first_block_number..last_block_number = range "got blocks #{first_block_number} - #{last_block_number}" end) - :end_of_chain -> + other_errors + + _ -> Sequence.cap(seq) end - :ok + other_errors end defp push_back(sequence, range) do @@ -222,6 +232,41 @@ defmodule Indexer.Block.Catchup.Fetcher do end end + defp retry(sequence, errors) when is_list(errors) do + errors + |> errors_to_ranges() + |> Enum.map(&push_back(sequence, &1)) + end + + defp errors_to_ranges(errors) when is_list(errors) do + errors + |> Enum.flat_map(&error_to_numbers/1) + |> numbers_to_ranges() + end + + defp error_to_numbers(%{data: %{number: number}}) when is_integer(number), do: [number] + + defp numbers_to_ranges([]), do: [] + + defp numbers_to_ranges(numbers) when is_list(numbers) do + numbers + |> Enum.sort() + |> Enum.chunk_while( + nil, + fn + number, nil -> + {:cont, number..number} + + number, first..last when number == last + 1 -> + {:cont, first..number} + + number, range -> + {:cont, range, number..number} + end, + fn range -> {:cont, range} end + ) + end + defp put_memory_monitor(sequence_options, %__MODULE__{memory_monitor: nil}) when is_list(sequence_options), do: sequence_options diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 8450d809b6..5e7585b59c 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -5,7 +5,7 @@ defmodule Indexer.Block.Fetcher do require Logger - alias EthereumJSONRPC.FetchedBeneficiaries + alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries} alias Explorer.Chain.{Address, Block, Import} alias Indexer.{AddressExtraction, CoinBalance, MintTransfer, Token, TokenTransfers} alias Indexer.Address.{CoinBalances, TokenBalances} @@ -77,7 +77,7 @@ defmodule Indexer.Block.Fetcher do end @spec fetch_and_import_range(t, Range.t()) :: - {:ok, {inserted :: %{}, next :: :more | :end_of_chain}} + {:ok, %{inserted: %{}, errors: [EthereumJSONRPC.Transport.error()]}} | {:error, {step :: atom(), reason :: term()} | [%Ecto.Changeset{}] @@ -91,20 +91,22 @@ defmodule Indexer.Block.Fetcher do _.._ = range ) when callback_module != nil do - with {:blocks, {:ok, next, result}} <- - {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, - %{ - blocks: blocks, - transactions: transactions_without_receipts, - block_second_degree_relations: block_second_degree_relations - } = result, - blocks = Transform.transform_blocks(blocks), - {:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)}, + with {:blocks, + {:ok, + %Blocks{ + blocks_params: blocks_params, + transactions_params: transactions_params_without_receipts, + block_second_degree_relations_params: block_second_degree_relations_params, + errors: blocks_errors + }}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, + blocks = Transform.transform_blocks(blocks_params), + {:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_params_without_receipts)}, %{logs: logs, receipts: receipts} = receipt_params, - transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts), + transactions_with_receipts = Receipts.put(transactions_params_without_receipts, receipts), %{token_transfers: token_transfers, tokens: tokens} = TokenTransfers.parse(logs), %{mint_transfers: mint_transfers} = MintTransfer.parse(logs), - {:beneficiaries, {:ok, %FetchedBeneficiaries{params_set: beneficiary_params_set}}} <- + {:beneficiaries, + {:ok, %FetchedBeneficiaries{params_set: beneficiary_params_set, errors: beneficiaries_errors}}} <- fetch_beneficiaries(range, json_rpc_named_arguments), addresses = AddressExtraction.extract_addresses(%{ @@ -132,16 +134,15 @@ defmodule Indexer.Block.Fetcher do address_coin_balances: %{params: coin_balances_params_set}, address_token_balances: %{params: address_token_balances}, blocks: %{params: blocks}, - block_second_degree_relations: %{params: block_second_degree_relations}, + block_second_degree_relations: %{params: block_second_degree_relations_params}, logs: %{params: logs}, token_transfers: %{params: token_transfers}, tokens: %{on_conflict: :nothing, params: tokens}, transactions: %{params: transactions_with_receipts} } ) do - {:ok, {inserted, next}} + {:ok, %{inserted: inserted, errors: blocks_errors ++ beneficiaries_errors}} else - {:beneficiaries = step, {:ok, %FetchedBeneficiaries{errors: [_ | _] = errors}}} -> {:error, {step, errors}} {step, {:error, reason}} -> {:error, {step, reason}} {:error, :timeout} = error -> error {:error, changesets} = error when is_list(changesets) -> error diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 1b3151006b..e1667679af 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -127,11 +127,22 @@ defmodule Indexer.Block.Realtime.Fetcher do def fetch_and_import_block(block_number_to_fetch, block_fetcher, retry \\ 3) do case fetch_and_import_range(block_fetcher, block_number_to_fetch..block_number_to_fetch) do - {:ok, {_inserted, _next}} -> + {:ok, %{inserted: _, errors: []}} -> Logger.debug(fn -> ["realtime indexer fetched and imported block ", to_string(block_number_to_fetch)] end) + {:ok, %{inserted: _, errors: [_ | _] = errors}} -> + Logger.error(fn -> + [ + "realtime indexer failed to fetch block", + to_string(block_number_to_fetch), + ": ", + inspect(errors), + ". Block will be retried by catchup indexer." + ] + end) + {:error, {step, reason}} -> Logger.error(fn -> [ diff --git a/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs b/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs index b94b5f1c7d..631f74c87d 100644 --- a/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs +++ b/apps/indexer/test/indexer/block/catchup/bound_interval_supervisor_test.exs @@ -22,6 +22,8 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisorTest do # See https://github.com/poanetwork/blockscout/issues/597 @tag :no_geth test "starts fetching blocks from latest and goes down", %{json_rpc_named_arguments: json_rpc_named_arguments} do + Logger.configure(truncate: :infinity) + if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do case Keyword.fetch!(json_rpc_named_arguments, :variant) do EthereumJSONRPC.Parity -> diff --git a/apps/indexer/test/indexer/block/fetcher_test.exs b/apps/indexer/test/indexer/block/fetcher_test.exs index 8c8a959b35..0328abf028 100644 --- a/apps/indexer/test/indexer/block/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/fetcher_test.exs @@ -214,10 +214,13 @@ defmodule Indexer.Block.FetcherTest do fn -> Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number) end, fn result -> assert {:ok, - {%{ - addresses: [%Address{hash: ^address_hash}], - blocks: [%Chain.Block{hash: ^block_hash}] - }, :more}} = result + %{ + inserted: %{ + addresses: [%Address{hash: ^address_hash}], + blocks: [%Chain.Block{hash: ^block_hash}] + }, + errors: [] + }} = result wait_for_tasks(InternalTransaction.Fetcher) wait_for_tasks(CoinBalance.Fetcher) @@ -506,57 +509,60 @@ defmodule Indexer.Block.FetcherTest do EthereumJSONRPC.Parity -> assert {:ok, - {%{ - addresses: [ - %Address{ - hash: - %Explorer.Chain.Hash{ - byte_count: 20, - bytes: - <<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179, 223, - 65, 91>> - } = first_address_hash - }, - %Address{ - hash: - %Explorer.Chain.Hash{ - byte_count: 20, - bytes: - <<232, 221, 197, 199, 162, 210, 240, 215, 169, 121, 132, 89, 192, 16, 79, 223, 94, 152, - 122, 202>> - } = second_address_hash - } - ], - blocks: [ - %Chain.Block{ - hash: %Explorer.Chain.Hash{ - byte_count: 32, - bytes: - <<246, 180, 184, 200, 141, 243, 235, 210, 82, 236, 71, 99, 40, 51, 77, 192, 38, 207, 102, - 96, 106, 132, 251, 118, 155, 61, 60, 188, 204, 132, 113, 189>> - } - } - ], - logs: [ - %Log{ - index: 0, - transaction_hash: %Explorer.Chain.Hash{ - byte_count: 32, - bytes: - <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, - 77, 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> - } - } - ], - transactions: [ - %Explorer.Chain.Hash{ - byte_count: 32, - bytes: - <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, 77, - 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> - } - ] - }, :more}} = Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number) + %{ + inserted: %{ + addresses: [ + %Address{ + hash: + %Explorer.Chain.Hash{ + byte_count: 20, + bytes: + <<139, 243, 141, 71, 100, 146, 144, 100, 242, 212, 211, 165, 101, 32, 167, 106, 179, + 223, 65, 91>> + } = first_address_hash + }, + %Address{ + hash: + %Explorer.Chain.Hash{ + byte_count: 20, + bytes: + <<232, 221, 197, 199, 162, 210, 240, 215, 169, 121, 132, 89, 192, 16, 79, 223, 94, 152, + 122, 202>> + } = second_address_hash + } + ], + blocks: [ + %Chain.Block{ + hash: %Explorer.Chain.Hash{ + byte_count: 32, + bytes: + <<246, 180, 184, 200, 141, 243, 235, 210, 82, 236, 71, 99, 40, 51, 77, 192, 38, 207, 102, + 96, 106, 132, 251, 118, 155, 61, 60, 188, 204, 132, 113, 189>> + } + } + ], + logs: [ + %Log{ + index: 0, + transaction_hash: %Explorer.Chain.Hash{ + byte_count: 32, + bytes: + <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, + 77, 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> + } + } + ], + transactions: [ + %Explorer.Chain.Hash{ + byte_count: 32, + bytes: + <<83, 189, 136, 72, 114, 222, 62, 72, 134, 146, 136, 27, 174, 236, 38, 46, 123, 149, 35, 77, + 57, 101, 36, 140, 57, 254, 153, 47, 255, 212, 51, 229>> + } + ] + }, + errors: [] + }} = Fetcher.fetch_and_import_range(block_fetcher, block_number..block_number) wait_for_tasks(InternalTransaction.Fetcher) wait_for_tasks(CoinBalance.Fetcher) diff --git a/apps/indexer/test/indexer/block/realtime/fetcher_test.exs b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs index 4e2a1ed692..f16a1d9124 100644 --- a/apps/indexer/test/indexer/block/realtime/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs @@ -370,47 +370,50 @@ defmodule Indexer.Block.Realtime.FetcherTest do end assert {:ok, - {%{ - addresses: [ - %Address{hash: first_address_hash, fetched_coin_balance_block_number: 3_946_079}, - %Address{hash: second_address_hash, fetched_coin_balance_block_number: 3_946_079}, - %Address{hash: third_address_hash, fetched_coin_balance_block_number: 3_946_079}, - %Address{hash: fourth_address_hash, fetched_coin_balance_block_number: 3_946_080}, - %Address{hash: fifth_address_hash, fetched_coin_balance_block_number: 3_946_079} - ], - address_coin_balances: [ - %{ - address_hash: first_address_hash, - block_number: 3_946_079 - }, - %{ - address_hash: second_address_hash, - block_number: 3_946_079 - }, - %{ - address_hash: third_address_hash, - block_number: 3_946_079 - }, - %{ - address_hash: fourth_address_hash, - block_number: 3_946_080 - }, - %{ - address_hash: fifth_address_hash, - block_number: 3_946_079 - } - ], - blocks: [%Chain.Block{number: 3_946_079}, %Chain.Block{number: 3_946_080}], - internal_transactions: [ - %{index: 0, transaction_hash: transaction_hash}, - %{index: 1, transaction_hash: transaction_hash}, - %{index: 2, transaction_hash: transaction_hash}, - %{index: 3, transaction_hash: transaction_hash}, - %{index: 4, transaction_hash: transaction_hash}, - %{index: 5, transaction_hash: transaction_hash} - ], - transactions: [transaction_hash] - }, :more}} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080) + %{ + inserted: %{ + addresses: [ + %Address{hash: first_address_hash, fetched_coin_balance_block_number: 3_946_079}, + %Address{hash: second_address_hash, fetched_coin_balance_block_number: 3_946_079}, + %Address{hash: third_address_hash, fetched_coin_balance_block_number: 3_946_079}, + %Address{hash: fourth_address_hash, fetched_coin_balance_block_number: 3_946_080}, + %Address{hash: fifth_address_hash, fetched_coin_balance_block_number: 3_946_079} + ], + address_coin_balances: [ + %{ + address_hash: first_address_hash, + block_number: 3_946_079 + }, + %{ + address_hash: second_address_hash, + block_number: 3_946_079 + }, + %{ + address_hash: third_address_hash, + block_number: 3_946_079 + }, + %{ + address_hash: fourth_address_hash, + block_number: 3_946_080 + }, + %{ + address_hash: fifth_address_hash, + block_number: 3_946_079 + } + ], + blocks: [%Chain.Block{number: 3_946_079}, %Chain.Block{number: 3_946_080}], + internal_transactions: [ + %{index: 0, transaction_hash: transaction_hash}, + %{index: 1, transaction_hash: transaction_hash}, + %{index: 2, transaction_hash: transaction_hash}, + %{index: 3, transaction_hash: transaction_hash}, + %{index: 4, transaction_hash: transaction_hash}, + %{index: 5, transaction_hash: transaction_hash} + ], + transactions: [transaction_hash] + }, + errors: [] + }} = Indexer.Block.Fetcher.fetch_and_import_range(block_fetcher, 3_946_079..3_946_080) end end end