chore: Token balances fetcher slow queue (#10694)

pull/10722/head
Qwerty5Uiop 3 months ago committed by GitHub
parent 40961e39de
commit 11d6a2e9a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 12
      apps/explorer/lib/explorer/chain/address/token_balance.ex
  2. 2
      apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex
  3. 10
      apps/explorer/priv/repo/migrations/20240828140638_add_token_balance_retry_fields.exs
  4. 109
      apps/indexer/lib/indexer/fetcher/token_balance.ex
  5. 27
      apps/indexer/lib/indexer/token_balances.ex
  6. 138
      apps/indexer/test/indexer/fetcher/token_balance_test.exs
  7. 4
      config/runtime.exs
  8. 2
      docker-compose/envs/common-blockscout.env

@ -25,6 +25,8 @@ defmodule Explorer.Chain.Address.TokenBalance do
* `value` - The value that's represents the balance.
* `token_id` - The token_id of the transferred token (applicable for ERC-1155, ERC-721 and ERC-404 tokens)
* `token_type` - The type of the token
* `refetch_after` - when to refetch the token balance
* `retries_count` - number of times the token balance has been retried
"""
typed_schema "address_token_balances" do
field(:value, :decimal)
@ -32,6 +34,8 @@ defmodule Explorer.Chain.Address.TokenBalance do
field(:value_fetched_at, :utc_datetime_usec)
field(:token_id, :decimal)
field(:token_type, :string, null: false)
field(:refetch_after, :utc_datetime_usec)
field(:retries_count, :integer)
belongs_to(:address, Address, foreign_key: :address_hash, references: :hash, type: Hash.Address, null: false)
@ -47,7 +51,7 @@ defmodule Explorer.Chain.Address.TokenBalance do
timestamps()
end
@optional_fields ~w(value value_fetched_at token_id)a
@optional_fields ~w(value value_fetched_at token_id refetch_after retries_count)a
@required_fields ~w(address_hash block_number token_contract_address_hash token_type)a
@allowed_fields @optional_fields ++ @required_fields
@ -77,7 +81,8 @@ defmodule Explorer.Chain.Address.TokenBalance do
where:
((tb.address_hash != ^@burn_address_hash and tb.token_type == "ERC-721") or tb.token_type == "ERC-20" or
tb.token_type == "ERC-1155" or tb.token_type == "ERC-404") and
(is_nil(tb.value_fetched_at) or is_nil(tb.value))
(is_nil(tb.value_fetched_at) or is_nil(tb.value)) and
(is_nil(tb.refetch_after) or tb.refetch_after < ^Timex.now())
)
else
from(
@ -87,7 +92,8 @@ defmodule Explorer.Chain.Address.TokenBalance do
where:
((tb.address_hash != ^@burn_address_hash and t.type == "ERC-721") or t.type == "ERC-20" or
t.type == "ERC-1155" or t.type == "ERC-404") and
(is_nil(tb.value_fetched_at) or is_nil(tb.value))
(is_nil(tb.value_fetched_at) or is_nil(tb.value)) and
(is_nil(tb.refetch_after) or tb.refetch_after < ^Timex.now())
)
end
end

@ -148,6 +148,8 @@ defmodule Explorer.Chain.Import.Runner.Address.TokenBalances do
value: fragment("COALESCE(EXCLUDED.value, ?)", token_balance.value),
value_fetched_at: fragment("EXCLUDED.value_fetched_at"),
token_type: fragment("EXCLUDED.token_type"),
refetch_after: fragment("EXCLUDED.refetch_after"),
retries_count: fragment("EXCLUDED.retries_count"),
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at)
]

@ -0,0 +1,10 @@
defmodule Explorer.Repo.Migrations.AddTokenBalanceRetryFields do
use Ecto.Migration
def change do
alter table(:address_token_balances) do
add(:refetch_after, :utc_datetime_usec)
add(:retries_count, :smallint)
end
end
end

@ -9,7 +9,7 @@ defmodule Indexer.Fetcher.TokenBalance do
It behaves as a `BufferedTask`, so we can configure the `max_batch_size` and the `max_concurrency` to control how many
token balances will be fetched at the same time.
Also, this module set a `retries_count` for each token balance and increment this number to avoid fetching the ones
Also, this module set a `refetch_after` for each token balance in case of failure to avoid fetching the ones
that always raise errors interacting with the Smart Contract.
"""
@ -32,8 +32,6 @@ defmodule Indexer.Fetcher.TokenBalance do
@timeout :timer.minutes(10)
@max_retries 3
@spec async_fetch(
[
%{
@ -93,7 +91,7 @@ defmodule Indexer.Fetcher.TokenBalance do
@doc """
Fetches the given entries (token_balances) from the Smart Contract and import them in our database.
It also increments the `retries_count` to avoid fetching token balances that always raise errors
It also set the `refetch_after` in case of failure to avoid fetching token balances that always raise errors
when reading their balance in the Smart Contract.
"""
@impl BufferedTask
@ -110,7 +108,6 @@ defmodule Indexer.Fetcher.TokenBalance do
result =
params
|> MissingBalanceOfToken.filter_token_balances_params(true, missing_balance_of_tokens)
|> increase_retries_count()
|> fetch_from_blockchain(missing_balance_of_tokens)
|> import_token_balances()
@ -122,45 +119,22 @@ defmodule Indexer.Fetcher.TokenBalance do
end
def fetch_from_blockchain(params_list, missing_balance_of_tokens) do
retryable_params_list =
params_list
|> Enum.filter(&(&1.retries_count <= @max_retries))
|> Enum.uniq_by(&Map.take(&1, [:token_contract_address_hash, :token_id, :address_hash, :block_number]))
Logger.metadata(count: Enum.count(retryable_params_list))
%{fetched_token_balances: fetched_token_balances, failed_token_balances: _failed_token_balances} =
1..@max_retries
|> Enum.reduce_while(%{fetched_token_balances: [], failed_token_balances: retryable_params_list}, fn _x, acc ->
{:ok, %{fetched_token_balances: fetched_token_balances, failed_token_balances: failed_token_balances}} =
TokenBalances.fetch_token_balances_from_blockchain(acc.failed_token_balances)
all_token_balances = %{
fetched_token_balances: acc.fetched_token_balances ++ fetched_token_balances,
failed_token_balances: failed_token_balances
}
handle_success_balances(fetched_token_balances, missing_balance_of_tokens)
if Enum.empty?(failed_token_balances) do
{:halt, all_token_balances}
else
failed_token_balances =
failed_token_balances
|> handle_failed_balances()
|> increase_retries_count()
token_balances_updated_retries_count =
all_token_balances
|> Map.put(:failed_token_balances, failed_token_balances)
{:cont, token_balances_updated_retries_count}
end
end)
params_list =
Enum.uniq_by(params_list, &Map.take(&1, [:token_contract_address_hash, :token_id, :address_hash, :block_number]))
Logger.metadata(count: Enum.count(params_list))
{:ok, %{fetched_token_balances: fetched_token_balances, failed_token_balances: failed_token_balances}} =
TokenBalances.fetch_token_balances_from_blockchain(params_list)
handle_success_balances(fetched_token_balances, missing_balance_of_tokens)
failed_balances_to_keep = handle_failed_balances(failed_token_balances)
fetched_token_balances
fetched_token_balances ++ failed_balances_to_keep
end
defp handle_success_balances([], _missing_balance_of_tokens), do: :ok
defp handle_success_balances(fetched_token_balances, missing_balance_of_tokens) do
successful_token_hashes =
fetched_token_balances
@ -178,7 +152,15 @@ defmodule Indexer.Fetcher.TokenBalance do
|> MissingBalanceOfToken.mark_as_implemented()
end
defp handle_failed_balances([]), do: []
defp handle_failed_balances(failed_token_balances) do
failed_token_balances
|> handle_missing_balance_of_tokens()
|> handle_other_errors()
end
defp handle_missing_balance_of_tokens(failed_token_balances) do
{missing_balance_of_balances, other_failed_balances} =
Enum.split_with(failed_token_balances, fn
%{error: :unable_to_decode} -> true
@ -201,9 +183,27 @@ defmodule Indexer.Fetcher.TokenBalance do
other_failed_balances
end
defp increase_retries_count(params_list) do
params_list
|> Enum.map(&Map.put(&1, :retries_count, &1.retries_count + 1))
defp handle_other_errors(failed_token_balances) do
Enum.map(failed_token_balances, fn token_balance_params ->
new_retries_count = token_balance_params.retries_count + 1
Map.merge(token_balance_params, %{
retries_count: new_retries_count,
refetch_after: define_refetch_after(new_retries_count)
})
end)
end
defp define_refetch_after(retries_count) do
config = Application.get_env(:indexer, __MODULE__)
coef = config[:exp_timeout_coeff]
max_refetch_interval = config[:max_refetch_interval]
max_retries_count = :math.log(max_refetch_interval / 1000 / coef)
value = floor(coef * :math.exp(min(retries_count, max_retries_count)))
Timex.shift(Timex.now(), seconds: value)
end
def import_token_balances(token_balances_params) do
@ -259,17 +259,14 @@ defmodule Indexer.Fetcher.TokenBalance do
end
end
defp entry(
%{
token_contract_address_hash: token_contract_address_hash,
address_hash: address_hash,
block_number: block_number,
token_type: token_type,
token_id: token_id
} = token_balance
) do
retries_count = Map.get(token_balance, :retries_count, 0)
defp entry(%{
token_contract_address_hash: token_contract_address_hash,
address_hash: address_hash,
block_number: block_number,
token_type: token_type,
token_id: token_id,
retries_count: retries_count
}) do
token_id_int =
case token_id do
%Decimal{} -> Decimal.to_integer(token_id)
@ -277,7 +274,7 @@ defmodule Indexer.Fetcher.TokenBalance do
_ -> token_id
end
{address_hash.bytes, token_contract_address_hash.bytes, block_number, token_type, token_id_int, retries_count}
{address_hash.bytes, token_contract_address_hash.bytes, block_number, token_type, token_id_int, retries_count || 0}
end
defp format_params(

@ -8,9 +8,7 @@ defmodule Indexer.TokenBalances do
require Indexer.Tracer
require Logger
alias Explorer.Chain
alias Explorer.Token.BalanceReader
alias Indexer.Fetcher.TokenBalance
alias Indexer.Tracer
@nft_balance_function_abi [
@ -85,7 +83,7 @@ defmodule Indexer.TokenBalances do
requested_token_balances
|> handle_killed_tasks(token_balances)
|> unfetched_token_balances(fetched_token_balances)
|> schedule_token_balances
|> log_fetching_errors()
failed_token_balances =
requested_token_balances
@ -119,27 +117,6 @@ defmodule Indexer.TokenBalances do
Map.merge(token_balance, %{value: nil, value_fetched_at: nil, error: error_message})
end
defp schedule_token_balances([]), do: nil
defp schedule_token_balances(unfetched_token_balances) do
Logger.debug(fn -> "#{Enum.count(unfetched_token_balances)} token balances will be retried" end)
log_fetching_errors(unfetched_token_balances)
unfetched_token_balances
|> Enum.map(fn token_balance ->
{:ok, address_hash} = Chain.string_to_address_hash(token_balance.address_hash)
{:ok, token_hash} = Chain.string_to_address_hash(token_balance.token_contract_address_hash)
Map.merge(token_balance, %{
address_hash: address_hash,
token_contract_address_hash: token_hash,
block_number: token_balance.block_number
})
end)
|> TokenBalance.async_fetch(false)
end
defp ignore_request_with_errors(%{value: nil, value_fetched_at: nil, error: _error}), do: false
defp ignore_request_with_errors(_token_balance), do: true
@ -161,7 +138,7 @@ defmodule Indexer.TokenBalances do
end)
if Enum.any?(error_messages) do
Logger.debug(
Logger.error(
[
"Errors while fetching TokenBalances through Contract interaction: \n",
error_messages

@ -28,6 +28,22 @@ defmodule Indexer.Fetcher.TokenBalanceTest do
{address_hash_bytes, token_contract_address_hash_bytes, 1000, "ERC-20", nil, 0}
]
end
test "omits failed balances with refetch_after in future" do
%Address.TokenBalance{
address_hash: %Hash{bytes: address_hash_bytes},
token_contract_address_hash: %Hash{bytes: token_contract_address_hash_bytes},
block_number: block_number
} = insert(:token_balance, value_fetched_at: nil)
insert(:token_balance, value_fetched_at: DateTime.utc_now())
insert(:token_balance, refetch_after: Timex.shift(Timex.now(), hours: 1))
assert TokenBalance.init([], &[&1 | &2], nil) == [
{address_hash_bytes, token_contract_address_hash_bytes, block_number, "ERC-20", nil, 0}
]
end
end
describe "run/3" do
@ -70,86 +86,6 @@ defmodule Indexer.Fetcher.TokenBalanceTest do
assert token_balance_updated.value_fetched_at != nil
end
test "imports the given token balances from 2nd retry" do
%Address.TokenBalance{
address_hash: %Hash{bytes: address_hash_bytes} = address_hash,
token_contract_address_hash: %Hash{bytes: token_contract_address_hash_bytes},
block_number: block_number
} = insert(:token_balance, value_fetched_at: nil, value: nil)
expect(
EthereumJSONRPC.Mox,
:json_rpc,
fn [%{id: id, method: "eth_call", params: [%{data: _, to: _}, _]}], _options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
error: %{code: -32015, message: "VM execution error.", data: ""}
}
]}
end
)
expect(
EthereumJSONRPC.Mox,
:json_rpc,
fn [%{id: id, method: "eth_call", params: [%{data: _, to: _}, _]}], _options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
result: "0x00000000000000000000000000000000000000000000d3c21bcecceda1000000"
}
]}
end
)
assert TokenBalance.run(
[{address_hash_bytes, token_contract_address_hash_bytes, block_number, "ERC-20", nil, 0}],
nil
) == :ok
token_balance_updated = Repo.get_by(Address.TokenBalance, address_hash: address_hash)
assert token_balance_updated.value == Decimal.new(1_000_000_000_000_000_000_000_000)
assert token_balance_updated.value_fetched_at != nil
end
test "does not try to fetch the token balance again if the retry is over" do
max_retries = 3
Application.put_env(:indexer, :token_balance_max_retries, max_retries)
token_balance_a = insert(:token_balance, value_fetched_at: nil, value: nil)
token_balance_b = insert(:token_balance, value_fetched_at: nil, value: nil)
token_balances = [
{
token_balance_a.address_hash.bytes,
token_balance_a.token_contract_address_hash.bytes,
"ERC-20",
nil,
token_balance_a.block_number,
# this token balance must be ignored
max_retries
},
{
token_balance_b.address_hash.bytes,
token_balance_b.token_contract_address_hash.bytes,
"ERC-20",
nil,
token_balance_b.block_number,
# this token balance still have to be retried
max_retries - 2
}
]
assert TokenBalance.run(token_balances, nil) == :ok
end
test "fetches duplicate params only once" do
%Address.TokenBalance{
address_hash: %Hash{bytes: address_hash_bytes} = address_hash,
@ -233,7 +169,7 @@ defmodule Indexer.Fetcher.TokenBalanceTest do
assert %{currently_implemented: true} = Repo.one(MissingBalanceOfToken)
end
test "in case of error deletes token balance placeholders below the given number and inserts new missing balanceOf tokens" do
test "in case of execution reverted error deletes token balance placeholders below the given number and inserts new missing balanceOf tokens" do
address = insert(:address)
%{contract_address_hash: token_contract_address_hash} = insert(:token)
@ -272,6 +208,46 @@ defmodule Indexer.Fetcher.TokenBalanceTest do
assert Repo.all(Address.TokenBalance) == []
end
test "in case of other error updates the refetch_after and retries_count of token balance" do
address = insert(:address)
%{contract_address_hash: token_contract_address_hash} = insert(:token)
insert(:token_balance,
token_contract_address_hash: token_contract_address_hash,
address: address,
block_number: 1,
value_fetched_at: nil,
value: nil,
refetch_after: nil,
retries_count: nil
)
expect(
EthereumJSONRPC.Mox,
:json_rpc,
fn [%{id: id, method: "eth_call", params: [%{data: _, to: _}, _]}], _options ->
{:ok,
[
%{
id: id,
jsonrpc: "2.0",
error: %{code: "-32000", message: "other error"}
}
]}
end
)
assert TokenBalance.run(
[
{address.hash.bytes, token_contract_address_hash.bytes, 1, "ERC-20", nil, 0}
],
nil
) == :ok
assert %{retries_count: 1, refetch_after: refetch_after} = Repo.one(Address.TokenBalance)
refute is_nil(refetch_after)
end
end
describe "import_token_balances/1" do

@ -708,7 +708,9 @@ config :indexer, Indexer.Fetcher.Token, concurrency: ConfigHelper.parse_integer_
config :indexer, Indexer.Fetcher.TokenBalance,
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_BATCH_SIZE", 100),
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_CONCURRENCY", 10)
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_CONCURRENCY", 10),
max_refetch_interval: ConfigHelper.parse_time_env_var("INDEXER_TOKEN_BALANCES_MAX_REFETCH_INTERVAL", "168h"),
exp_timeout_coeff: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_BALANCES_EXPONENTIAL_TIMEOUT_COEFF", 100)
config :indexer, Indexer.Fetcher.OnDemand.TokenBalance,
threshold: ConfigHelper.parse_time_env_var("TOKEN_BALANCE_ON_DEMAND_FETCHER_THRESHOLD", "1h"),

@ -204,6 +204,8 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_TOKEN_CONCURRENCY=
# INDEXER_TOKEN_BALANCES_BATCH_SIZE=
# INDEXER_TOKEN_BALANCES_CONCURRENCY=
# INDEXER_TOKEN_BALANCES_MAX_REFETCH_INTERVAL=
# INDEXER_TOKEN_BALANCES_EXPONENTIAL_TIMEOUT_COEFF=
# INDEXER_TX_ACTIONS_ENABLE=
# INDEXER_TX_ACTIONS_MAX_TOKEN_CACHE_SIZE=
# INDEXER_TX_ACTIONS_REINDEX_FIRST_BLOCK=

Loading…
Cancel
Save