Speed up Indexer.Fetcher.TokenInstance.LegacySanitize

pull/9013/head
Nikita Pozdniakov 11 months ago
parent 0673e7fd07
commit b9318e09d7
No known key found for this signature in database
GPG Key ID: F344106F9804FE5F
  1. 1
      CHANGELOG.md
  2. 50
      apps/explorer/lib/explorer/chain.ex
  3. 28
      apps/explorer/lib/explorer/chain/token/instance.ex
  4. 83
      apps/explorer/test/explorer/chain/token/instance_test.exs
  5. 77
      apps/explorer/test/explorer/chain_test.exs
  6. 78
      apps/indexer/lib/indexer/fetcher/token_instance/legacy_sanitize.ex
  7. 2
      apps/indexer/lib/indexer/supervisor.ex
  8. 2
      config/runtime.exs

@ -6,6 +6,7 @@
### Fixes
- [#9013](https://github.com/blockscout/blockscout/pull/9013) - Speed up `Indexer.Fetcher.TokenInstance.LegacySanitize`
- [#8955](https://github.com/blockscout/blockscout/pull/8955) - Remove daily balances updating from BlockReward fetcher
### Chore

@ -3554,56 +3554,6 @@ defmodule Explorer.Chain do
|> Repo.stream_reduce(initial, reducer)
end
@doc """
Finds all token instances (pairs of `contract_address_hash` and `token_id`) that appear
in token transfers but have no corresponding entry in the `token_instances` table, and
streams them through `reducer` via `Repo.stream_reduce/3`.

Returns `{:ok, accumulator}` with the final accumulator value.
"""
@spec stream_not_inserted_token_instances(
        initial :: accumulator,
        reducer :: (entry :: map(), accumulator -> accumulator)
      ) :: {:ok, accumulator}
      when accumulator: term()
def stream_not_inserted_token_instances(initial, reducer) when is_function(reducer, 2) do
  # Restrict to contracts of NFT-capable token types only.
  nft_tokens =
    from(
      token in Token,
      where: token.type == ^"ERC-721" or token.type == ^"ERC-1155",
      select: token.contract_address_hash
    )

  # unnest expands each transfer's token_ids array into one row per token id.
  token_ids_query =
    from(
      token_transfer in TokenTransfer,
      select: %{
        token_contract_address_hash: token_transfer.token_contract_address_hash,
        token_id: fragment("unnest(?)", token_transfer.token_ids)
      }
    )

  # Left-join anti-pattern: rows with no matching Instance have a NULL token_id,
  # so `is_nil(instance.token_id)` keeps exactly the not-yet-inserted pairs.
  query =
    from(
      transfer in subquery(token_ids_query),
      inner_join: token in subquery(nft_tokens),
      on: token.contract_address_hash == transfer.token_contract_address_hash,
      left_join: instance in Instance,
      on:
        transfer.token_contract_address_hash == instance.token_contract_address_hash and
          transfer.token_id == instance.token_id,
      where: is_nil(instance.token_id),
      select: %{
        contract_address_hash: transfer.token_contract_address_hash,
        token_id: transfer.token_id
      }
    )

  # The same pair can occur in many transfers; deduplicate before streaming.
  distinct_query =
    from(
      q in subquery(query),
      distinct: [q.contract_address_hash, q.token_id]
    )

  Repo.stream_reduce(distinct_query, initial, reducer)
end
@doc """
Finds all token instances where metadata never tried to fetch
"""

@ -413,4 +413,32 @@ defmodule Explorer.Chain.Token.Instance do
|> select_merge([ctb: ctb], %{current_token_balance: ctb})
|> Chain.select_repo(options).all()
end
@doc """
Builds a query for token instances (pairs of `contract_address_hash` and `token_id`)
that appear in token transfers but have no corresponding entry in the
`token_instances` table, returning at most `limit` rows.

Unlike the former `Chain.stream_not_inserted_token_instances/2`, this returns an
`Ecto.Query.t()` so callers can fetch bounded chunks instead of streaming everything.
"""
@spec not_inserted_token_instances_query(integer()) :: Ecto.Query.t()
def not_inserted_token_instances_query(limit) do
  # unnest expands each transfer's token_ids array into one row per token id;
  # transfers with NULL or empty token_ids carry no instances, so skip them.
  token_transfers_query =
    TokenTransfer
    |> where([token_transfer], not is_nil(token_transfer.token_ids) and token_transfer.token_ids != ^[])
    |> select([token_transfer], %{
      token_contract_address_hash: token_transfer.token_contract_address_hash,
      token_id: fragment("unnest(?)", token_transfer.token_ids)
    })

  # Left-join anti-pattern: pairs with no matching instance row have a NULL
  # token_id on the join side, so `is_nil/1` keeps only the missing ones.
  token_transfers_query
  |> subquery()
  |> join(:left, [token_transfer], token_instance in __MODULE__,
    on:
      token_instance.token_contract_address_hash == token_transfer.token_contract_address_hash and
        token_instance.token_id == token_transfer.token_id
  )
  |> where([token_transfer, token_instance], is_nil(token_instance.token_id))
  |> select([token_transfer, token_instance], %{
    contract_address_hash: token_transfer.token_contract_address_hash,
    token_id: token_transfer.token_id
  })
  |> limit(^limit)
end
end

@ -0,0 +1,83 @@
defmodule Explorer.Chain.Token.InstanceTest do
  use Explorer.DataCase

  alias Explorer.Repo
  alias Explorer.Chain.Token.Instance

  # Fix: the describe label previously said "stream_not_inserted_token_instances/2"
  # (the removed Chain function); these tests exercise
  # Instance.not_inserted_token_instances_query/1.
  describe "not_inserted_token_instances_query/1" do
    test "reduces with given reducer and accumulator for ERC-721 token" do
      token_contract_address = insert(:contract_address)
      token = insert(:token, contract_address: token_contract_address, type: "ERC-721")

      transaction =
        :transaction
        |> insert()
        |> with_block(insert(:block, number: 1))

      token_transfer =
        insert(
          :token_transfer,
          block_number: 1000,
          to_address: build(:address),
          transaction: transaction,
          token_contract_address: token_contract_address,
          token: token,
          token_ids: [11]
        )

      # A transfer with a token id but no token_instances row must be reported.
      assert [result] = 5 |> Instance.not_inserted_token_instances_query() |> Repo.all()
      assert result.token_id == List.first(token_transfer.token_ids)
      assert result.contract_address_hash == token_transfer.token_contract_address_hash
    end

    test "does not fetch token transfers without token_ids" do
      token_contract_address = insert(:contract_address)
      token = insert(:token, contract_address: token_contract_address, type: "ERC-721")

      transaction =
        :transaction
        |> insert()
        |> with_block(insert(:block, number: 1))

      insert(
        :token_transfer,
        block_number: 1000,
        to_address: build(:address),
        transaction: transaction,
        token_contract_address: token_contract_address,
        token: token,
        token_ids: nil
      )

      # NULL token_ids carry no instances, so the query must skip them.
      assert [] = 5 |> Instance.not_inserted_token_instances_query() |> Repo.all()
    end

    test "do not fetch records with token instances" do
      token_contract_address = insert(:contract_address)
      token = insert(:token, contract_address: token_contract_address, type: "ERC-721")

      transaction =
        :transaction
        |> insert()
        |> with_block(insert(:block, number: 1))

      token_transfer =
        insert(
          :token_transfer,
          block_number: 1000,
          to_address: build(:address),
          transaction: transaction,
          token_contract_address: token_contract_address,
          token: token,
          token_ids: [11]
        )

      insert(:token_instance,
        token_id: List.first(token_transfer.token_ids),
        token_contract_address_hash: token_transfer.token_contract_address_hash
      )

      # The pair already exists in token_instances, so it must not be returned.
      assert [] = 5 |> Instance.not_inserted_token_instances_query() |> Repo.all()
    end
  end
end

@ -4066,83 +4066,6 @@ defmodule Explorer.ChainTest do
end
end
describe "stream_not_inserted_token_instances/2" do
  test "reduces with given reducer and accumulator for ERC-721 token" do
    contract_address = insert(:contract_address)
    nft = insert(:token, contract_address: contract_address, type: "ERC-721")
    block = insert(:block, number: 1)

    tx =
      :transaction
      |> insert()
      |> with_block(block)

    transfer =
      insert(
        :token_transfer,
        block_number: 1000,
        to_address: build(:address),
        transaction: tx,
        token_contract_address: contract_address,
        token: nft,
        token_ids: [11]
      )

    # Collect every streamed entry; exactly one missing pair is expected.
    assert {:ok, [result]} =
             Chain.stream_not_inserted_token_instances([], fn entry, acc -> [entry | acc] end)

    assert result.token_id == hd(transfer.token_ids)
    assert result.contract_address_hash == transfer.token_contract_address_hash
  end

  test "does not fetch token transfers without token_ids" do
    contract_address = insert(:contract_address)
    nft = insert(:token, contract_address: contract_address, type: "ERC-721")
    block = insert(:block, number: 1)

    tx =
      :transaction
      |> insert()
      |> with_block(block)

    insert(
      :token_transfer,
      block_number: 1000,
      to_address: build(:address),
      transaction: tx,
      token_contract_address: contract_address,
      token: nft,
      token_ids: nil
    )

    # Transfers with NULL token_ids yield no instance pairs.
    assert {:ok, []} = Chain.stream_not_inserted_token_instances([], fn entry, acc -> [entry | acc] end)
  end

  test "do not fetch records with token instances" do
    contract_address = insert(:contract_address)
    nft = insert(:token, contract_address: contract_address, type: "ERC-721")
    block = insert(:block, number: 1)

    tx =
      :transaction
      |> insert()
      |> with_block(block)

    transfer =
      insert(
        :token_transfer,
        block_number: 1000,
        to_address: build(:address),
        transaction: tx,
        token_contract_address: contract_address,
        token: nft,
        token_ids: [11]
      )

    insert(:token_instance,
      token_id: hd(transfer.token_ids),
      token_contract_address_hash: transfer.token_contract_address_hash
    )

    # An existing token_instances row excludes the pair from the stream.
    assert {:ok, []} = Chain.stream_not_inserted_token_instances([], fn entry, acc -> [entry | acc] end)
  end
end
describe "transaction_has_token_transfers?/1" do
test "returns true if transaction has token transfers" do
transaction = insert(:transaction)

@ -1,58 +1,50 @@
defmodule Indexer.Fetcher.TokenInstance.LegacySanitize do
  @moduledoc """
  Creates token instances that were not inserted yet and fetches metadata for them.

  "Legacy" because token instances are now inserted on block import; this fetcher only
  backfills historical data and entries that were not fetched for some reason.

  Rewritten from a `BufferedTask` into a plain transient `GenServer` that pulls bounded
  chunks of work via `Instance.not_inserted_token_instances_query/1` instead of
  streaming every missing pair up front.
  """

  # NOTE(review): this span interleaved the pre- and post-commit versions of the module
  # (two `init` definitions, orphaned `child_spec`/`defaults` fragments); this is the
  # reconstructed post-commit implementation.
  use GenServer, restart: :transient

  import Indexer.Fetcher.TokenInstance.Helper

  alias Explorer.Chain.Token.Instance
  alias Explorer.Repo

  def start_link(_) do
    # Runtime-configured via INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_* env vars.
    concurrency = Application.get_env(:indexer, __MODULE__)[:concurrency]
    batch_size = Application.get_env(:indexer, __MODULE__)[:batch_size]

    GenServer.start_link(__MODULE__, %{concurrency: concurrency, batch_size: batch_size}, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    # Kick off the backfill loop asynchronously so init/1 returns immediately.
    GenServer.cast(__MODULE__, :backfill)

    {:ok, opts}
  end

  @impl true
  def handle_cast(:backfill, %{concurrency: concurrency, batch_size: batch_size} = state) do
    # Fetch one round of work: up to concurrency * batch_size missing instances.
    instances_to_fetch =
      (concurrency * batch_size)
      |> Instance.not_inserted_token_instances_query()
      |> Repo.all()

    if Enum.empty?(instances_to_fetch) do
      # Backfill is complete; stop normally (restart: :transient won't restart us).
      {:stop, :normal, state}
    else
      # Deduplicate (the same pair can appear in many transfers), then fetch the
      # batches concurrently and wait for all of them before requesting more work.
      instances_to_fetch
      |> Enum.uniq()
      |> Enum.chunk_every(batch_size)
      |> Enum.map(&process_batch/1)
      |> Task.await_many(:infinity)

      # Loop: schedule the next round.
      GenServer.cast(__MODULE__, :backfill)
      {:noreply, state}
    end
  end

  # Runs batch_fetch_instances/1 (from TokenInstance.Helper) in its own task.
  defp process_batch(batch), do: Task.async(fn -> batch_fetch_instances(batch) end)
end

@ -120,7 +120,7 @@ defmodule Indexer.Supervisor do
{TokenInstanceRealtime.Supervisor, [[memory_monitor: memory_monitor]]},
{TokenInstanceRetry.Supervisor, [[memory_monitor: memory_monitor]]},
{TokenInstanceSanitize.Supervisor, [[memory_monitor: memory_monitor]]},
{TokenInstanceLegacySanitize.Supervisor, [[memory_monitor: memory_monitor]]},
{TokenInstanceLegacySanitize, [[memory_monitor: memory_monitor]]},
configure(TransactionAction.Supervisor, [[memory_monitor: memory_monitor]]),
{ContractCode.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},

@ -575,7 +575,7 @@ config :indexer, Indexer.Fetcher.TokenInstance.Sanitize,
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE", 10)
config :indexer, Indexer.Fetcher.TokenInstance.LegacySanitize,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_CONCURRENCY", 10),
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_CONCURRENCY", 2),
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_BATCH_SIZE", 10)
config :indexer, Indexer.Fetcher.InternalTransaction,

Loading…
Cancel
Save