feat: Send archive balances requests to trace url (#10820)

* feat: Send archive balances requests to trace url

* Fix fetch coin balances tests
pull/10885/head
Qwerty5Uiop 2 months ago committed by GitHub
parent bbd20a85c9
commit c3005aa876
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 81
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  2. 11
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/fetched_balances.ex
  3. 13
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/utility/common_helper.ex
  4. 27
      apps/ethereum_jsonrpc/test/ethereum_jsonrpc_test.exs
  5. 2
      apps/explorer/lib/explorer/chain/supply/rsk.ex
  6. 4
      apps/indexer/lib/indexer/fetcher/coin_balance/helper.ex
  7. 4
      apps/indexer/lib/indexer/fetcher/contract_code.ex
  8. 2
      apps/indexer/lib/indexer/fetcher/on_demand/coin_balance.ex
  9. 6
      apps/indexer/test/indexer/block/fetcher_test.exs
  10. 18
      apps/indexer/test/indexer/fetcher/coin_balance/catchup_test.exs
  11. 4
      apps/indexer/test/support/indexer/fetcher/coin_balance_catchup_supervisor_case.ex

@ -38,6 +38,7 @@ defmodule EthereumJSONRPC do
RequestCoordinator,
Subscription,
Transport,
Utility.CommonHelper,
Utility.EndpointAvailabilityObserver,
Utility.RangesHelper,
Variant
@ -186,38 +187,53 @@ defmodule EthereumJSONRPC do
[%{required(:block_quantity) => quantity, required(:hash_data) => data()}],
json_rpc_named_arguments
) :: {:ok, FetchedBalances.t()} | {:error, reason :: term}
def fetch_balances(params_list, json_rpc_named_arguments, chunk_size \\ nil)
def fetch_balances(params_list, json_rpc_named_arguments, latest_block_number \\ 0, chunk_size \\ nil)
when is_list(params_list) and is_list(json_rpc_named_arguments) do
filtered_params =
with true <- Application.get_env(:ethereum_jsonrpc, :disable_archive_balances?),
{:ok, max_block_number} <- fetch_block_number_by_tag("latest", json_rpc_named_arguments) do
window = Application.get_env(:ethereum_jsonrpc, :archive_balances_window)
latest_block_number_params =
case latest_block_number do
0 -> fetch_block_number_by_tag("latest", json_rpc_named_arguments)
number -> {:ok, number}
end
params_in_range =
params_list
|> Enum.filter(fn
%{block_quantity: block_quantity} ->
block_quantity |> quantity_to_integer() |> RangesHelper.traceable_block_number?()
end)
trace_url_used? = !is_nil(json_rpc_named_arguments[:transport_options][:method_to_url][:eth_getBalance])
archive_disabled? = Application.get_env(:ethereum_jsonrpc, :disable_archive_balances?)
{latest_balances_params, archive_balance_params} =
with true <- not trace_url_used? or archive_disabled?,
{:ok, max_block_number} <- latest_block_number_params do
window = Application.get_env(:ethereum_jsonrpc, :archive_balances_window)
Enum.split_with(params_in_range, fn
%{block_quantity: "latest"} -> true
%{block_quantity: block_quantity} -> quantity_to_integer(block_quantity) > max_block_number - window
_ -> false
end)
else
_ -> params_list
_ -> {params_in_range, []}
end
filtered_params_in_range =
filtered_params
|> Enum.filter(fn
%{block_quantity: block_quantity} ->
block_quantity |> quantity_to_integer() |> RangesHelper.traceable_block_number?()
end)
latest_id_to_params = id_to_params(latest_balances_params)
archive_id_to_params = id_to_params(archive_balance_params)
id_to_params = id_to_params(filtered_params_in_range)
with {:ok, responses} <-
id_to_params
|> FetchedBalances.requests()
|> chunk_requests(chunk_size)
|> json_rpc(json_rpc_named_arguments) do
{:ok, FetchedBalances.from_responses(responses, id_to_params)}
with {:ok, latest_responses} <- do_balances_request(latest_id_to_params, chunk_size, json_rpc_named_arguments),
{:ok, archive_responses} <-
maybe_request_archive_balances(
archive_id_to_params,
trace_url_used?,
archive_disabled?,
chunk_size,
json_rpc_named_arguments
) do
latest_fetched_balances = FetchedBalances.from_responses(latest_responses, latest_id_to_params)
archive_fetched_balances = FetchedBalances.from_responses(archive_responses, archive_id_to_params)
{:ok, FetchedBalances.merge(latest_fetched_balances, archive_fetched_balances)}
end
end
@ -471,6 +487,31 @@ defmodule EthereumJSONRPC do
end
end
defp do_balances_request(id_to_params, _chunk_size, _args) when id_to_params == %{}, do: {:ok, []}
defp do_balances_request(id_to_params, chunk_size, json_rpc_named_arguments) do
id_to_params
|> FetchedBalances.requests()
|> chunk_requests(chunk_size)
|> json_rpc(json_rpc_named_arguments)
end
defp archive_json_rpc_named_arguments(json_rpc_named_arguments) do
CommonHelper.put_in_keyword_nested(
json_rpc_named_arguments,
[:transport_options, :method_to_url, :eth_getBalance],
System.get_env("ETHEREUM_JSONRPC_TRACE_URL")
)
end
defp maybe_request_archive_balances(id_to_params, trace_url_used?, disabled?, chunk_size, json_rpc_named_arguments) do
if not trace_url_used? and not disabled? do
do_balances_request(id_to_params, chunk_size, archive_json_rpc_named_arguments(json_rpc_named_arguments))
else
{:ok, []}
end
end
defp maybe_replace_url(url, _replace_url, EthereumJSONRPC.HTTP), do: url
defp maybe_replace_url(url, replace_url, _), do: EndpointAvailabilityObserver.maybe_replace_url(url, replace_url, :ws)

