From c3005aa876e1fad707de6d70afea764363b2dc38 Mon Sep 17 00:00:00 2001 From: Qwerty5Uiop <105209995+Qwerty5Uiop@users.noreply.github.com> Date: Fri, 4 Oct 2024 14:57:41 +0400 Subject: [PATCH] feat: Send archive balances requests to trace url (#10820) * feat: Send archive balances requests to trace url * Fix fetch coin balances tests --- apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex | 85 ++++++++++++++----- .../lib/ethereum_jsonrpc/fetched_balances.ex | 11 +++ .../ethereum_jsonrpc/utility/common_helper.ex | 13 +++ .../test/ethereum_jsonrpc_test.exs | 27 +++++- .../explorer/lib/explorer/chain/supply/rsk.ex | 2 +- .../indexer/fetcher/coin_balance/helper.ex | 4 +- .../lib/indexer/fetcher/contract_code.ex | 4 +- .../indexer/fetcher/on_demand/coin_balance.ex | 2 +- .../test/indexer/block/fetcher_test.exs | 6 +- .../fetcher/coin_balance/catchup_test.exs | 18 ++++ .../coin_balance_catchup_supervisor_case.ex | 10 ++- 11 files changed, 146 insertions(+), 36 deletions(-) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex index 8b4dd4f51b..3d5cc09968 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex @@ -38,6 +38,7 @@ defmodule EthereumJSONRPC do RequestCoordinator, Subscription, Transport, + Utility.CommonHelper, Utility.EndpointAvailabilityObserver, Utility.RangesHelper, Variant @@ -186,38 +187,53 @@ defmodule EthereumJSONRPC do [%{required(:block_quantity) => quantity, required(:hash_data) => data()}], json_rpc_named_arguments ) :: {:ok, FetchedBalances.t()} | {:error, reason :: term} - def fetch_balances(params_list, json_rpc_named_arguments, chunk_size \\ nil) + def fetch_balances(params_list, json_rpc_named_arguments, latest_block_number \\ 0, chunk_size \\ nil) when is_list(params_list) and is_list(json_rpc_named_arguments) do - filtered_params = - with true <- Application.get_env(:ethereum_jsonrpc, :disable_archive_balances?), - {:ok, max_block_number} <- fetch_block_number_by_tag("latest", json_rpc_named_arguments) do + latest_block_number_params = + case latest_block_number do + 0 -> fetch_block_number_by_tag("latest", json_rpc_named_arguments) + number -> {:ok, number} + end + + params_in_range = + params_list + |> Enum.filter(fn + %{block_quantity: block_quantity} -> + block_quantity |> quantity_to_integer() |> RangesHelper.traceable_block_number?() + end) + + trace_url_used? = !is_nil(json_rpc_named_arguments[:transport_options][:method_to_url][:eth_getBalance]) + archive_disabled? = Application.get_env(:ethereum_jsonrpc, :disable_archive_balances?) + + {latest_balances_params, archive_balance_params} = + with true <- not trace_url_used? or archive_disabled?, + {:ok, max_block_number} <- latest_block_number_params do window = Application.get_env(:ethereum_jsonrpc, :archive_balances_window) - params_list - |> Enum.filter(fn + Enum.split_with(params_in_range, fn %{block_quantity: "latest"} -> true %{block_quantity: block_quantity} -> quantity_to_integer(block_quantity) > max_block_number - window _ -> false end) else - _ -> params_list + _ -> {params_in_range, []} end - filtered_params_in_range = - filtered_params - |> Enum.filter(fn - %{block_quantity: block_quantity} -> - block_quantity |> quantity_to_integer() |> RangesHelper.traceable_block_number?() - end) - - id_to_params = id_to_params(filtered_params_in_range) - - with {:ok, responses} <- - id_to_params - |> FetchedBalances.requests() - |> chunk_requests(chunk_size) - |> json_rpc(json_rpc_named_arguments) do - {:ok, FetchedBalances.from_responses(responses, id_to_params)} + latest_id_to_params = id_to_params(latest_balances_params) + archive_id_to_params = id_to_params(archive_balance_params) + + with {:ok, latest_responses} <- do_balances_request(latest_id_to_params, chunk_size, json_rpc_named_arguments), + {:ok, archive_responses} <- + maybe_request_archive_balances( + archive_id_to_params, + trace_url_used?, + archive_disabled?, + chunk_size, + json_rpc_named_arguments + ) do + latest_fetched_balances = FetchedBalances.from_responses(latest_responses, latest_id_to_params) + archive_fetched_balances = FetchedBalances.from_responses(archive_responses, archive_id_to_params) + {:ok, FetchedBalances.merge(latest_fetched_balances, archive_fetched_balances)} end end @@ -471,6 +487,31 @@ defmodule EthereumJSONRPC do end end + defp do_balances_request(id_to_params, _chunk_size, _args) when id_to_params == %{}, do: {:ok, []} + + defp do_balances_request(id_to_params, chunk_size, json_rpc_named_arguments) do + id_to_params + |> FetchedBalances.requests() + |> chunk_requests(chunk_size) + |> json_rpc(json_rpc_named_arguments) + end + + defp archive_json_rpc_named_arguments(json_rpc_named_arguments) do + CommonHelper.put_in_keyword_nested( + json_rpc_named_arguments, + [:transport_options, :method_to_url, :eth_getBalance], + System.get_env("ETHEREUM_JSONRPC_TRACE_URL") + ) + end + + defp maybe_request_archive_balances(id_to_params, trace_url_used?, disabled?, chunk_size, json_rpc_named_arguments) do + if not trace_url_used? and not disabled? do + do_balances_request(id_to_params, chunk_size, archive_json_rpc_named_arguments(json_rpc_named_arguments)) + else + {:ok, []} + end + end + defp maybe_replace_url(url, _replace_url, EthereumJSONRPC.HTTP), do: url defp maybe_replace_url(url, replace_url, _), do: EndpointAvailabilityObserver.maybe_replace_url(url, replace_url, :ws) diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex index da769b5e5a..d87bdfb00e 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex @@ -33,6 +33,17 @@ defmodule EthereumJSONRPC.FetchedBalances do ) end + @doc """ + Merges two `t/0` to one `t/0`. + """ + @spec merge(t(), t()) :: t() + def merge(%{params_list: params_list_1, errors: errors_1}, %{params_list: params_list_2, errors: errors_2}) do + %__MODULE__{ + params_list: params_list_1 ++ params_list_2, + errors: errors_1 ++ errors_2 + } + end + @doc """ `eth_getBalance` requests for `id_to_params`. """ diff --git a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/utility/common_helper.ex b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/utility/common_helper.ex index 154edb78bd..a6d0d55f2d 100644 --- a/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/utility/common_helper.ex +++ b/apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/utility/common_helper.ex @@ -17,6 +17,19 @@ defmodule EthereumJSONRPC.Utility.CommonHelper do end end + @doc """ + Puts value under nested key in keyword. + Similar to `Kernel.put_in/3` but inserts values in the middle if they're missing + """ + @spec put_in_keyword_nested(Keyword.t(), [atom()], any()) :: Keyword.t() + def put_in_keyword_nested(keyword, [last_path], value) do + Keyword.put(keyword || [], last_path, value) + end + + def put_in_keyword_nested(keyword, [nearest_path | rest_path], value) do + Keyword.put(keyword || [], nearest_path, put_in_keyword_nested(keyword[nearest_path], rest_path, value)) + end + defp convert_to_ms(number, "s"), do: :timer.seconds(number) defp convert_to_ms(number, "m"), do: :timer.minutes(number) defp convert_to_ms(number, "h"), do: :timer.hours(number) diff --git a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs index 52bbf7bd46..ecc2dbd148 100644 --- a/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs +++ b/apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs @@ -35,7 +35,8 @@ defmodule EthereumJSONRPCTest do [ %{block_quantity: "0x1", hash_data: hash} ], - json_rpc_named_arguments + json_rpc_named_arguments, + 1 ) == {:ok, %FetchedBalances{ @@ -49,6 +50,25 @@ defmodule EthereumJSONRPCTest do }} end + test "fetch latest block number from node if it wasn't provided", %{ + json_rpc_named_arguments: json_rpc_named_arguments + } do + if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do + expect(EthereumJSONRPC.Mox, :json_rpc, 2, fn + [%{id: id, method: "eth_getBlockByNumber"}], _options -> block_response(id, false, "0x1") + _json, _options -> {:ok, [%{id: 0, result: "0x1"}]} + end) + end + + assert {:ok, %FetchedBalances{}} = + EthereumJSONRPC.fetch_balances( + [ + %{block_quantity: "0x1", hash_data: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b"} + ], + json_rpc_named_arguments + ) + end + test "with all invalid hash_data returns errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do variant = Keyword.fetch!(json_rpc_named_arguments, :variant) @@ -90,7 +110,7 @@ defmodule EthereumJSONRPCTest do ], params_list: [] }} = - EthereumJSONRPC.fetch_balances([%{block_quantity: "0x1", hash_data: "0x0"}], json_rpc_named_arguments) + EthereumJSONRPC.fetch_balances([%{block_quantity: "0x1", hash_data: "0x0"}], json_rpc_named_arguments, 1) end test "with a mix of valid and invalid hash_data returns both", %{ @@ -163,7 +183,8 @@ defmodule EthereumJSONRPCTest do hash_data: "0x5" } ], - json_rpc_named_arguments + json_rpc_named_arguments, + 53 ) assert is_list(params_list) diff --git a/apps/explorer/lib/explorer/chain/supply/rsk.ex b/apps/explorer/lib/explorer/chain/supply/rsk.ex index 2eb9caa933..14635a1c7e 100644 --- a/apps/explorer/lib/explorer/chain/supply/rsk.ex +++ b/apps/explorer/lib/explorer/chain/supply/rsk.ex @@ -113,7 +113,7 @@ defmodule Explorer.Chain.Supply.RSK do json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) - case EthereumJSONRPC.fetch_balances(params, json_rpc_named_arguments) do + case EthereumJSONRPC.fetch_balances(params, json_rpc_named_arguments, max_number) do {:ok, %FetchedBalances{ errors: [], diff --git a/apps/indexer/lib/indexer/fetcher/coin_balance/helper.ex b/apps/indexer/lib/indexer/fetcher/coin_balance/helper.ex index e4da98ed9a..759bd6eea4 100644 --- a/apps/indexer/lib/indexer/fetcher/coin_balance/helper.ex +++ b/apps/indexer/lib/indexer/fetcher/coin_balance/helper.ex @@ -9,7 +9,7 @@ defmodule Indexer.Fetcher.CoinBalance.Helper do alias EthereumJSONRPC.{Blocks, FetchedBalances, Utility.RangesHelper} alias Explorer.Chain - alias Explorer.Chain.Cache.Accounts + alias Explorer.Chain.Cache.{Accounts, BlockNumber} alias Explorer.Chain.Hash alias Indexer.BufferedTask @@ -55,7 +55,7 @@ defmodule Indexer.Fetcher.CoinBalance.Helper do unique_filtered_entries |> Enum.map(&entry_to_params/1) - |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) + |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments, BlockNumber.get_max()) |> case do {:ok, fetched_balances} -> run_fetched_balances(fetched_balances, fetcher_type) diff --git a/apps/indexer/lib/indexer/fetcher/contract_code.ex b/apps/indexer/lib/indexer/fetcher/contract_code.ex index bd2809da52..49778f375e 100644 --- a/apps/indexer/lib/indexer/fetcher/contract_code.ex +++ b/apps/indexer/lib/indexer/fetcher/contract_code.ex @@ -12,7 +12,7 @@ defmodule Indexer.Fetcher.ContractCode do alias Explorer.Chain alias Explorer.Chain.{Block, Hash} - alias Explorer.Chain.Cache.Accounts + alias Explorer.Chain.Cache.{Accounts, BlockNumber} alias Indexer.{BufferedTask, Tracer} alias Indexer.Fetcher.CoinBalance.Helper, as: CoinBalanceHelper alias Indexer.Transform.Addresses @@ -120,7 +120,7 @@ defmodule Indexer.Fetcher.ContractCode do defp import_with_balances(addresses_params, entries, json_rpc_named_arguments) do entries |> coin_balances_request_params() - |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) + |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments, BlockNumber.get_max()) |> case do {:ok, fetched_balances} -> balance_addresses_params = CoinBalanceHelper.balances_params_to_address_params(fetched_balances.params_list) diff --git a/apps/indexer/lib/indexer/fetcher/on_demand/coin_balance.ex b/apps/indexer/lib/indexer/fetcher/on_demand/coin_balance.ex index 626520b2b7..2170ee41ea 100644 --- a/apps/indexer/lib/indexer/fetcher/on_demand/coin_balance.ex +++ b/apps/indexer/lib/indexer/fetcher/on_demand/coin_balance.ex @@ -205,7 +205,7 @@ defmodule Indexer.Fetcher.OnDemand.CoinBalance do defp fetch_balances(block_number, address, json_rpc_named_arguments) do params = %{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address.hash)} - EthereumJSONRPC.fetch_balances([params], json_rpc_named_arguments) + EthereumJSONRPC.fetch_balances([params], json_rpc_named_arguments, latest_block_number()) end defp do_import(%FetchedBalances{} = fetched_balances) do diff --git a/apps/indexer/test/indexer/block/fetcher_test.exs b/apps/indexer/test/indexer/block/fetcher_test.exs index 21416981d0..dc337b5e28 100644 --- a/apps/indexer/test/indexer/block/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/fetcher_test.exs @@ -49,7 +49,11 @@ defmodule Indexer.Block.FetcherTest do describe "import_range/2" do setup %{json_rpc_named_arguments: json_rpc_named_arguments} do - CoinBalanceCatchup.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + CoinBalanceCatchup.Supervisor.Case.start_supervised!( + json_rpc_named_arguments: json_rpc_named_arguments, + poll: false + ) + ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) diff --git a/apps/indexer/test/indexer/fetcher/coin_balance/catchup_test.exs b/apps/indexer/test/indexer/fetcher/coin_balance/catchup_test.exs index 358598a17d..b330dccef5 100644 --- a/apps/indexer/test/indexer/fetcher/coin_balance/catchup_test.exs +++ b/apps/indexer/test/indexer/fetcher/coin_balance/catchup_test.exs @@ -8,6 +8,7 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do import Mox alias Explorer.Chain.{Address, Hash, Wei} + alias Explorer.Chain.Cache.BlockNumber alias Indexer.Fetcher.CoinBalance.Catchup, as: CoinBalanceCatchup @moduletag :capture_log @@ -21,6 +22,13 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do setup do start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor}) + initial_config = Application.get_env(:explorer, Explorer.Chain.Cache.BlockNumber) + Application.put_env(:explorer, Explorer.Chain.Cache.BlockNumber, enabled: true) + + on_exit(fn -> + Application.put_env(:explorer, Explorer.Chain.Cache.BlockNumber, initial_config) + end) + :ok end @@ -225,6 +233,8 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do end) end + BlockNumber.set_max(block_number) + CoinBalanceCatchup.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) assert :ok = CoinBalanceCatchup.async_fetch_balances([%{address_hash: hash, block_number: block_number}]) @@ -318,6 +328,8 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do {:ok, [res2]} end) + BlockNumber.set_max(2) + case CoinBalanceCatchup.run(entries, json_rpc_named_arguments) do :ok -> balances = Repo.all(from(balance in Address.CoinBalance, where: balance.address_hash == ^hash_data)) @@ -373,6 +385,8 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do {:ok, [%{id: id, error: %{code: 1, message: "Bad"}}]} end) + BlockNumber.set_max(block_number()) + assert {:retry, ^entries} = CoinBalanceCatchup.run(entries, json_rpc_named_arguments) end @@ -401,6 +415,8 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do {:ok, [res]} end) + BlockNumber.set_max(block_number) + assert :ok = CoinBalanceCatchup.run(entries, json_rpc_named_arguments) end @@ -456,6 +472,8 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do {:ok, [res_good]} end) + BlockNumber.set_max(good_block_number) + assert {:retry, [{^address_hash_bytes, ^bad_block_number}]} = CoinBalanceCatchup.run( [{address_hash_bytes, good_block_number}, {address_hash_bytes, bad_block_number}], diff --git a/apps/indexer/test/support/indexer/fetcher/coin_balance_catchup_supervisor_case.ex b/apps/indexer/test/support/indexer/fetcher/coin_balance_catchup_supervisor_case.ex index 17831099c3..69ec782561 100644 --- a/apps/indexer/test/support/indexer/fetcher/coin_balance_catchup_supervisor_case.ex +++ b/apps/indexer/test/support/indexer/fetcher/coin_balance_catchup_supervisor_case.ex @@ -4,10 +4,12 @@ defmodule Indexer.Fetcher.CoinBalance.Catchup.Supervisor.Case do def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do merged_fetcher_arguments = Keyword.merge( - fetcher_arguments, - flush_interval: 50, - max_batch_size: 1, - max_concurrency: 1 + [ + flush_interval: 50, + max_batch_size: 1, + max_concurrency: 1 + ], + fetcher_arguments ) [merged_fetcher_arguments]