From 11d6a2e9a93dfb0d2d5a975f778be7d702e67ffb Mon Sep 17 00:00:00 2001 From: Qwerty5Uiop <105209995+Qwerty5Uiop@users.noreply.github.com> Date: Thu, 5 Sep 2024 13:16:01 +0400 Subject: [PATCH] chore: Token balances fetcher slow queue (#10694) --- .../explorer/chain/address/token_balance.ex | 12 +- .../import/runner/address/token_balances.ex | 2 + ...8140638_add_token_balance_retry_fields.exs | 10 ++ .../lib/indexer/fetcher/token_balance.ex | 109 +++++++------- apps/indexer/lib/indexer/token_balances.ex | 27 +--- .../indexer/fetcher/token_balance_test.exs | 138 ++++++++---------- config/runtime.exs | 4 +- docker-compose/envs/common-blockscout.env | 2 + 8 files changed, 138 insertions(+), 166 deletions(-) create mode 100644 apps/explorer/priv/repo/migrations/20240828140638_add_token_balance_retry_fields.exs diff --git a/apps/explorer/lib/explorer/chain/address/token_balance.ex b/apps/explorer/lib/explorer/chain/address/token_balance.ex index e0aef47563..85a7314eb8 100644 --- a/apps/explorer/lib/explorer/chain/address/token_balance.ex +++ b/apps/explorer/lib/explorer/chain/address/token_balance.ex @@ -25,6 +25,8 @@ defmodule Explorer.Chain.Address.TokenBalance do * `value` - The value that's represents the balance. * `token_id` - The token_id of the transferred token (applicable for ERC-1155, ERC-721 and ERC-404 tokens) * `token_type` - The type of the token + * `refetch_after` - when to refetch the token balance + * `retries_count` - number of times the token balance has been retried """ typed_schema "address_token_balances" do field(:value, :decimal) @@ -32,6 +34,8 @@ defmodule Explorer.Chain.Address.TokenBalance do field(:value_fetched_at, :utc_datetime_usec) field(:token_id, :decimal) field(:token_type, :string, null: false) + field(:refetch_after, :utc_datetime_usec) + field(:retries_count, :integer) belongs_to(:address, Address, foreign_key: :address_hash, references: :hash, type: Hash.Address, null: false) @@ -47,7 +51,7 @@ defmodule Explorer.Chain.Address.TokenBalance do timestamps() end - @optional_fields ~w(value value_fetched_at token_id)a + @optional_fields ~w(value value_fetched_at token_id refetch_after retries_count)a @required_fields ~w(address_hash block_number token_contract_address_hash token_type)a @allowed_fields @optional_fields ++ @required_fields @@ -77,7 +81,8 @@ defmodule Explorer.Chain.Address.TokenBalance do where: ((tb.address_hash != ^@burn_address_hash and tb.token_type == "ERC-721") or tb.token_type == "ERC-20" or tb.token_type == "ERC-1155" or tb.token_type == "ERC-404") and - (is_nil(tb.value_fetched_at) or is_nil(tb.value)) + (is_nil(tb.value_fetched_at) or is_nil(tb.value)) and + (is_nil(tb.refetch_after) or tb.refetch_after < ^Timex.now()) ) else from( @@ -87,7 +92,8 @@ defmodule Explorer.Chain.Address.TokenBalance do where: ((tb.address_hash != ^@burn_address_hash and t.type == "ERC-721") or t.type == "ERC-20" or t.type == "ERC-1155" or t.type == "ERC-404") and - (is_nil(tb.value_fetched_at) or is_nil(tb.value)) + (is_nil(tb.value_fetched_at) or is_nil(tb.value)) and + (is_nil(tb.refetch_after) or tb.refetch_after < ^Timex.now()) ) end end diff --git a/apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex b/apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex index af07a27626..932b6cc748 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex @@ -148,6 +148,8 @@ defmodule Explorer.Chain.Import.Runner.Address.TokenBalances do value: fragment("COALESCE(EXCLUDED.value, ?)", token_balance.value), value_fetched_at: fragment("EXCLUDED.value_fetched_at"), token_type: fragment("EXCLUDED.token_type"), + refetch_after: fragment("EXCLUDED.refetch_after"), + retries_count: fragment("EXCLUDED.retries_count"), inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at), updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at) ] diff --git a/apps/explorer/priv/repo/migrations/20240828140638_add_token_balance_retry_fields.exs b/apps/explorer/priv/repo/migrations/20240828140638_add_token_balance_retry_fields.exs new file mode 100644 index 0000000000..3d1fddf8bb --- /dev/null +++ b/apps/explorer/priv/repo/migrations/20240828140638_add_token_balance_retry_fields.exs @@ -0,0 +1,10 @@ +defmodule Explorer.Repo.Migrations.AddTokenBalanceRetryFields do + use Ecto.Migration + + def change do + alter table(:address_token_balances) do + add(:refetch_after, :utc_datetime_usec) + add(:retries_count, :smallint) + end + end +end diff --git a/apps/indexer/lib/indexer/fetcher/token_balance.ex b/apps/indexer/lib/indexer/fetcher/token_balance.ex index 1f808effc6..5bfd3deaff 100644 --- a/apps/indexer/lib/indexer/fetcher/token_balance.ex +++ b/apps/indexer/lib/indexer/fetcher/token_balance.ex @@ -9,7 +9,7 @@ defmodule Indexer.Fetcher.TokenBalance do It behaves as a `BufferedTask`, so we can configure the `max_batch_size` and the `max_concurrency` to control how many token balances will be fetched at the same time. - Also, this module set a `retries_count` for each token balance and increment this number to avoid fetching the ones + Also, this module set a `refetch_after` for each token balance in case of failure to avoid fetching the ones that always raise errors interacting with the Smart Contract. """ @@ -32,8 +32,6 @@ defmodule Indexer.Fetcher.TokenBalance do @timeout :timer.minutes(10) - @max_retries 3 - @spec async_fetch( [ %{ @@ -93,7 +91,7 @@ defmodule Indexer.Fetcher.TokenBalance do @doc """ Fetches the given entries (token_balances) from the Smart Contract and import them in our database. - It also increments the `retries_count` to avoid fetching token balances that always raise errors + It also set the `refetch_after` in case of failure to avoid fetching token balances that always raise errors when reading their balance in the Smart Contract. """ @impl BufferedTask @@ -110,7 +108,6 @@ defmodule Indexer.Fetcher.TokenBalance do result = params |> MissingBalanceOfToken.filter_token_balances_params(true, missing_balance_of_tokens) - |> increase_retries_count() |> fetch_from_blockchain(missing_balance_of_tokens) |> import_token_balances() @@ -122,45 +119,22 @@ defmodule Indexer.Fetcher.TokenBalance do end def fetch_from_blockchain(params_list, missing_balance_of_tokens) do - retryable_params_list = - params_list - |> Enum.filter(&(&1.retries_count <= @max_retries)) - |> Enum.uniq_by(&Map.take(&1, [:token_contract_address_hash, :token_id, :address_hash, :block_number])) - - Logger.metadata(count: Enum.count(retryable_params_list)) - - %{fetched_token_balances: fetched_token_balances, failed_token_balances: _failed_token_balances} = - 1..@max_retries - |> Enum.reduce_while(%{fetched_token_balances: [], failed_token_balances: retryable_params_list}, fn _x, acc -> - {:ok, %{fetched_token_balances: fetched_token_balances, failed_token_balances: failed_token_balances}} = - TokenBalances.fetch_token_balances_from_blockchain(acc.failed_token_balances) - - all_token_balances = %{ - fetched_token_balances: acc.fetched_token_balances ++ fetched_token_balances, - failed_token_balances: failed_token_balances - } - - handle_success_balances(fetched_token_balances, missing_balance_of_tokens) - - if Enum.empty?(failed_token_balances) do - {:halt, all_token_balances} - else - failed_token_balances = - failed_token_balances - |> handle_failed_balances() - |> increase_retries_count() - - token_balances_updated_retries_count = - all_token_balances - |> Map.put(:failed_token_balances, failed_token_balances) - - {:cont, token_balances_updated_retries_count} - end - end) + params_list = + Enum.uniq_by(params_list, &Map.take(&1, [:token_contract_address_hash, :token_id, :address_hash, :block_number])) + + Logger.metadata(count: Enum.count(params_list)) + + {:ok, %{fetched_token_balances: fetched_token_balances, failed_token_balances: failed_token_balances}} = + TokenBalances.fetch_token_balances_from_blockchain(params_list) + + handle_success_balances(fetched_token_balances, missing_balance_of_tokens) + failed_balances_to_keep = handle_failed_balances(failed_token_balances) - fetched_token_balances + fetched_token_balances ++ failed_balances_to_keep end + defp handle_success_balances([], _missing_balance_of_tokens), do: :ok + defp handle_success_balances(fetched_token_balances, missing_balance_of_tokens) do successful_token_hashes = fetched_token_balances @@ -178,7 +152,15 @@ defmodule Indexer.Fetcher.TokenBalance do |> MissingBalanceOfToken.mark_as_implemented() end + defp handle_failed_balances([]), do: [] + defp handle_failed_balances(failed_token_balances) do + failed_token_balances + |> handle_missing_balance_of_tokens() + |> handle_other_errors() + end + + defp handle_missing_balance_of_tokens(failed_token_balances) do {missing_balance_of_balances, other_failed_balances} = Enum.split_with(failed_token_balances, fn %{error: :unable_to_decode} -> true @@ -201,9 +183,27 @@ defmodule Indexer.Fetcher.TokenBalance do other_failed_balances end - defp increase_retries_count(params_list) do - params_list - |> Enum.map(&Map.put(&1, :retries_count, &1.retries_count + 1)) + defp handle_other_errors(failed_token_balances) do + Enum.map(failed_token_balances, fn token_balance_params -> + new_retries_count = token_balance_params.retries_count + 1 + + Map.merge(token_balance_params, %{ + retries_count: new_retries_count, + refetch_after: define_refetch_after(new_retries_count) + }) + end) + end + + defp define_refetch_after(retries_count) do + config = Application.get_env(:indexer, __MODULE__) + + coef = config[:exp_timeout_coeff] + max_refetch_interval = config[:max_refetch_interval] + max_retries_count = :math.log(max_refetch_interval / 1000 / coef) + + value = floor(coef * :math.exp(min(retries_count, max_retries_count))) + + Timex.shift(Timex.now(), seconds: value) end def import_token_balances(token_balances_params) do @@ -259,17 +259,14 @@ defmodule Indexer.Fetcher.TokenBalance do end end - defp entry( - %{ - token_contract_address_hash: token_contract_address_hash, - address_hash: address_hash, - block_number: block_number, - token_type: token_type, - token_id: token_id - } = token_balance - ) do - retries_count = Map.get(token_balance, :retries_count, 0) - + defp entry(%{ + token_contract_address_hash: token_contract_address_hash, + address_hash: address_hash, + block_number: block_number, + token_type: token_type, + token_id: token_id, + retries_count: retries_count + }) do token_id_int = case token_id do %Decimal{} -> Decimal.to_integer(token_id) @@ -277,7 +274,7 @@ defmodule Indexer.Fetcher.TokenBalance do _ -> token_id end - {address_hash.bytes, token_contract_address_hash.bytes, block_number, token_type, token_id_int, retries_count} + {address_hash.bytes, token_contract_address_hash.bytes, block_number, token_type, token_id_int, retries_count || 0} end defp format_params( diff --git a/apps/indexer/lib/indexer/token_balances.ex b/apps/indexer/lib/indexer/token_balances.ex index 5348bd2382..78180c9978 100644 --- a/apps/indexer/lib/indexer/token_balances.ex +++ b/apps/indexer/lib/indexer/token_balances.ex @@ -8,9 +8,7 @@ defmodule Indexer.TokenBalances do require Indexer.Tracer require Logger - alias Explorer.Chain alias Explorer.Token.BalanceReader - alias Indexer.Fetcher.TokenBalance alias Indexer.Tracer @nft_balance_function_abi [ @@ -85,7 +83,7 @@ defmodule Indexer.TokenBalances do requested_token_balances |> handle_killed_tasks(token_balances) |> unfetched_token_balances(fetched_token_balances) - |> schedule_token_balances + |> log_fetching_errors() failed_token_balances = requested_token_balances @@ -119,27 +117,6 @@ defmodule Indexer.TokenBalances do Map.merge(token_balance, %{value: nil, value_fetched_at: nil, error: error_message}) end - defp schedule_token_balances([]), do: nil - - defp schedule_token_balances(unfetched_token_balances) do - Logger.debug(fn -> "#{Enum.count(unfetched_token_balances)} token balances will be retried" end) - - log_fetching_errors(unfetched_token_balances) - - unfetched_token_balances - |> Enum.map(fn token_balance -> - {:ok, address_hash} = Chain.string_to_address_hash(token_balance.address_hash) - {:ok, token_hash} = Chain.string_to_address_hash(token_balance.token_contract_address_hash) - - Map.merge(token_balance, %{ - address_hash: address_hash, - token_contract_address_hash: token_hash, - block_number: token_balance.block_number - }) - end) - |> TokenBalance.async_fetch(false) - end - defp ignore_request_with_errors(%{value: nil, value_fetched_at: nil, error: _error}), do: false defp ignore_request_with_errors(_token_balance), do: true @@ -161,7 +138,7 @@ defmodule Indexer.TokenBalances do end) if Enum.any?(error_messages) do - Logger.debug( + Logger.error( [ "Errors while fetching TokenBalances through Contract interaction: \n", error_messages diff --git a/apps/indexer/test/indexer/fetcher/token_balance_test.exs b/apps/indexer/test/indexer/fetcher/token_balance_test.exs index e39cc6e98a..0c4fb0be44 100644 --- a/apps/indexer/test/indexer/fetcher/token_balance_test.exs +++ b/apps/indexer/test/indexer/fetcher/token_balance_test.exs @@ -28,6 +28,22 @@ defmodule Indexer.Fetcher.TokenBalanceTest do {address_hash_bytes, token_contract_address_hash_bytes, 1000, "ERC-20", nil, 0} ] end + + test "omits failed balances with refetch_after in future" do + %Address.TokenBalance{ + address_hash: %Hash{bytes: address_hash_bytes}, + token_contract_address_hash: %Hash{bytes: token_contract_address_hash_bytes}, + block_number: block_number + } = insert(:token_balance, value_fetched_at: nil) + + insert(:token_balance, value_fetched_at: DateTime.utc_now()) + + insert(:token_balance, refetch_after: Timex.shift(Timex.now(), hours: 1)) + + assert TokenBalance.init([], &[&1 | &2], nil) == [ + {address_hash_bytes, token_contract_address_hash_bytes, block_number, "ERC-20", nil, 0} + ] + end end describe "run/3" do @@ -70,86 +86,6 @@ defmodule Indexer.Fetcher.TokenBalanceTest do assert token_balance_updated.value_fetched_at != nil end - test "imports the given token balances from 2nd retry" do - %Address.TokenBalance{ - address_hash: %Hash{bytes: address_hash_bytes} = address_hash, - token_contract_address_hash: %Hash{bytes: token_contract_address_hash_bytes}, - block_number: block_number - } = insert(:token_balance, value_fetched_at: nil, value: nil) - - expect( - EthereumJSONRPC.Mox, - :json_rpc, - fn [%{id: id, method: "eth_call", params: [%{data: _, to: _}, _]}], _options -> - {:ok, - [ - %{ - id: id, - jsonrpc: "2.0", - error: %{code: -32015, message: "VM execution error.", data: ""} - } - ]} - end - ) - - expect( - EthereumJSONRPC.Mox, - :json_rpc, - fn [%{id: id, method: "eth_call", params: [%{data: _, to: _}, _]}], _options -> - {:ok, - [ - %{ - id: id, - jsonrpc: "2.0", - result: "0x00000000000000000000000000000000000000000000d3c21bcecceda1000000" - } - ]} - end - ) - - assert TokenBalance.run( - [{address_hash_bytes, token_contract_address_hash_bytes, block_number, "ERC-20", nil, 0}], - nil - ) == :ok - - token_balance_updated = Repo.get_by(Address.TokenBalance, address_hash: address_hash) - - assert token_balance_updated.value == Decimal.new(1_000_000_000_000_000_000_000_000) - assert token_balance_updated.value_fetched_at != nil - end - - test "does not try to fetch the token balance again if the retry is over" do - max_retries = 3 - - Application.put_env(:indexer, :token_balance_max_retries, max_retries) - - token_balance_a = insert(:token_balance, value_fetched_at: nil, value: nil) - token_balance_b = insert(:token_balance, value_fetched_at: nil, value: nil) - - token_balances = [ - { - token_balance_a.address_hash.bytes, - token_balance_a.token_contract_address_hash.bytes, - "ERC-20", - nil, - token_balance_a.block_number, - # this token balance must be ignored - max_retries - }, - { - token_balance_b.address_hash.bytes, - token_balance_b.token_contract_address_hash.bytes, - "ERC-20", - nil, - token_balance_b.block_number, - # this token balance still have to be retried - max_retries - 2 - } - ] - - assert TokenBalance.run(token_balances, nil) == :ok - end - test "fetches duplicate params only once" do %Address.TokenBalance{ address_hash: %Hash{bytes: address_hash_bytes} = address_hash, @@ -233,7 +169,7 @@ defmodule Indexer.Fetcher.TokenBalanceTest do assert %{currently_implemented: true} = Repo.one(MissingBalanceOfToken) end - test "in case of error deletes token balance placeholders below the given number and inserts new missing balanceOf tokens" do + test "in case of execution reverted error deletes token balance placeholders below the given number and inserts new missing balanceOf tokens" do address = insert(:address) %{contract_address_hash: token_contract_address_hash} = insert(:token) @@ -272,6 +208,46 @@ defmodule Indexer.Fetcher.TokenBalanceTest do assert Repo.all(Address.TokenBalance) == [] end + + test "in case of other error updates the refetch_after and retries_count of token balance" do + address = insert(:address) + %{contract_address_hash: token_contract_address_hash} = insert(:token) + + insert(:token_balance, + token_contract_address_hash: token_contract_address_hash, + address: address, + block_number: 1, + value_fetched_at: nil, + value: nil, + refetch_after: nil, + retries_count: nil + ) + + expect( + EthereumJSONRPC.Mox, + :json_rpc, + fn [%{id: id, method: "eth_call", params: [%{data: _, to: _}, _]}], _options -> + {:ok, + [ + %{ + id: id, + jsonrpc: "2.0", + error: %{code: "-32000", message: "other error"} + } + ]} + end + ) + + assert TokenBalance.run( + [ + {address.hash.bytes, token_contract_address_hash.bytes, 1, "ERC-20", nil, 0} + ], + nil + ) == :ok + + assert %{retries_count: 1, refetch_after: refetch_after} = Repo.one(Address.TokenBalance) + refute is_nil(refetch_after) + end end describe "import_token_balances/1" do diff --git a/config/runtime.exs b/config/runtime.exs index f3ffebf53b..70fdba23d9 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -708,7 +708,9 @@ config :indexer, Indexer.Fetcher.Token, concurrency: ConfigHelper.parse_integer_ config :indexer, Indexer.Fetcher.TokenBalance, batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_BATCH_SIZE", 100), - concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_CONCURRENCY", 10) + concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_CONCURRENCY", 10), + max_refetch_interval: ConfigHelper.parse_time_env_var("INDEXER_TOKEN_BALANCES_MAX_REFETCH_INTERVAL", "168h"), + exp_timeout_coeff: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_EXPONENTIAL_TIMEOUT_COEFF", 100) config :indexer, Indexer.Fetcher.OnDemand.TokenBalance, threshold: ConfigHelper.parse_time_env_var("TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD", "1h"), diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index bf5a2daac6..5ec959df46 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -204,6 +204,8 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false # INDEXER_TOKEN_CONCURRENCY= # INDEXER_TOKEN_BALANCES_BATCH_SIZE= # INDEXER_TOKEN_BALANCES_CONCURRENCY= +# INDEXER_TOKEN_BALANCES_MAX_REFETCH_INTERVAL= +# INDEXER_TOKEN_BALANCES_EXPONENTIAL_TIMEOUT_COEFF= # INDEXER_TX_ACTIONS_ENABLE= # INDEXER_TX_ACTIONS_MAX_TOKEN_CACHE_SIZE= # INDEXER_TX_ACTIONS_REINDEX_FIRST_BLOCK=