Chunk realtime balances requests

pull/7873/head
Qwerty5Uiop 1 year ago
parent 62d531052f
commit 30e5e0904c
  1. 1
      CHANGELOG.md
  2. 6
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
  3. 4
      apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/http.ex
  4. 2
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  5. 2
      apps/indexer/lib/indexer/fetcher/coin_balance.ex
  6. 142
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs

@ -13,6 +13,7 @@
- [#7811](https://github.com/blockscout/blockscout/pull/7811) - Filter addresses before insertion - [#7811](https://github.com/blockscout/blockscout/pull/7811) - Filter addresses before insertion
- [#7895](https://github.com/blockscout/blockscout/pull/7895) - API v2: Add sorting to tokens page - [#7895](https://github.com/blockscout/blockscout/pull/7895) - API v2: Add sorting to tokens page
- [#7859](https://github.com/blockscout/blockscout/pull/7859) - Add TokenTotalSupplyUpdater - [#7859](https://github.com/blockscout/blockscout/pull/7859) - Add TokenTotalSupplyUpdater
- [#7873](https://github.com/blockscout/blockscout/pull/7873) - Chunk realtime balances requests
### Fixes ### Fixes

@ -185,7 +185,7 @@ defmodule EthereumJSONRPC do
[%{required(:block_quantity) => quantity, required(:hash_data) => data()}], [%{required(:block_quantity) => quantity, required(:hash_data) => data()}],
json_rpc_named_arguments json_rpc_named_arguments
) :: {:ok, FetchedBalances.t()} | {:error, reason :: term} ) :: {:ok, FetchedBalances.t()} | {:error, reason :: term}
def fetch_balances(params_list, json_rpc_named_arguments) def fetch_balances(params_list, json_rpc_named_arguments, chunk_size \\ nil)
when is_list(params_list) and is_list(json_rpc_named_arguments) do when is_list(params_list) and is_list(json_rpc_named_arguments) do
filtered_params = filtered_params =
if Application.get_env(:ethereum_jsonrpc, :disable_archive_balances?) do if Application.get_env(:ethereum_jsonrpc, :disable_archive_balances?) do
@ -203,6 +203,7 @@ defmodule EthereumJSONRPC do
with {:ok, responses} <- with {:ok, responses} <-
id_to_params id_to_params
|> FetchedBalances.requests() |> FetchedBalances.requests()
|> chunk_requests(chunk_size)
|> json_rpc(json_rpc_named_arguments) do |> json_rpc(json_rpc_named_arguments) do
{:ok, FetchedBalances.from_responses(responses, id_to_params)} {:ok, FetchedBalances.from_responses(responses, id_to_params)}
end end
@ -550,6 +551,9 @@ defmodule EthereumJSONRPC do
end end
end end
defp chunk_requests(requests, nil), do: requests
defp chunk_requests(requests, chunk_size), do: Enum.chunk_every(requests, chunk_size)
def first_block_to_fetch(config) do def first_block_to_fetch(config) do
string_value = Application.get_env(:indexer, config) string_value = Application.get_env(:indexer, config)

@ -38,6 +38,10 @@ defmodule EthereumJSONRPC.HTTP do
end end
end end
def json_rpc([batch | _] = chunked_batch_request, options) when is_list(batch) do
chunked_json_rpc(chunked_batch_request, options, [])
end
def json_rpc(batch_request, options) when is_list(batch_request) do def json_rpc(batch_request, options) when is_list(batch_request) do
chunked_json_rpc([batch_request], options, []) chunked_json_rpc([batch_request], options, [])
end end

@ -428,7 +428,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
) do ) do
case options case options
|> fetch_balances_params_list() |> fetch_balances_params_list()
|> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments) do |> EthereumJSONRPC.fetch_balances(json_rpc_named_arguments, CoinBalance.batch_size()) do
{:ok, %FetchedBalances{params_list: params_list, errors: []}} -> {:ok, %FetchedBalances{params_list: params_list, errors: []}} ->
merged_addresses_params = merged_addresses_params =
%{address_coin_balances: params_list} %{address_coin_balances: params_list}

@ -23,6 +23,8 @@ defmodule Indexer.Fetcher.CoinBalance do
@default_max_batch_size 500 @default_max_batch_size 500
@default_max_concurrency 4 @default_max_concurrency 4
def batch_size, do: defaults()[:max_batch_size]
@doc """ @doc """
Asynchronously fetches balances for each address `hash` at the `block_number`. Asynchronously fetches balances for each address `hash` at the `block_number`.
""" """

