feat: Add window between balance fetch retries for missing balanceOf tokens (#10142)

pull/10168/head
Qwerty5Uiop 6 months ago committed by GitHub
parent ba664fa27e
commit e2692945fc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex
  2. 40
      apps/explorer/lib/explorer/utility/missing_balance_of_token.ex
  3. 9
      apps/explorer/priv/repo/migrations/20240527152734_add_currently_implemented_to_missing_balance_of_tokens.exs
  4. 3
      apps/explorer/test/explorer/chain/import/runner/address/token_balances_test.exs
  5. 47
      apps/indexer/lib/indexer/fetcher/token_balance.ex
  6. 35
      apps/indexer/test/indexer/fetcher/token_balance_test.exs
  7. 3
      config/runtime.exs
  8. 1
      docker-compose/envs/common-blockscout.env

@ -75,7 +75,7 @@ defmodule Explorer.Chain.Import.Runner.Address.TokenBalances do
is_nil(Map.get(balance_params, :value_fetched_at)) or is_nil(Map.get(balance_params, :value))
end)
{:ok, filled_balances ++ MissingBalanceOfToken.filter_token_balances_params(placeholders)}
{:ok, filled_balances ++ MissingBalanceOfToken.filter_token_balances_params(placeholders, false)}
end
@spec insert(Repo.t(), [map()], %{

@ -12,6 +12,7 @@ defmodule Explorer.Utility.MissingBalanceOfToken do
@primary_key false
typed_schema "missing_balance_of_tokens" do
field(:block_number, :integer)
field(:currently_implemented, :boolean)
belongs_to(
:token,
@ -28,7 +29,7 @@ defmodule Explorer.Utility.MissingBalanceOfToken do
@doc false
def changeset(missing_balance_of_token \\ %__MODULE__{}, params) do
cast(missing_balance_of_token, params, [:token_contract_address_hash, :block_number])
cast(missing_balance_of_token, params, [:token_contract_address_hash, :block_number, :currently_implemented])
end
@doc """
@ -41,23 +42,38 @@ defmodule Explorer.Utility.MissingBalanceOfToken do
|> Repo.all()
end
@doc """
Set currently_implemented: true for all provided token contract address hashes
"""
@spec mark_as_implemented([Hash.Address.t()]) :: {non_neg_integer(), nil | [term()]}
def mark_as_implemented(token_contract_address_hashes) do
__MODULE__
|> where([mbot], mbot.token_contract_address_hash in ^token_contract_address_hashes)
|> Repo.update_all(set: [currently_implemented: true])
end
@doc """
Filters provided token balances params by presence of record with the same `token_contract_address_hash`
and above or equal `block_number` in `missing_balance_of_tokens`.
"""
@spec filter_token_balances_params([map()]) :: [map()]
def filter_token_balances_params(params) do
@spec filter_token_balances_params([map()], boolean(), [__MODULE__.t()] | nil) :: [map()]
def filter_token_balances_params(params, use_window?, missing_balance_of_tokens \\ nil) do
existing_missing_balance_of_tokens = missing_balance_of_tokens || fetch_from_params(params)
missing_balance_of_tokens_map =
params
|> Enum.map(& &1.token_contract_address_hash)
|> get_by_hashes()
|> Enum.map(&{to_string(&1.token_contract_address_hash), &1.block_number})
existing_missing_balance_of_tokens
|> Enum.map(
&{to_string(&1.token_contract_address_hash),
%{block_number: &1.block_number, currently_implemented: &1.currently_implemented}}
)
|> Map.new()
Enum.filter(params, fn %{token_contract_address_hash: token_contract_address_hash, block_number: block_number} ->
case missing_balance_of_tokens_map[to_string(token_contract_address_hash)] do
nil -> true
missing_balance_of_block_number -> block_number > missing_balance_of_block_number
%{block_number: bn, currently_implemented: true} -> block_number > bn
%{block_number: bn} when not use_window? -> block_number > bn
%{block_number: bn} -> block_number > bn + missing_balance_of_window()
end
end)
end
@ -87,6 +103,14 @@ defmodule Explorer.Utility.MissingBalanceOfToken do
Repo.insert_all(__MODULE__, params, on_conflict: on_conflict(), conflict_target: :token_contract_address_hash)
end
defp fetch_from_params(params) do
params
|> Enum.map(& &1.token_contract_address_hash)
|> get_by_hashes()
end
defp missing_balance_of_window, do: Application.get_env(:explorer, __MODULE__)[:window_size]
defp on_conflict do
from(
mbot in __MODULE__,

@ -0,0 +1,9 @@
defmodule Explorer.Repo.Migrations.AddCurrentlyImplementedToMissingBalanceOfTokens do
use Ecto.Migration
def change do
alter table(:missing_balance_of_tokens) do
add(:currently_implemented, :boolean)
end
end
end

@ -234,7 +234,8 @@ defmodule Explorer.Chain.Import.Runner.Address.TokenBalancesTest do
insert(:missing_balance_of_token,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
block_number: block_number,
currently_implemented: true
)
address_hash = address.hash

@ -96,12 +96,19 @@ defmodule Indexer.Fetcher.TokenBalance do
@impl BufferedTask
@decorate trace(name: "fetch", resource: "Indexer.Fetcher.TokenBalance.run/2", tracer: Tracer, service: :indexer)
def run(entries, _json_rpc_named_arguments) do
params = Enum.map(entries, &format_params/1)
missing_balance_of_tokens =
params
|> Enum.map(& &1.token_contract_address_hash)
|> Enum.uniq()
|> MissingBalanceOfToken.get_by_hashes()
result =
entries
|> Enum.map(&format_params/1)
|> MissingBalanceOfToken.filter_token_balances_params()
params
|> MissingBalanceOfToken.filter_token_balances_params(true, missing_balance_of_tokens)
|> increase_retries_count()
|> fetch_from_blockchain()
|> fetch_from_blockchain(missing_balance_of_tokens)
|> import_token_balances()
if result == :ok do
@ -111,7 +118,7 @@ defmodule Indexer.Fetcher.TokenBalance do
end
end
def fetch_from_blockchain(params_list) do
def fetch_from_blockchain(params_list, missing_balance_of_tokens) do
retryable_params_list =
params_list
|> Enum.filter(&(&1.retries_count <= @max_retries))
@ -130,6 +137,8 @@ defmodule Indexer.Fetcher.TokenBalance do
failed_token_balances: failed_token_balances
}
handle_success_balances(fetched_token_balances, missing_balance_of_tokens)
if Enum.empty?(failed_token_balances) do
{:halt, all_token_balances}
else
@ -149,6 +158,23 @@ defmodule Indexer.Fetcher.TokenBalance do
fetched_token_balances
end
defp handle_success_balances(fetched_token_balances, missing_balance_of_tokens) do
successful_token_hashes =
fetched_token_balances
|> Enum.map(&to_string(&1.token_contract_address_hash))
|> MapSet.new()
missing_balance_of_token_hashes =
missing_balance_of_tokens
|> Enum.map(&to_string(&1.token_contract_address_hash))
|> MapSet.new()
successful_token_hashes
|> MapSet.intersection(missing_balance_of_token_hashes)
|> MapSet.to_list()
|> MissingBalanceOfToken.mark_as_implemented()
end
defp handle_failed_balances(failed_token_balances) do
{missing_balance_of_balances, other_failed_balances} =
Enum.split_with(failed_token_balances, fn
@ -158,9 +184,14 @@ defmodule Indexer.Fetcher.TokenBalance do
MissingBalanceOfToken.insert_from_params(missing_balance_of_balances)
Enum.each(missing_balance_of_balances, fn balance ->
TokenBalance.delete_placeholders_below(balance.token_contract_address_hash, balance.block_number)
CurrentTokenBalance.delete_placeholders_below(balance.token_contract_address_hash, balance.block_number)
missing_balance_of_balances
|> Enum.group_by(& &1.token_contract_address_hash, & &1.block_number)
|> Enum.map(fn {token_contract_address_hash, block_numbers} ->
{token_contract_address_hash, Enum.max(block_numbers)}
end)
|> Enum.each(fn {token_contract_address_hash, block_number} ->
TokenBalance.delete_placeholders_below(token_contract_address_hash, block_number)
CurrentTokenBalance.delete_placeholders_below(token_contract_address_hash, block_number)
end)
other_failed_balances

@ -187,7 +187,7 @@ defmodule Indexer.Fetcher.TokenBalanceTest do
test "filters out params with tokens that doesn't implement balanceOf function" do
address = insert(:address)
missing_balance_of_token = insert(:missing_balance_of_token)
missing_balance_of_token = insert(:missing_balance_of_token, currently_implemented: true)
assert TokenBalance.run(
[
@ -200,6 +200,39 @@ defmodule Indexer.Fetcher.TokenBalanceTest do
assert Repo.all(Address.TokenBalance) == []
end
test "set currently_implemented: true for missing balanceOf token if balance was successfully fetched" do
address = insert(:address)
missing_balance_of_token = insert(:missing_balance_of_token)
window_size = Application.get_env(:explorer, MissingBalanceOfToken)[:window_size]
expect(
EthereumJSONRPC.Mox,
:json_rpc,
fn [%{id: id, method: "eth_call", params: [%{data: _, to: _}, _]}], _options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: "0x00000000000000000000000000000000000000000000d3c21bcecceda1000000"
}
]}
end
)
refute missing_balance_of_token.currently_implemented
assert TokenBalance.run(
[
{address.hash.bytes, missing_balance_of_token.token_contract_address_hash.bytes,
missing_balance_of_token.block_number + window_size + 1, "ERC-20", nil, 0}
],
nil
) == :ok
assert %{currently_implemented: true} = Repo.one(MissingBalanceOfToken)
end
test "in case of error deletes token balance placeholders below the given number and inserts new missing balanceOf tokens" do
address = insert(:address)
%{contract_address_hash: token_contract_address_hash} = insert(:token)

@ -575,6 +575,9 @@ config :explorer, Explorer.Chain.BridgedToken,
amb_bridge_mediators: System.get_env("BRIDGED_TOKENS_AMB_BRIDGE_MEDIATORS"),
foreign_json_rpc: System.get_env("BRIDGED_TOKENS_FOREIGN_JSON_RPC", "")
config :explorer, Explorer.Utility.MissingBalanceOfToken,
window_size: ConfigHelper.parse_integer_env_var("MISSING_BALANCE_OF_TOKENS_WINDOW_SIZE", 100)
###############
### Indexer ###
###############

@ -263,6 +263,7 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# TOKEN_ID_MIGRATION_FIRST_BLOCK=
# TOKEN_ID_MIGRATION_CONCURRENCY=
# TOKEN_ID_MIGRATION_BATCH_SIZE=
# MISSING_BALANCE_OF_TOKENS_WINDOW_SIZE=
# INDEXER_INTERNAL_TRANSACTIONS_TRACER_TYPE=
# WEBAPP_URL=
# API_URL=

Loading…
Cancel
Save