@ -33,6 +33,17 @@ defmodule EthereumJSONRPC.FetchedBalances do
)
end
@doc """
Merges two `t/0` to one `t/0`.
"""
@spec merge(t(), t()) :: t()
def merge(%{params_list: params_list_1, errors: errors_1}, %{params_list: params_list_2, errors: errors_2}) do
%__MODULE__{
params_list: params_list_1 ++ params_list_2,
errors: errors_1 ++ errors_2
}
end
@doc """
`eth_getBalance` requests for `id_to_params`.
"""

@ -17,6 +17,19 @@ defmodule EthereumJSONRPC.Utility.CommonHelper do
end
end
@doc """
Puts value under nested key in keyword.
Similar to `Kernel.put_in/3` but inserts values in the middle if they're missing
"""
@spec put_in_keyword_nested(Keyword.t(), [atom()], any()) :: Keyword.t()
def put_in_keyword_nested(keyword, [last_path], value) do
Keyword.put(keyword || [], last_path, value)
end
def put_in_keyword_nested(keyword, [nearest_path | rest_path], value) do
Keyword.put(keyword || [], nearest_path, put_in_keyword_nested(keyword[nearest_path], rest_path, value))
end
defp convert_to_ms(number, "s"), do: :timer.seconds(number)
defp convert_to_ms(number, "m"), do: :timer.minutes(number)
defp convert_to_ms(number, "h"), do: :timer.hours(number)

@ -35,7 +35,8 @@ defmodule EthereumJSONRPCTest do
[
%{block_quantity: "0x1", hash_data: hash}
],
json_rpc_named_arguments
json_rpc_named_arguments,
1
) ==
{:ok,
%FetchedBalances{
@ -49,6 +50,25 @@ defmodule EthereumJSONRPCTest do
}}
end
test "fetch latest block number from node if it wasn't provided", %{
json_rpc_named_arguments: json_rpc_named_arguments
} do
if json_rpc_named_arguments[:transport] == EthereumJSONRPC.Mox do
expect(EthereumJSONRPC.Mox, :json_rpc, 2, fn
[%{id: id, method: "eth_getBlockByNumber"}], _options -> block_response(id, false, "0x1")
_json, _options -> {:ok, [%{id: 0, result: "0x1"}]}
end)
end
assert {:ok, %FetchedBalances{}} =
EthereumJSONRPC.fetch_balances(
[
%{block_quantity: "0x1", hash_data: "0x8bf38d4764929064f2d4d3a56520a76ab3df415b"}
],
json_rpc_named_arguments
)
end
test "with all invalid hash_data returns errors", %{json_rpc_named_arguments: json_rpc_named_arguments} do
variant = Keyword.fetch!(json_rpc_named_arguments, :variant)
@ -90,7 +110,7 @@ defmodule EthereumJSONRPCTest do
],
params_list: []
}} =
EthereumJSONRPC.fetch_balances([%{block_quantity: "0x1", hash_data: "0x0"}], json_rpc_named_arguments)
EthereumJSONRPC.fetch_balances([%{block_quantity: "0x1", hash_data: "0x0"}], json_rpc_named_arguments, 1)
end
test "with a mix of valid and invalid hash_data returns both", %{
@ -163,7 +183,8 @@ defmodule EthereumJSONRPCTest do
hash_data: "0x5"
}
],
json_rpc_named_arguments
json_rpc_named_arguments,
53
)
assert is_list(params_list)

@ -113,7 +113,7 @@ defmodule Explorer.Chain.Supply.RSK do
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments)
case EthereumJSONRPC.fetch_balances(params, json_rpc_named_arguments) do
case EthereumJSONRPC.fetch_balances(params, json_rpc_named_arguments, max_number) do
{:ok,
%FetchedBalances{
errors: [],

@ -9,7 +9,7 @@ defmodule Indexer.Fetcher.CoinBalance.Helper do
alias EthereumJSONRPC.{Blocks, FetchedBalances, Utility.RangesHelper}
alias Explorer.Chain
alias Explorer.Chain.Cache.Accounts
alias Explorer.Chain.Cache.{Accounts, BlockNumber}
alias Explorer.Chain.Hash
alias Indexer.BufferedTask
@ -55,7 +55,7 @@ defmodule Indexer.Fetcher.CoinBalance.Helper do
unique_filtered_entries
|> Enum.map(&entry_to_params/1)
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments)
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments, BlockNumber.get_max())
|> case do
{:ok, fetched_balances} ->
run_fetched_balances(fetched_balances, fetcher_type)

