diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 936443374c..79f9e6fca2 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -5,7 +5,7 @@ defmodule Indexer.Block.Fetcher do require Logger - alias Explorer.Chain.{Block, Import} + alias Explorer.Chain.{Address, Block, Import} alias Indexer.{CoinBalance, AddressExtraction, Token, TokenTransfers} alias Indexer.Address.{CoinBalances, TokenBalances} alias Indexer.Block.Fetcher.Receipts @@ -164,7 +164,7 @@ defmodule Indexer.Block.Fetcher do address_hash_to_fetched_balance_block_number: address_hash_to_block_number }) do addresses - |> Enum.map(fn address_hash -> + |> Enum.map(fn %Address{hash: address_hash} -> block_number = Map.fetch!(address_hash_to_block_number, to_string(address_hash)) %{address_hash: address_hash, block_number: block_number} end) diff --git a/apps/indexer/lib/indexer/coin_balance/fetcher.ex b/apps/indexer/lib/indexer/coin_balance/fetcher.ex index 708e0dde54..28d93a2de6 100644 --- a/apps/indexer/lib/indexer/coin_balance/fetcher.ex +++ b/apps/indexer/lib/indexer/coin_balance/fetcher.ex @@ -29,9 +29,9 @@ defmodule Indexer.CoinBalance.Fetcher do %{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()} ]) :: :ok def async_fetch_balances(balance_fields) when is_list(balance_fields) do - params_list = Enum.map(balance_fields, &balance_fields_to_params/1) + entries = Enum.map(balance_fields, &entry/1) - BufferedTask.buffer(__MODULE__, params_list) + BufferedTask.buffer(__MODULE__, entries) end @doc false @@ -57,7 +57,7 @@ defmodule Indexer.CoinBalance.Fetcher do {:ok, final} = Chain.stream_unfetched_balances(initial, fn address_fields, acc -> address_fields - |> balance_fields_to_params() + |> entry() |> reducer.(acc) end) @@ -65,14 +65,17 @@ defmodule Indexer.CoinBalance.Fetcher do end @impl BufferedTask - def run(params_list, _retries, json_rpc_named_arguments) do + def run(entries, _retries, json_rpc_named_arguments) do # the same address may be used more than once in the same block, but we only want one `Balance` for a given # `{address, block}`, so take unique params only - unique_params_list = Enum.uniq(params_list) + unique_entries = Enum.uniq(entries) - Logger.debug(fn -> "fetching #{length(unique_params_list)} balances" end) + Logger.debug(fn -> "fetching #{length(unique_entries)} balances" end) - case EthereumJSONRPC.fetch_balances(unique_params_list, json_rpc_named_arguments) do + unique_entries + |> Enum.map(&entry_to_params/1) + |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) + |> case do {:ok, balances_params} -> value_fetched_at = DateTime.utc_now() @@ -89,16 +92,21 @@ defmodule Indexer.CoinBalance.Fetcher do :ok {:error, reason} -> - Logger.debug(fn -> "failed to fetch #{length(unique_params_list)} balances, #{inspect(reason)}" end) - {:retry, unique_params_list} + Logger.debug(fn -> "failed to fetch #{length(unique_entries)} balances, #{inspect(reason)}" end) + {:retry, unique_entries} end end - defp balance_fields_to_params(%{address_hash: address_hash, block_number: block_number}) + defp entry_to_params(%{address_hash_bytes: address_hash_bytes, block_number: block_number}) when is_integer(block_number) do + {:ok, address_hash} = Hash.Address.cast(address_hash_bytes) %{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address_hash)} end + defp entry(%{address_hash: %Hash{bytes: address_hash_bytes}, block_number: block_number}) do + %{address_hash_bytes: address_hash_bytes, block_number: block_number} + end + # We want to record all historical balances for an address, but have the address itself have balance from the # `Balance` with the greatest block_number for that address. def balances_params_to_address_params(balances_params) do diff --git a/apps/indexer/test/indexer/coin_balance/fetcher_test.exs b/apps/indexer/test/indexer/coin_balance/fetcher_test.exs index 63914c6543..d07835f7fd 100644 --- a/apps/indexer/test/indexer/coin_balance/fetcher_test.exs +++ b/apps/indexer/test/indexer/coin_balance/fetcher_test.exs @@ -251,9 +251,10 @@ defmodule Indexer.CoinBalance.FetcherTest do end) end - params_list = Enum.map(block_quantities, &%{block_quantity: &1, hash_data: hash_data}) + {:ok, %Hash{bytes: address_hash_bytes}} = Hash.Address.cast(hash_data) + entries = Enum.map(block_quantities, &%{address_hash_bytes: address_hash_bytes, block_number: quantity_to_integer(&1)}) - case CoinBalance.Fetcher.run(params_list, 0, json_rpc_named_arguments) do + case CoinBalance.Fetcher.run(entries, 0, json_rpc_named_arguments) do :ok -> balances = Repo.all(from(balance in Address.CoinBalance, where: balance.address_hash == ^hash_data)) @@ -283,12 +284,14 @@ defmodule Indexer.CoinBalance.FetcherTest do other -> # not all nodes behind the `https://mainnet.infura.io` pool are fully-synced. Node that aren't fully-synced # won't have historical address balances. - assert {:retry, ^params_list} = other + assert {:retry, ^entries} = other end end - test "duplicate params retry unique params", %{json_rpc_named_arguments: json_rpc_named_arguments} do - hash_data = "0x000000000000000000000000000000000" + test "duplicate entries retry unique entries", %{json_rpc_named_arguments: json_rpc_named_arguments} do + hash_data = "0x0000000000000000000000000000000000000000" + + {:ok, %Hash{bytes: bytes}} = Hash.Address.cast(hash_data) if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do EthereumJSONRPC.Mox @@ -298,15 +301,15 @@ defmodule Indexer.CoinBalance.FetcherTest do end assert CoinBalance.Fetcher.run( - [%{block_quantity: "0x1", hash_data: hash_data}, %{block_quantity: "0x1", hash_data: hash_data}], + [%{address_hash_bytes: bytes, block_number: 1}, %{address_hash_bytes: bytes, block_number: 1}], 0, json_rpc_named_arguments ) == {:retry, [ %{ - block_quantity: "0x1", - hash_data: "0x000000000000000000000000000000000" + address_hash_bytes: <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>, + block_number: 1 } ]} end