@ -472,30 +472,32 @@ defmodule Indexer.Block.Realtime.FetcherTest do
]} ]}
[ [
%{ [
id: 0, %{
jsonrpc: "2.0", id: 0,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x40b18103537c0f15d5e137dd8ddd019b84949d16", "0x3C365F"] method: "eth_getBalance",
}, params: ["0x40b18103537c0f15d5e137dd8ddd019b84949d16", "0x3C365F"]
%{ },
id: 1, %{
jsonrpc: "2.0", id: 1,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", "0x3C365F"] method: "eth_getBalance",
}, params: ["0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", "0x3C365F"]
%{ },
id: 2, %{
jsonrpc: "2.0", id: 2,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x66c9343c7e8ca673a1fedf9dbf2cd7936dbbf7e3", "0x3C3660"] method: "eth_getBalance",
}, params: ["0x66c9343c7e8ca673a1fedf9dbf2cd7936dbbf7e3", "0x3C3660"]
%{ },
id: 3, %{
jsonrpc: "2.0", id: 3,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x698bf6943bab687b2756394624aa183f434f65da", "0x3C365F"] method: "eth_getBalance",
} params: ["0x698bf6943bab687b2756394624aa183f434f65da", "0x3C365F"]
}
]
], ],
_ -> _ ->
{:ok, {:ok,
@ -964,30 +966,32 @@ defmodule Indexer.Block.Realtime.FetcherTest do
]} ]}
[ [
%{ [
id: 0, %{
jsonrpc: "2.0", id: 0,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x40b18103537c0f15d5e137dd8ddd019b84949d16", "0x3C365F"] method: "eth_getBalance",
}, params: ["0x40b18103537c0f15d5e137dd8ddd019b84949d16", "0x3C365F"]
%{ },
id: 1, %{
jsonrpc: "2.0", id: 1,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", "0x3C365F"] method: "eth_getBalance",
}, params: ["0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", "0x3C365F"]
%{ },
id: 2, %{
jsonrpc: "2.0", id: 2,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x66c9343c7e8ca673a1fedf9dbf2cd7936dbbf7e3", "0x3C3660"] method: "eth_getBalance",
}, params: ["0x66c9343c7e8ca673a1fedf9dbf2cd7936dbbf7e3", "0x3C3660"]
%{ },
id: 3, %{
jsonrpc: "2.0", id: 3,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x698bf6943bab687b2756394624aa183f434f65da", "0x3C365F"] method: "eth_getBalance",
} params: ["0x698bf6943bab687b2756394624aa183f434f65da", "0x3C365F"]
}
]
], ],
_ -> _ ->
{:ok, {:ok,
@ -1183,23 +1187,27 @@ defmodule Indexer.Block.Realtime.FetcherTest do
]} ]}
[ [
%{ [
id: 0, %{
jsonrpc: "2.0", id: 0,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", "0x3C365F"] method: "eth_getBalance",
} params: ["0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", "0x3C365F"]
}
]
], ],
_ -> _ ->
{:ok, [%{id: 0, jsonrpc: "2.0", result: "0x53474fa377a46000"}]} {:ok, [%{id: 0, jsonrpc: "2.0", result: "0x53474fa377a46000"}]}
[ [
%{ [
id: 0, %{
jsonrpc: "2.0", id: 0,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x66c9343c7e8ca673a1fedf9dbf2cd7936dbbf7e3", "0x3C3660"] method: "eth_getBalance",
} params: ["0x66c9343c7e8ca673a1fedf9dbf2cd7936dbbf7e3", "0x3C3660"]
}
]
], ],
_ -> _ ->
{:ok, [%{id: 0, jsonrpc: "2.0", result: "0x53507afe51f28000"}]} {:ok, [%{id: 0, jsonrpc: "2.0", result: "0x53507afe51f28000"}]}
@ -1224,12 +1232,14 @@ defmodule Indexer.Block.Realtime.FetcherTest do
]} ]}
[ [
%{ [
id: 0, %{
jsonrpc: "2.0", id: 0,
method: "eth_getBalance", jsonrpc: "2.0",
params: ["0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", "0x3C365F"] method: "eth_getBalance",
} params: ["0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", "0x3C365F"]
}
]
], ],
_ -> _ ->
{:ok, [%{id: 0, jsonrpc: "2.0", result: "0x53474fa377a46000"}]} {:ok, [%{id: 0, jsonrpc: "2.0", result: "0x53474fa377a46000"}]}

Loading…
Cancel
Save