diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0ffaf8eef8..967b514acc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 
 ### Fixes
 
+- [#9013](https://github.com/blockscout/blockscout/pull/9013) - Speed up `Indexer.Fetcher.TokenInstance.LegacySanitize`
 - [#8955](https://github.com/blockscout/blockscout/pull/8955) - Remove daily balances updating from BlockReward fetcher
 
 ### Chore
diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex
index 6d3f48b91b..43a2ae836f 100644
--- a/apps/explorer/lib/explorer/chain.ex
+++ b/apps/explorer/lib/explorer/chain.ex
@@ -3554,56 +3554,6 @@ defmodule Explorer.Chain do
     |> Repo.stream_reduce(initial, reducer)
   end
 
-  @doc """
-  Finds all token instances (pairs of contract_address_hash and token_id) which was met in token transfers but has no corresponding entry in token_instances table
-  """
-  @spec stream_not_inserted_token_instances(
-          initial :: accumulator,
-          reducer :: (entry :: map(), accumulator -> accumulator)
-        ) :: {:ok, accumulator}
-        when accumulator: term()
-  def stream_not_inserted_token_instances(initial, reducer) when is_function(reducer, 2) do
-    nft_tokens =
-      from(
-        token in Token,
-        where: token.type == ^"ERC-721" or token.type == ^"ERC-1155",
-        select: token.contract_address_hash
-      )
-
-    token_ids_query =
-      from(
-        token_transfer in TokenTransfer,
-        select: %{
-          token_contract_address_hash: token_transfer.token_contract_address_hash,
-          token_id: fragment("unnest(?)", token_transfer.token_ids)
-        }
-      )
-
-    query =
-      from(
-        transfer in subquery(token_ids_query),
-        inner_join: token in subquery(nft_tokens),
-        on: token.contract_address_hash == transfer.token_contract_address_hash,
-        left_join: instance in Instance,
-        on:
-          transfer.token_contract_address_hash == instance.token_contract_address_hash and
-            transfer.token_id == instance.token_id,
-        where: is_nil(instance.token_id),
-        select: %{
-          contract_address_hash: transfer.token_contract_address_hash,
-          token_id: transfer.token_id
-        }
-      )
-
-    distinct_query =
-      from(
-        q in subquery(query),
-        distinct: [q.contract_address_hash, q.token_id]
-      )
-
-    Repo.stream_reduce(distinct_query, initial, reducer)
-  end
-
   @doc """
   Finds all token instances where metadata never tried to fetch
   """
diff --git a/apps/explorer/lib/explorer/chain/token/instance.ex b/apps/explorer/lib/explorer/chain/token/instance.ex
index 608aec3319..e4680bb025 100644
--- a/apps/explorer/lib/explorer/chain/token/instance.ex
+++ b/apps/explorer/lib/explorer/chain/token/instance.ex
@@ -413,4 +413,32 @@ defmodule Explorer.Chain.Token.Instance do
     |> select_merge([ctb: ctb], %{current_token_balance: ctb})
     |> Chain.select_repo(options).all()
   end
+
+  @doc """
+  Finds token instances (pairs of contract_address_hash and token_id) that appear in token transfers but have no corresponding entry in the token_instances table
+  """
+  @spec not_inserted_token_instances_query(integer()) :: Ecto.Query.t()
+  def not_inserted_token_instances_query(limit) do
+    token_transfers_query =
+      TokenTransfer
+      |> where([token_transfer], not is_nil(token_transfer.token_ids) and token_transfer.token_ids != ^[])
+      |> select([token_transfer], %{
+        token_contract_address_hash: token_transfer.token_contract_address_hash,
+        token_id: fragment("unnest(?)", token_transfer.token_ids)
+      })
+
+    token_transfers_query
+    |> subquery()
+    |> join(:left, [token_transfer], token_instance in __MODULE__,
+      on:
+        token_instance.token_contract_address_hash == token_transfer.token_contract_address_hash and
+          token_instance.token_id == token_transfer.token_id
+    )
+    |> where([token_transfer, token_instance], is_nil(token_instance.token_id))
+    |> select([token_transfer, token_instance], %{
+      contract_address_hash: token_transfer.token_contract_address_hash,
+      token_id: token_transfer.token_id
+    })
+    |> limit(^limit)
+  end
 end
diff --git a/apps/explorer/test/explorer/chain/token/instance_test.exs b/apps/explorer/test/explorer/chain/token/instance_test.exs
new file mode 100644
index 0000000000..96f78e79fc
--- /dev/null
+++ b/apps/explorer/test/explorer/chain/token/instance_test.exs
@@ -0,0 +1,83 @@
+defmodule Explorer.Chain.Token.InstanceTest do
+  use Explorer.DataCase
+
+  alias Explorer.Repo
+  alias Explorer.Chain.Token.Instance
+
+  describe "not_inserted_token_instances_query/1" do
+    test "returns not inserted token instances for ERC-721 token" do
+      token_contract_address = insert(:contract_address)
+      token = insert(:token, contract_address: token_contract_address, type: "ERC-721")
+
+      transaction =
+        :transaction
+        |> insert()
+        |> with_block(insert(:block, number: 1))
+
+      token_transfer =
+        insert(
+          :token_transfer,
+          block_number: 1000,
+          to_address: build(:address),
+          transaction: transaction,
+          token_contract_address: token_contract_address,
+          token: token,
+          token_ids: [11]
+        )
+
+      assert [result] = 5 |> Instance.not_inserted_token_instances_query() |> Repo.all()
+      assert result.token_id == List.first(token_transfer.token_ids)
+      assert result.contract_address_hash == token_transfer.token_contract_address_hash
+    end
+
+    test "does not fetch token transfers without token_ids" do
+      token_contract_address = insert(:contract_address)
+      token = insert(:token, contract_address: token_contract_address, type: "ERC-721")
+
+      transaction =
+        :transaction
+        |> insert()
+        |> with_block(insert(:block, number: 1))
+
+      insert(
+        :token_transfer,
+        block_number: 1000,
+        to_address: build(:address),
+        transaction: transaction,
+        token_contract_address: token_contract_address,
+        token: token,
+        token_ids: nil
+      )
+
+      assert [] = 5 |> Instance.not_inserted_token_instances_query() |> Repo.all()
+    end
+
+    test "does not fetch records with existing token instances" do
+      token_contract_address = insert(:contract_address)
+      token = insert(:token, contract_address: token_contract_address, type: "ERC-721")
+
+      transaction =
+        :transaction
+        |> insert()
+        |> with_block(insert(:block, number: 1))
+
+      token_transfer =
+        insert(
+          :token_transfer,
+          block_number: 1000,
+          to_address: build(:address),
+          transaction: transaction,
+          token_contract_address: token_contract_address,
+          token: token,
+          token_ids: [11]
+        )
+
+      insert(:token_instance,
+        token_id: List.first(token_transfer.token_ids),
+        token_contract_address_hash: token_transfer.token_contract_address_hash
+      )
+
+      assert [] = 5 |> Instance.not_inserted_token_instances_query() |> Repo.all()
+    end
+  end
+end
diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs
index 4928613380..eb4acd1ea3 100644
--- a/apps/explorer/test/explorer/chain_test.exs
+++ b/apps/explorer/test/explorer/chain_test.exs
@@ -4066,83 +4066,6 @@ defmodule Explorer.ChainTest do
     end
   end
 
-  describe "stream_not_inserted_token_instances/2" do
-    test "reduces with given reducer and accumulator for ERC-721 token" do
-      token_contract_address = insert(:contract_address)
-      token = insert(:token, contract_address: token_contract_address, type: "ERC-721")
-
-      transaction =
-        :transaction
-        |> insert()
-        |> with_block(insert(:block, number: 1))
-
-      token_transfer =
-        insert(
-          :token_transfer,
-          block_number: 1000,
-          to_address: build(:address),
-          transaction: transaction,
-          token_contract_address: token_contract_address,
-          token: token,
-          token_ids: [11]
-        )
-
-      assert {:ok, [result]} = Chain.stream_not_inserted_token_instances([], &[&1 | &2])
-      assert result.token_id == List.first(token_transfer.token_ids)
-      assert result.contract_address_hash == token_transfer.token_contract_address_hash
-    end
-
-    test "does not fetch token transfers without token_ids" do
-      token_contract_address = insert(:contract_address)
-      token = insert(:token, contract_address: token_contract_address, type: "ERC-721")
-
-      transaction =
-        :transaction
-        |> insert()
-        |> with_block(insert(:block, number: 1))
-
-      insert(
-        :token_transfer,
-        block_number: 1000,
-        to_address: build(:address),
-        transaction: transaction,
-        token_contract_address: token_contract_address,
-        token: token,
-        token_ids: nil
-      )
-
-      assert {:ok, []} = Chain.stream_not_inserted_token_instances([], &[&1 | &2])
-    end
-
-    test "do not fetch records with token instances" do
-      token_contract_address = insert(:contract_address)
-      token = insert(:token, contract_address: token_contract_address, type: "ERC-721")
-
-      transaction =
-        :transaction
-        |> insert()
-        |> with_block(insert(:block, number: 1))
-
-      token_transfer =
-        insert(
-          :token_transfer,
-          block_number: 1000,
-          to_address: build(:address),
-          transaction: transaction,
-          token_contract_address: token_contract_address,
-          token: token,
-          token_ids: [11]
-        )
-
-      insert(:token_instance,
-        token_id: List.first(token_transfer.token_ids),
-        token_contract_address_hash: token_transfer.token_contract_address_hash
-      )
-
-      assert {:ok, []} = Chain.stream_not_inserted_token_instances([], &[&1 | &2])
-    end
-  end
-
   describe "transaction_has_token_transfers?/1" do
     test "returns true if transaction has token transfers" do
       transaction = insert(:transaction)
diff --git a/apps/indexer/lib/indexer/fetcher/token_instance/legacy_sanitize.ex b/apps/indexer/lib/indexer/fetcher/token_instance/legacy_sanitize.ex
index 1fe11e1d90..391353ba37 100644
--- a/apps/indexer/lib/indexer/fetcher/token_instance/legacy_sanitize.ex
+++ b/apps/indexer/lib/indexer/fetcher/token_instance/legacy_sanitize.ex
@@ -1,58 +1,50 @@
 defmodule Indexer.Fetcher.TokenInstance.LegacySanitize do
   @moduledoc """
-    This fetcher is stands for creating token instances which wasn't inserted yet and index meta for them. Legacy is because now we token instances inserted on block import and this fetcher is only for historical and unfetched for some reasons data
+    This fetcher creates token instances that have not been inserted yet and indexes metadata for them.
+    It is legacy because token instances are now inserted on block import; this fetcher only handles historical data that was left unfetched for some reason.
   """
 
-  use Indexer.Fetcher, restart: :permanent
-  use Spandex.Decorators
+  use GenServer, restart: :transient
 
-  import Indexer.Fetcher.TokenInstance.Helper
-
-  alias Explorer.Chain
-  alias Indexer.BufferedTask
-
-  @behaviour BufferedTask
+  alias Explorer.Chain.Token.Instance
+  alias Explorer.Repo
 
-  @default_max_batch_size 10
-  @default_max_concurrency 10
-  @doc false
-  def child_spec([init_options, gen_server_options]) do
-    merged_init_opts =
-      defaults()
-      |> Keyword.merge(init_options)
-      |> Keyword.merge(state: [])
+  import Indexer.Fetcher.TokenInstance.Helper
 
-    Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__)
+  def start_link(_) do
+    concurrency = Application.get_env(:indexer, __MODULE__)[:concurrency]
+    batch_size = Application.get_env(:indexer, __MODULE__)[:batch_size]
+    GenServer.start_link(__MODULE__, %{concurrency: concurrency, batch_size: batch_size}, name: __MODULE__)
   end
 
-  @impl BufferedTask
-  def init(initial_acc, reducer, _) do
-    {:ok, acc} =
-      Chain.stream_not_inserted_token_instances(initial_acc, fn data, acc ->
-        reducer.(data, acc)
-      end)
+  @impl true
+  def init(opts) do
+    GenServer.cast(__MODULE__, :backfill)
 
-    acc
+    {:ok, opts}
   end
 
-  @impl BufferedTask
-  def run(token_instances, _) when is_list(token_instances) do
-    token_instances
-    |> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} ->
-      not Chain.token_instance_exists?(token_id, hash)
-    end)
-    |> batch_fetch_instances()
-
-    :ok
+  @impl true
+  def handle_cast(:backfill, %{concurrency: concurrency, batch_size: batch_size} = state) do
+    instances_to_fetch =
+      (concurrency * batch_size)
+      |> Instance.not_inserted_token_instances_query()
+      |> Repo.all()
+
+    if Enum.empty?(instances_to_fetch) do
+      {:stop, :normal, state}
+    else
+      instances_to_fetch
+      |> Enum.uniq()
+      |> Enum.chunk_every(batch_size)
+      |> Enum.map(&process_batch/1)
+      |> Task.await_many(:infinity)
+
+      GenServer.cast(__MODULE__, :backfill)
+
+      {:noreply, state}
+    end
   end
 
-  defp defaults do
-    [
-      flush_interval: :infinity,
-      max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
-      max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
-      poll: false,
-      task_supervisor: __MODULE__.TaskSupervisor
-    ]
-  end
+  defp process_batch(batch), do: Task.async(fn -> batch_fetch_instances(batch) end)
 end
diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex
index 007eab9e5d..10a745512b 100644
--- a/apps/indexer/lib/indexer/supervisor.ex
+++ b/apps/indexer/lib/indexer/supervisor.ex
@@ -120,7 +120,7 @@ defmodule Indexer.Supervisor do
         {TokenInstanceRealtime.Supervisor, [[memory_monitor: memory_monitor]]},
         {TokenInstanceRetry.Supervisor, [[memory_monitor: memory_monitor]]},
         {TokenInstanceSanitize.Supervisor, [[memory_monitor: memory_monitor]]},
-        {TokenInstanceLegacySanitize.Supervisor, [[memory_monitor: memory_monitor]]},
+        {TokenInstanceLegacySanitize, [[memory_monitor: memory_monitor]]},
         configure(TransactionAction.Supervisor, [[memory_monitor: memory_monitor]]),
         {ContractCode.Supervisor,
          [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
diff --git a/config/runtime.exs b/config/runtime.exs
index ec0cd0427b..34de34ab9b 100644
--- a/config/runtime.exs
+++ b/config/runtime.exs
@@ -575,7 +575,7 @@ config :indexer, Indexer.Fetcher.TokenInstance.Sanitize,
   batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE", 10)
 
 config :indexer, Indexer.Fetcher.TokenInstance.LegacySanitize,
-  concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_CONCURRENCY", 10),
+  concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_CONCURRENCY", 2),
   batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_BATCH_SIZE", 10)
 
 config :indexer, Indexer.Fetcher.InternalTransaction,
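For reference, a minimal sketch of a single backfill pass, written as an editor's illustration rather than part of the diff: it mirrors what `Indexer.Fetcher.TokenInstance.LegacySanitize.handle_cast(:backfill, state)` does with the new query. The literal values 2 and 10 are the defaults from config/runtime.exs above; running this by hand in an attached iex session is assumed for illustration only.

# Illustrative sketch only: one LegacySanitize backfill pass, assuming the
# modules changed in this diff are compiled and reachable from iex.
alias Explorer.Chain.Token.Instance
alias Explorer.Repo
import Indexer.Fetcher.TokenInstance.Helper

concurrency = 2
batch_size = 10

# Pull up to concurrency * batch_size {contract_address_hash, token_id} pairs
# that appear in token_transfers but have no row in token_instances.
instances_to_fetch =
  (concurrency * batch_size)
  |> Instance.not_inserted_token_instances_query()
  |> Repo.all()

# The unnest fragment can emit duplicate pairs (the query applies no distinct),
# so de-duplicate, then fetch metadata concurrently in batch_size-sized chunks.
instances_to_fetch
|> Enum.uniq()
|> Enum.chunk_every(batch_size)
|> Enum.map(fn batch -> Task.async(fn -> batch_fetch_instances(batch) end) end)
|> Task.await_many(:infinity)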