From e2692945fc29a97e8f7b4679797a929a86fd0561 Mon Sep 17 00:00:00 2001 From: Qwerty5Uiop <105209995+Qwerty5Uiop@users.noreply.github.com> Date: Wed, 29 May 2024 16:39:58 +0400 Subject: [PATCH] feat: Add window between balance fetch retries for missing balanceOf tokens (#10142) --- .../import/runner/address/token_balances.ex | 2 +- .../utility/missing_balance_of_token.ex | 40 ++++++++++++---- ...plemented_to_missing_balance_of_tokens.exs | 9 ++++ .../runner/address/token_balances_test.exs | 3 +- .../lib/indexer/fetcher/token_balance.ex | 47 +++++++++++++++---- .../indexer/fetcher/token_balance_test.exs | 35 +++++++++++++- config/runtime.exs | 3 ++ docker-compose/envs/common-blockscout.env | 1 + 8 files changed, 121 insertions(+), 19 deletions(-) create mode 100644 apps/explorer/priv/repo/migrations/20240527152734_add_currently_implemented_to_missing_balance_of_tokens.exs diff --git a/apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex b/apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex index e4e20d08fa..af07a27626 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex @@ -75,7 +75,7 @@ defmodule Explorer.Chain.Import.Runner.Address.TokenBalances do is_nil(Map.get(balance_params, :value_fetched_at)) or is_nil(Map.get(balance_params, :value)) end) - {:ok, filled_balances ++ MissingBalanceOfToken.filter_token_balances_params(placeholders)} + {:ok, filled_balances ++ MissingBalanceOfToken.filter_token_balances_params(placeholders, false)} end @spec insert(Repo.t(), [map()], %{ diff --git a/apps/explorer/lib/explorer/utility/missing_balance_of_token.ex b/apps/explorer/lib/explorer/utility/missing_balance_of_token.ex index 671de226ba..af1333cc90 100644 --- a/apps/explorer/lib/explorer/utility/missing_balance_of_token.ex +++ b/apps/explorer/lib/explorer/utility/missing_balance_of_token.ex @@ -12,6 +12,7 @@ defmodule Explorer.Utility.MissingBalanceOfToken do @primary_key false typed_schema "missing_balance_of_tokens" do field(:block_number, :integer) + field(:currently_implemented, :boolean) belongs_to( :token, @@ -28,7 +29,7 @@ defmodule Explorer.Utility.MissingBalanceOfToken do @doc false def changeset(missing_balance_of_token \\ %__MODULE__{}, params) do - cast(missing_balance_of_token, params, [:token_contract_address_hash, :block_number]) + cast(missing_balance_of_token, params, [:token_contract_address_hash, :block_number, :currently_implemented]) end @doc """ @@ -41,23 +42,38 @@ defmodule Explorer.Utility.MissingBalanceOfToken do |> Repo.all() end + @doc """ + Set currently_implemented: true for all provided token contract address hashes + """ + @spec mark_as_implemented([Hash.Address.t()]) :: {non_neg_integer(), nil | [term()]} + def mark_as_implemented(token_contract_address_hashes) do + __MODULE__ + |> where([mbot], mbot.token_contract_address_hash in ^token_contract_address_hashes) + |> Repo.update_all(set: [currently_implemented: true]) + end + @doc """ Filters provided token balances params by presence of record with the same `token_contract_address_hash` and above or equal `block_number` in `missing_balance_of_tokens`. """ - @spec filter_token_balances_params([map()]) :: [map()] - def filter_token_balances_params(params) do + @spec filter_token_balances_params([map()], boolean(), [__MODULE__.t()] | nil) :: [map()] + def filter_token_balances_params(params, use_window?, missing_balance_of_tokens \\ nil) do + existing_missing_balance_of_tokens = missing_balance_of_tokens || fetch_from_params(params) + missing_balance_of_tokens_map = - params - |> Enum.map(& &1.token_contract_address_hash) - |> get_by_hashes() - |> Enum.map(&{to_string(&1.token_contract_address_hash), &1.block_number}) + existing_missing_balance_of_tokens + |> Enum.map( + &{to_string(&1.token_contract_address_hash), + %{block_number: &1.block_number, currently_implemented: &1.currently_implemented}} + ) |> Map.new() Enum.filter(params, fn %{token_contract_address_hash: token_contract_address_hash, block_number: block_number} -> case missing_balance_of_tokens_map[to_string(token_contract_address_hash)] do nil -> true - missing_balance_of_block_number -> block_number > missing_balance_of_block_number + %{block_number: bn, currently_implemented: true} -> block_number > bn + %{block_number: bn} when not use_window? -> block_number > bn + %{block_number: bn} -> block_number > bn + missing_balance_of_window() end end) end @@ -87,6 +103,14 @@ defmodule Explorer.Utility.MissingBalanceOfToken do Repo.insert_all(__MODULE__, params, on_conflict: on_conflict(), conflict_target: :token_contract_address_hash) end + defp fetch_from_params(params) do + params + |> Enum.map(& &1.token_contract_address_hash) + |> get_by_hashes() + end + + defp missing_balance_of_window, do: Application.get_env(:explorer, __MODULE__)[:window_size] + defp on_conflict do from( mbot in __MODULE__, diff --git a/apps/explorer/priv/repo/migrations/20240527152734_add_currently_implemented_to_missing_balance_of_tokens.exs b/apps/explorer/priv/repo/migrations/20240527152734_add_currently_implemented_to_missing_balance_of_tokens.exs new file mode 100644 index 0000000000..84b8e02cf4 --- /dev/null +++ b/apps/explorer/priv/repo/migrations/20240527152734_add_currently_implemented_to_missing_balance_of_tokens.exs @@ -0,0 +1,9 @@ +defmodule Explorer.Repo.Migrations.AddCurrentlyImplementedToMissingBalanceOfTokens do + use Ecto.Migration + + def change do + alter table(:missing_balance_of_tokens) do + add(:currently_implemented, :boolean) + end + end +end diff --git a/apps/explorer/test/explorer/chain/import/runner/address/token_balances_test.exs b/apps/explorer/test/explorer/chain/import/runner/address/token_balances_test.exs index f391aaeb80..4d7f24eec8 100644 --- a/apps/explorer/test/explorer/chain/import/runner/address/token_balances_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/address/token_balances_test.exs @@ -234,7 +234,8 @@ defmodule Explorer.Chain.Import.Runner.Address.TokenBalancesTest do insert(:missing_balance_of_token, token_contract_address_hash: token_contract_address_hash, - block_number: block_number + block_number: block_number, + currently_implemented: true ) address_hash = address.hash diff --git a/apps/indexer/lib/indexer/fetcher/token_balance.ex b/apps/indexer/lib/indexer/fetcher/token_balance.ex index bfabded15a..be3bfafb2e 100644 --- a/apps/indexer/lib/indexer/fetcher/token_balance.ex +++ b/apps/indexer/lib/indexer/fetcher/token_balance.ex @@ -96,12 +96,19 @@ defmodule Indexer.Fetcher.TokenBalance do @impl BufferedTask @decorate trace(name: "fetch", resource: "Indexer.Fetcher.TokenBalance.run/2", tracer: Tracer, service: :indexer) def run(entries, _json_rpc_named_arguments) do + params = Enum.map(entries, &format_params/1) + + missing_balance_of_tokens = + params + |> Enum.map(& &1.token_contract_address_hash) + |> Enum.uniq() + |> MissingBalanceOfToken.get_by_hashes() + result = - entries - |> Enum.map(&format_params/1) - |> MissingBalanceOfToken.filter_token_balances_params() + params + |> MissingBalanceOfToken.filter_token_balances_params(true, missing_balance_of_tokens) |> increase_retries_count() - |> fetch_from_blockchain() + |> fetch_from_blockchain(missing_balance_of_tokens) |> import_token_balances() if result == :ok do @@ -111,7 +118,7 @@ defmodule Indexer.Fetcher.TokenBalance do end end - def fetch_from_blockchain(params_list) do + def fetch_from_blockchain(params_list, missing_balance_of_tokens) do retryable_params_list = params_list |> Enum.filter(&(&1.retries_count <= @max_retries)) @@ -130,6 +137,8 @@ defmodule Indexer.Fetcher.TokenBalance do failed_token_balances: failed_token_balances } + handle_success_balances(fetched_token_balances, missing_balance_of_tokens) + if Enum.empty?(failed_token_balances) do {:halt, all_token_balances} else @@ -149,6 +158,23 @@ defmodule Indexer.Fetcher.TokenBalance do fetched_token_balances end + defp handle_success_balances(fetched_token_balances, missing_balance_of_tokens) do + successful_token_hashes = + fetched_token_balances + |> Enum.map(&to_string(&1.token_contract_address_hash)) + |> MapSet.new() + + missing_balance_of_token_hashes = + missing_balance_of_tokens + |> Enum.map(&to_string(&1.token_contract_address_hash)) + |> MapSet.new() + + successful_token_hashes + |> MapSet.intersection(missing_balance_of_token_hashes) + |> MapSet.to_list() + |> MissingBalanceOfToken.mark_as_implemented() + end + defp handle_failed_balances(failed_token_balances) do {missing_balance_of_balances, other_failed_balances} = Enum.split_with(failed_token_balances, fn @@ -158,9 +184,14 @@ defmodule Indexer.Fetcher.TokenBalance do MissingBalanceOfToken.insert_from_params(missing_balance_of_balances) - Enum.each(missing_balance_of_balances, fn balance -> - TokenBalance.delete_placeholders_below(balance.token_contract_address_hash, balance.block_number) - CurrentTokenBalance.delete_placeholders_below(balance.token_contract_address_hash, balance.block_number) + missing_balance_of_balances + |> Enum.group_by(& &1.token_contract_address_hash, & &1.block_number) + |> Enum.map(fn {token_contract_address_hash, block_numbers} -> + {token_contract_address_hash, Enum.max(block_numbers)} + end) + |> Enum.each(fn {token_contract_address_hash, block_number} -> + TokenBalance.delete_placeholders_below(token_contract_address_hash, block_number) + CurrentTokenBalance.delete_placeholders_below(token_contract_address_hash, block_number) end) other_failed_balances diff --git a/apps/indexer/test/indexer/fetcher/token_balance_test.exs b/apps/indexer/test/indexer/fetcher/token_balance_test.exs index 5d33543b75..e39cc6e98a 100644 --- a/apps/indexer/test/indexer/fetcher/token_balance_test.exs +++ b/apps/indexer/test/indexer/fetcher/token_balance_test.exs @@ -187,7 +187,7 @@ defmodule Indexer.Fetcher.TokenBalanceTest do test "filters out params with tokens that doesn't implement balanceOf function" do address = insert(:address) - missing_balance_of_token = insert(:missing_balance_of_token) + missing_balance_of_token = insert(:missing_balance_of_token, currently_implemented: true) assert TokenBalance.run( [ @@ -200,6 +200,39 @@ defmodule Indexer.Fetcher.TokenBalanceTest do assert Repo.all(Address.TokenBalance) == [] end + test "set currently_implemented: true for missing balanceOf token if balance was successfully fetched" do + address = insert(:address) + missing_balance_of_token = insert(:missing_balance_of_token) + window_size = Application.get_env(:explorer, MissingBalanceOfToken)[:window_size] + + expect( + EthereumJSONRPC.Mox, + :json_rpc, + fn [%{id: id, method: "eth_call", params: [%{data: _, to: _}, _]}], _options -> + {:ok, + [ + %{ + id: id, + jsonrpc: "2.0", + result: "0x00000000000000000000000000000000000000000000d3c21bcecceda1000000" + } + ]} + end + ) + + refute missing_balance_of_token.currently_implemented + + assert TokenBalance.run( + [ + {address.hash.bytes, missing_balance_of_token.token_contract_address_hash.bytes, + missing_balance_of_token.block_number + window_size + 1, "ERC-20", nil, 0} + ], + nil + ) == :ok + + assert %{currently_implemented: true} = Repo.one(MissingBalanceOfToken) + end + test "in case of error deletes token balance placeholders below the given number and inserts new missing balanceOf tokens" do address = insert(:address) %{contract_address_hash: token_contract_address_hash} = insert(:token) diff --git a/config/runtime.exs b/config/runtime.exs index a9fb04c4df..c9490d7bcc 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -575,6 +575,9 @@ config :explorer, Explorer.Chain.BridgedToken, amb_bridge_mediators: System.get_env("BRIDGED_TOKENS_AMB_BRIDGE_MEDIATORS"), foreign_json_rpc: System.get_env("BRIDGED_TOKENS_FOREIGN_JSON_RPC", "") +config :explorer, Explorer.Utility.MissingBalanceOfToken, + window_size: ConfigHelper.parse_integer_env_var("MISSING_BALANCE_OF_TOKENS_WINDOW_SIZE", 100) + ############### ### Indexer ### ############### diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index 0f3c2a5791..66728d07b5 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -263,6 +263,7 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false # TOKEN_ID_MIGRATION_FIRST_BLOCK= # TOKEN_ID_MIGRATION_CONCURRENCY= # TOKEN_ID_MIGRATION_BATCH_SIZE= +# MISSING_BALANCE_OF_TOKENS_WINDOW_SIZE= # INDEXER_INTERNAL_TRANSACTIONS_TRACER_TYPE= # WEBAPP_URL= # API_URL=