@ -12,7 +12,7 @@ defmodule Indexer.Fetcher.ContractCode do
alias Explorer.Chain
alias Explorer.Chain.{Block, Hash}
alias Explorer.Chain.Cache.Accounts
alias Explorer.Chain.Cache.{Accounts, BlockNumber}
alias Indexer.{BufferedTask, Tracer}
alias Indexer.Fetcher.CoinBalance.Helper, as: CoinBalanceHelper
alias Indexer.Transform.Addresses
@ -120,7 +120,7 @@ defmodule Indexer.Fetcher.ContractCode do
defp import_with_balances(addresses_params, entries, json_rpc_named_arguments) do
entries
|> coin_balances_request_params()
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments)
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments, BlockNumber.get_max())
|> case do
{:ok, fetched_balances} ->
balance_addresses_params = CoinBalanceHelper.balances_params_to_address_params(fetched_balances.params_list)

@ -205,7 +205,7 @@ defmodule Indexer.Fetcher.OnDemand.CoinBalance do
defp fetch_balances(block_number, address, json_rpc_named_arguments) do
params = %{block_quantity: integer_to_quantity(block_number), hash_data: to_string(address.hash)}
EthereumJSONRPC.fetch_balances([params], json_rpc_named_arguments)
EthereumJSONRPC.fetch_balances([params], json_rpc_named_arguments, latest_block_number())
end
defp do_import(%FetchedBalances{} = fetched_balances) do

@ -49,7 +49,11 @@ defmodule Indexer.Block.FetcherTest do
describe "import_range/2" do
setup %{json_rpc_named_arguments: json_rpc_named_arguments} do
CoinBalanceCatchup.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
CoinBalanceCatchup.Supervisor.Case.start_supervised!(
json_rpc_named_arguments: json_rpc_named_arguments,
poll: false
)
ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)

@ -8,6 +8,7 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do
import Mox
alias Explorer.Chain.{Address, Hash, Wei}
alias Explorer.Chain.Cache.BlockNumber
alias Indexer.Fetcher.CoinBalance.Catchup, as: CoinBalanceCatchup
@moduletag :capture_log
@ -21,6 +22,13 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do
setup do
start_supervised!({Task.Supervisor, name: Indexer.TaskSupervisor})
initial_config = Application.get_env(:explorer, Explorer.Chain.Cache.BlockNumber)
Application.put_env(:explorer, Explorer.Chain.Cache.BlockNumber, enabled: true)
on_exit(fn ->
Application.put_env(:explorer, Explorer.Chain.Cache.BlockNumber, initial_config)
end)
:ok
end
@ -225,6 +233,8 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do
end)
end
BlockNumber.set_max(block_number)
CoinBalanceCatchup.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
assert :ok = CoinBalanceCatchup.async_fetch_balances([%{address_hash: hash, block_number: block_number}])
@ -318,6 +328,8 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do
{:ok, [res2]}
end)
BlockNumber.set_max(2)
case CoinBalanceCatchup.run(entries, json_rpc_named_arguments) do
:ok ->
balances = Repo.all(from(balance in Address.CoinBalance, where: balance.address_hash == ^hash_data))
@ -373,6 +385,8 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do
{:ok, [%{id: id, error: %{code: 1, message: "Bad"}}]}
end)
BlockNumber.set_max(block_number())
assert {:retry, ^entries} = CoinBalanceCatchup.run(entries, json_rpc_named_arguments)
end
@ -401,6 +415,8 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do
{:ok, [res]}
end)
BlockNumber.set_max(block_number)
assert :ok = CoinBalanceCatchup.run(entries, json_rpc_named_arguments)
end
@ -456,6 +472,8 @@ defmodule Indexer.Fetcher.CoinBalance.CatchupTest do
{:ok, [res_good]}
end)
BlockNumber.set_max(good_block_number)
assert {:retry, [{^address_hash_bytes, ^bad_block_number}]} =
CoinBalanceCatchup.run(
[{address_hash_bytes, good_block_number}, {address_hash_bytes, bad_block_number}],

@ -4,10 +4,12 @@ defmodule Indexer.Fetcher.CoinBalance.Catchup.Supervisor.Case do
def start_supervised!(fetcher_arguments \\ []) when is_list(fetcher_arguments) do
merged_fetcher_arguments =
Keyword.merge(
fetcher_arguments,
[
flush_interval: 50,
max_batch_size: 1,
max_concurrency: 1
],
fetcher_arguments
)
[merged_fetcher_arguments]

Loading…
Cancel
Save