Create Indexer.Fetcher.TokenInstance.{SanitizeERC721, SanitizeERC1155}; Move token instances to BlockReferencing stage (#9226)

* Create Indexer.Fetcher.TokenInstance.{SanitizeERC721, SanitizeERC1155}; Move token instances to BlockReferencing stage

* Add envs to .env file

* Fix dialyzer

* Process review comments

* Add env to .env file
vb-metadata-from-base-uri
nikitosing 9 months ago committed by GitHub
parent ebfc315838
commit 389debbc85
14 changed files (lines changed in parentheses):
  1. CHANGELOG.md (1)
  2. apps/block_scout_web/test/block_scout_web/controllers/api/v2/address_controller_test.exs (3)
  3. apps/explorer/lib/explorer/application/constants.ex (36)
  4. apps/explorer/lib/explorer/chain/import/stage/block_following.ex (3)
  5. apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex (1)
  6. apps/explorer/lib/explorer/chain/token.ex (17)
  7. apps/explorer/lib/explorer/chain/token/instance.ex (45)
  8. apps/indexer/lib/indexer/fetcher/token_instance/sanitize_erc1155.ex (51)
  9. apps/indexer/lib/indexer/fetcher/token_instance/sanitize_erc721.ex (89)
  10. apps/indexer/lib/indexer/supervisor.ex (6)
  11. apps/indexer/test/indexer/fetcher/token_instance/sanitize_erc1155_test.exs (33)
  12. apps/indexer/test/indexer/fetcher/token_instance/sanitize_erc721_test.exs (39)
  13. config/runtime.exs (20)
  14. docker-compose/envs/common-blockscout.env (7)

CHANGELOG.md
@@ -25,6 +25,7 @@
- [#9306](https://github.com/blockscout/blockscout/pull/9306) - Improve marking of failed internal transactions
- [#9305](https://github.com/blockscout/blockscout/pull/9305) - Add effective gas price calculation as fallback
- [#9300](https://github.com/blockscout/blockscout/pull/9300) - Fix read contract bug
- [#9226](https://github.com/blockscout/blockscout/pull/9226) - Split Indexer.Fetcher.TokenInstance.LegacySanitize
### Chore

apps/block_scout_web/test/block_scout_web/controllers/api/v2/address_controller_test.exs
@@ -2476,8 +2476,7 @@ defmodule BlockScoutWeb.API.V2.AddressControllerTest do
for _ <- 0..(amount - 1) do
ti =
insert(:token_instance,
token_contract_address_hash: token.contract_address_hash,
owner_address_hash: address.hash
token_contract_address_hash: token.contract_address_hash
)
|> Repo.preload([:token])

apps/explorer/lib/explorer/application/constants.ex
@@ -5,8 +5,10 @@ defmodule Explorer.Application.Constants do
use Explorer.Schema
alias Explorer.{Chain, Repo}
alias Explorer.Chain.Hash
@keys_manager_contract_address_key "keys_manager_contract_address"
@last_processed_erc_721_token "token_instance_sanitizer_last_processed_erc_721_token"
@primary_key false
typed_schema "constants" do
@@ -43,4 +45,38 @@ defmodule Explorer.Application.Constants do
def get_keys_manager_contract_address(options \\ []) do
get_constant_by_key(@keys_manager_contract_address_key, options)
end
@doc """
For usage in Indexer.Fetcher.TokenInstance.SanitizeERC721
"""
@spec insert_last_processed_token_address_hash(Hash.Address.t()) :: Ecto.Schema.t()
def insert_last_processed_token_address_hash(address_hash) do
existing_value = Repo.get(__MODULE__, @last_processed_erc_721_token)
if existing_value do
existing_value
|> changeset(%{value: to_string(address_hash)})
|> Repo.update!()
else
%{key: @last_processed_erc_721_token, value: to_string(address_hash)}
|> changeset()
|> Repo.insert!()
end
end
@doc """
For usage in Indexer.Fetcher.TokenInstance.SanitizeERC721
"""
@spec get_last_processed_token_address_hash(keyword()) :: nil | Explorer.Chain.Hash.t()
def get_last_processed_token_address_hash(options \\ []) do
result = get_constant_by_key(@last_processed_erc_721_token, options)
case Chain.string_to_address_hash(result) do
{:ok, address_hash} ->
address_hash
_ ->
nil
end
end
end

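Taken together, these two helpers give the ERC-721 sanitizer a persistent cursor in the constants table. A minimal IEx-style sketch of the intended round trip (the address hash below is purely illustrative):

    # Persist progress once a token's instances have been backfilled.
    {:ok, cursor} = Explorer.Chain.string_to_address_hash("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed")
    Explorer.Application.Constants.insert_last_processed_token_address_hash(cursor)

    # On restart, Indexer.Fetcher.TokenInstance.SanitizeERC721 reads the cursor back;
    # nil is returned when nothing has been stored yet.
    Explorer.Application.Constants.get_last_processed_token_address_hash()
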
apps/explorer/lib/explorer/chain/import/stage/block_following.ex
@@ -13,8 +13,7 @@ defmodule Explorer.Chain.Import.Stage.BlockFollowing do
do: [
Runner.Block.SecondDegreeRelations,
Runner.Block.Rewards,
Runner.Address.CurrentTokenBalances,
Runner.TokenInstances
Runner.Address.CurrentTokenBalances
]
@impl Stage

apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex
@@ -13,6 +13,7 @@ defmodule Explorer.Chain.Import.Stage.BlockReferencing do
Runner.Logs,
Runner.Tokens,
Runner.TokenTransfers,
Runner.TokenInstances,
Runner.Address.TokenBalances,
Runner.TransactionActions,
Runner.Withdrawals

apps/explorer/lib/explorer/chain/token.ex
@@ -77,7 +77,7 @@ defmodule Explorer.Chain.Token do
alias Ecto.Changeset
alias Explorer.{Chain, SortingHelper}
alias Explorer.Chain.{BridgedToken, Search, Token}
alias Explorer.Chain.{BridgedToken, Hash, Search, Token}
alias Explorer.SmartContract.Helper
@default_sorting [
@@ -231,4 +231,19 @@ defmodule Explorer.Chain.Token do
def get_by_contract_address_hash(hash, options) do
Chain.select_repo(options).get_by(__MODULE__, contract_address_hash: hash)
end
@doc """
For usage in Indexer.Fetcher.TokenInstance.SanitizeERC721
"""
@spec ordered_erc_721_token_address_hashes_list_query(integer(), Hash.Address.t() | nil) :: Ecto.Query.t()
def ordered_erc_721_token_address_hashes_list_query(limit, last_address_hash \\ nil) do
query =
__MODULE__
|> order_by([token], asc: token.contract_address_hash)
|> where([token], token.type == "ERC-721")
|> limit(^limit)
|> select([token], token.contract_address_hash)
(last_address_hash && where(query, [token], token.contract_address_hash > ^last_address_hash)) || query
end
end

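A hedged usage sketch of the keyset-paginated query above, mirroring how SanitizeERC721 walks the token table (the page size of 100 matches the INDEXER_TOKEN_INSTANCE_ERC_721_SANITIZE_TOKENS_BATCH_SIZE default added in runtime.exs below):

    alias Explorer.Chain.Token
    alias Explorer.Repo

    # First page: no cursor yet, so the 100 lowest ERC-721 contract address hashes come back.
    first_page = Repo.all(Token.ordered_erc_721_token_address_hashes_list_query(100))

    # Next page: pass the last hash of the previous page so earlier tokens are skipped.
    next_page = Repo.all(Token.ordered_erc_721_token_address_hashes_list_query(100, List.last(first_page)))
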
apps/explorer/lib/explorer/chain/token/instance.ex
@@ -431,6 +431,51 @@ defmodule Explorer.Chain.Token.Instance do
|> limit(^limit)
end
@doc """
Finds token instances of a particular token (pairs of contract_address_hash and token_id) that appear in the token_transfers table but have no corresponding entry in the token_instances table.
"""
@spec not_inserted_token_instances_query_by_token(integer(), Hash.Address.t()) :: Ecto.Query.t()
def not_inserted_token_instances_query_by_token(limit, token_contract_address_hash) do
token_transfers_query =
TokenTransfer
|> where([token_transfer], token_transfer.token_contract_address_hash == ^token_contract_address_hash)
|> select([token_transfer], %{
token_contract_address_hash: token_transfer.token_contract_address_hash,
token_id: fragment("unnest(?)", token_transfer.token_ids)
})
token_transfers_query
|> subquery()
|> join(:left, [token_transfer], token_instance in __MODULE__,
on:
token_instance.token_contract_address_hash == token_transfer.token_contract_address_hash and
token_instance.token_id == token_transfer.token_id
)
|> where([token_transfer, token_instance], is_nil(token_instance.token_id))
|> select([token_transfer, token_instance], %{
contract_address_hash: token_transfer.token_contract_address_hash,
token_id: token_transfer.token_id
})
|> limit(^limit)
end
@doc """
Finds ERC-1155 token instances (pairs of contract_address_hash and token_id) that appear in the current_token_balances table but have no corresponding entry in the token_instances table.
"""
@spec not_inserted_erc_1155_token_instances(integer()) :: Ecto.Query.t()
def not_inserted_erc_1155_token_instances(limit) do
CurrentTokenBalance
|> join(:left, [actb], ti in __MODULE__,
on: actb.token_contract_address_hash == ti.token_contract_address_hash and actb.token_id == ti.token_id
)
|> where([actb, ti], not is_nil(actb.token_id) and is_nil(ti.token_id))
|> select([actb], %{
contract_address_hash: actb.token_contract_address_hash,
token_id: actb.token_id
})
|> limit(^limit)
end
@doc """
Puts the is_unique field in a token instance. Returns the updated token instance.
is_unique is always true for ERC-721 and, for ERC-1155, true only if the token_id is unique

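For orientation, a sketch of how the two queries above are consumed by the fetchers introduced below; the limit of 20 reflects the default concurrency * batch_size (2 * 10) from runtime.exs, and some_erc_721_contract_hash is a placeholder for any ERC-721 token's contract address hash:

    alias Explorer.Chain.Token.Instance
    alias Explorer.Repo

    # ERC-721: instances seen in token_transfers for one token but missing from token_instances.
    # some_erc_721_contract_hash is a placeholder Hash.Address.t() value.
    missing_erc_721 = Repo.all(Instance.not_inserted_token_instances_query_by_token(20, some_erc_721_contract_hash))

    # ERC-1155: instances seen in address_current_token_balances but missing from token_instances.
    missing_erc_1155 = Repo.all(Instance.not_inserted_erc_1155_token_instances(20))
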
apps/indexer/lib/indexer/fetcher/token_instance/sanitize_erc1155.ex
@@ -0,0 +1,51 @@
defmodule Indexer.Fetcher.TokenInstance.SanitizeERC1155 do
@moduledoc """
This fetcher creates token instances that have not been inserted yet and indexes their metadata.
Imports only ERC-1155 token instances.
"""
use GenServer, restart: :transient
alias Explorer.Chain.Token.Instance
alias Explorer.Repo
import Indexer.Fetcher.TokenInstance.Helper
def start_link(_) do
concurrency = Application.get_env(:indexer, __MODULE__)[:concurrency]
batch_size = Application.get_env(:indexer, __MODULE__)[:batch_size]
GenServer.start_link(__MODULE__, %{concurrency: concurrency, batch_size: batch_size}, name: __MODULE__)
end
@impl true
def init(opts) do
GenServer.cast(__MODULE__, :backfill)
{:ok, opts}
end
@impl true
def handle_cast(:backfill, %{concurrency: concurrency, batch_size: batch_size} = state) do
instances_to_fetch =
(concurrency * batch_size)
|> Instance.not_inserted_erc_1155_token_instances()
|> Repo.all()
if Enum.empty?(instances_to_fetch) do
{:stop, :normal, state}
else
instances_to_fetch
|> Enum.uniq()
|> Enum.chunk_every(batch_size)
|> Enum.map(&process_batch/1)
|> Task.await_many(:infinity)
GenServer.cast(__MODULE__, :backfill)
{:noreply, state}
end
end
defp process_batch(batch), do: Task.async(fn -> batch_fetch_instances(batch) end)
end

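In production this GenServer is started by Indexer.Supervisor only when enabled (see the supervisor and runtime.exs hunks below). For an ad-hoc run, e.g. from IEx or a test, a minimal start looks like:

    # One-shot backfill: the process keeps re-casting :backfill to itself until no missing
    # ERC-1155 instances remain, then stops with :normal (hence restart: :transient).
    {:ok, pid} = Indexer.Fetcher.TokenInstance.SanitizeERC1155.start_link([])
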
apps/indexer/lib/indexer/fetcher/token_instance/sanitize_erc721.ex
@@ -0,0 +1,89 @@
defmodule Indexer.Fetcher.TokenInstance.SanitizeERC721 do
@moduledoc """
This fetcher creates token instances that have not been inserted yet and indexes their metadata.
Imports only ERC-721 token instances.
"""
use GenServer, restart: :transient
alias Explorer.Application.Constants
alias Explorer.Chain.Token
alias Explorer.Chain.Token.Instance
alias Explorer.Repo
import Indexer.Fetcher.TokenInstance.Helper
def start_link(_) do
concurrency = Application.get_env(:indexer, __MODULE__)[:concurrency]
batch_size = Application.get_env(:indexer, __MODULE__)[:batch_size]
tokens_queue_size = Application.get_env(:indexer, __MODULE__)[:tokens_queue_size]
GenServer.start_link(
__MODULE__,
%{concurrency: concurrency, batch_size: batch_size, tokens_queue_size: tokens_queue_size},
name: __MODULE__
)
end
@impl true
def init(opts) do
last_token_address_hash = Constants.get_last_processed_token_address_hash()
GenServer.cast(__MODULE__, :fetch_tokens_queue)
{:ok, Map.put(opts, :last_token_address_hash, last_token_address_hash)}
end
@impl true
def handle_cast(:fetch_tokens_queue, state) do
address_hashes =
state[:tokens_queue_size]
|> Token.ordered_erc_721_token_address_hashes_list_query(state[:last_token_address_hash])
|> Repo.all()
if Enum.empty?(address_hashes) do
{:stop, :normal, state}
else
GenServer.cast(__MODULE__, :backfill)
{:noreply, Map.put(state, :tokens_queue, address_hashes)}
end
end
@impl true
def handle_cast(:backfill, %{tokens_queue: []} = state) do
GenServer.cast(__MODULE__, :fetch_tokens_queue)
{:noreply, state}
end
@impl true
def handle_cast(
:backfill,
%{concurrency: concurrency, batch_size: batch_size, tokens_queue: [current_address_hash | remains]} = state
) do
instances_to_fetch =
(concurrency * batch_size)
|> Instance.not_inserted_token_instances_query_by_token(current_address_hash)
|> Repo.all()
if Enum.empty?(instances_to_fetch) do
Constants.insert_last_processed_token_address_hash(current_address_hash)
GenServer.cast(__MODULE__, :backfill)
{:noreply, %{state | tokens_queue: remains, last_token_address_hash: current_address_hash}}
else
instances_to_fetch
|> Enum.uniq()
|> Enum.chunk_every(batch_size)
|> Enum.map(&process_batch/1)
|> Task.await_many(:infinity)
GenServer.cast(__MODULE__, :backfill)
{:noreply, state}
end
end
defp process_batch(batch), do: Task.async(fn -> batch_fetch_instances(batch) end)
end

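Because the queue cursor is persisted through Explorer.Application.Constants, a restart of this fetcher resumes from the last fully processed token rather than rescanning from the lowest contract address. A hedged IEx sketch:

    # Where did the previous run stop? nil means the scan starts from the beginning.
    Explorer.Application.Constants.get_last_processed_token_address_hash()

    # Start (or restart) the one-shot ERC-721 backfill; it exits with :normal
    # once the ordered ERC-721 token queue is exhausted.
    {:ok, pid} = Indexer.Fetcher.TokenInstance.SanitizeERC721.start_link([])
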
apps/indexer/lib/indexer/supervisor.ex
@@ -22,6 +22,8 @@ defmodule Indexer.Supervisor do
alias Indexer.Fetcher.TokenInstance.Realtime, as: TokenInstanceRealtime
alias Indexer.Fetcher.TokenInstance.Retry, as: TokenInstanceRetry
alias Indexer.Fetcher.TokenInstance.Sanitize, as: TokenInstanceSanitize
alias Indexer.Fetcher.TokenInstance.SanitizeERC1155, as: TokenInstanceSanitizeERC1155
alias Indexer.Fetcher.TokenInstance.SanitizeERC721, as: TokenInstanceSanitizeERC721
alias Indexer.Fetcher.{
BlockReward,
@@ -122,7 +124,9 @@ defmodule Indexer.Supervisor do
{TokenInstanceRealtime.Supervisor, [[memory_monitor: memory_monitor]]},
{TokenInstanceRetry.Supervisor, [[memory_monitor: memory_monitor]]},
{TokenInstanceSanitize.Supervisor, [[memory_monitor: memory_monitor]]},
{TokenInstanceLegacySanitize, [[memory_monitor: memory_monitor]]},
configure(TokenInstanceLegacySanitize, [[memory_monitor: memory_monitor]]),
configure(TokenInstanceSanitizeERC721, [[memory_monitor: memory_monitor]]),
configure(TokenInstanceSanitizeERC1155, [[memory_monitor: memory_monitor]]),
configure(TransactionAction.Supervisor, [[memory_monitor: memory_monitor]]),
{ContractCode.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},

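configure/2 is an existing helper in Indexer.Supervisor (already used for TransactionAction.Supervisor above, not shown in this hunk) that includes a child spec only when the process's :enabled flag is set, which is why runtime.exs below gains enabled: keys for the legacy and the two new sanitizers. A rough sketch of that pattern, under the assumption that the flag is read from the :indexer app env (the real helper may differ in detail):

    # Include the child spec only when the fetcher's :enabled flag is true; the
    # surrounding children list is flattened, so returning [] simply drops the child.
    defp configure(process, opts) do
      if Application.get_env(:indexer, process)[:enabled] do
        [{process, opts}]
      else
        []
      end
    end
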
apps/indexer/test/indexer/fetcher/token_instance/sanitize_erc1155_test.exs
@@ -0,0 +1,33 @@
defmodule Indexer.Fetcher.TokenInstance.SanitizeERC1155Test do
use Explorer.DataCase
alias Explorer.Repo
alias Explorer.Chain.Token.Instance
alias EthereumJSONRPC.Encoder
describe "sanitizer test" do
test "imports token instances" do
for i <- 0..3 do
token = insert(:token, type: "ERC-1155")
insert(:address_current_token_balance,
token_type: "ERC-1155",
token_id: i,
token_contract_address_hash: token.contract_address_hash,
value: Enum.random(1..100_000)
)
end
assert [] = Repo.all(Instance)
start_supervised!({Indexer.Fetcher.TokenInstance.SanitizeERC1155, []})
:timer.sleep(500)
instances = Repo.all(Instance)
assert Enum.count(instances) == 4
assert Enum.all?(instances, fn instance -> !is_nil(instance.error) and is_nil(instance.metadata) end)
end
end
end

apps/indexer/test/indexer/fetcher/token_instance/sanitize_erc721_test.exs
@@ -0,0 +1,39 @@
defmodule Indexer.Fetcher.TokenInstance.SanitizeERC721Test do
use Explorer.DataCase
alias Explorer.Repo
alias Explorer.Chain.Token.Instance
alias EthereumJSONRPC.Encoder
describe "sanitizer test" do
test "imports token instances" do
for x <- 0..3 do
erc_721_token = insert(:token, type: "ERC-721")
tx = insert(:transaction, input: "0xabcd010203040506") |> with_block()
address = insert(:address)
insert(:token_transfer,
transaction: tx,
block: tx.block,
block_number: tx.block_number,
from_address: address,
token_contract_address: erc_721_token.contract_address,
token_ids: [x]
)
end
assert [] = Repo.all(Instance)
start_supervised!({Indexer.Fetcher.TokenInstance.SanitizeERC721, []})
:timer.sleep(500)
instances = Repo.all(Instance)
assert Enum.count(instances) == 4
assert Enum.all?(instances, fn instance -> !is_nil(instance.error) and is_nil(instance.metadata) end)
end
end
end

config/runtime.exs
@@ -596,8 +596,14 @@ config :indexer, Indexer.Fetcher.TokenInstance.Retry.Supervisor,
config :indexer, Indexer.Fetcher.TokenInstance.Sanitize.Supervisor,
disabled?: ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_TOKEN_INSTANCE_SANITIZE_FETCHER")
config :indexer, Indexer.Fetcher.TokenInstance.LegacySanitize.Supervisor,
disabled?: ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_TOKEN_INSTANCE_LEGACY_SANITIZE_FETCHER", "true")
config :indexer, Indexer.Fetcher.TokenInstance.LegacySanitize,
enabled: !ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_TOKEN_INSTANCE_LEGACY_SANITIZE_FETCHER", "true")
config :indexer, Indexer.Fetcher.TokenInstance.SanitizeERC1155,
enabled: !ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_TOKEN_INSTANCE_ERC_1155_SANITIZE_FETCHER", "false")
config :indexer, Indexer.Fetcher.TokenInstance.SanitizeERC721,
enabled: !ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_TOKEN_INSTANCE_ERC_721_SANITIZE_FETCHER", "false")
config :indexer, Indexer.Fetcher.EmptyBlocksSanitizer,
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE", 100),
@@ -634,6 +640,16 @@ config :indexer, Indexer.Fetcher.TokenInstance.LegacySanitize,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_CONCURRENCY", 2),
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_BATCH_SIZE", 10)
config :indexer, Indexer.Fetcher.TokenInstance.SanitizeERC1155,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_ERC_1155_SANITIZE_CONCURRENCY", 2),
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_ERC_1155_SANITIZE_BATCH_SIZE", 10)
config :indexer, Indexer.Fetcher.TokenInstance.SanitizeERC721,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_ERC_721_SANITIZE_CONCURRENCY", 2),
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_ERC_721_SANITIZE_BATCH_SIZE", 10),
tokens_queue_size:
ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_ERC_721_SANITIZE_TOKENS_BATCH_SIZE", 100)
config :indexer, Indexer.Fetcher.InternalTransaction,
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_INTERNAL_TRANSACTIONS_BATCH_SIZE", 10),
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_INTERNAL_TRANSACTIONS_CONCURRENCY", 4),

docker-compose/envs/common-blockscout.env
@@ -144,6 +144,13 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY=
# INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_BATCH_SIZE=10
# INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_CONCURRENCY=10
# INDEXER_DISABLE_TOKEN_INSTANCE_ERC_1155_SANITIZE_FETCHER=false
# INDEXER_DISABLE_TOKEN_INSTANCE_ERC_721_SANITIZE_FETCHER=false
# INDEXER_TOKEN_INSTANCE_ERC_1155_SANITIZE_CONCURRENCY=2
# INDEXER_TOKEN_INSTANCE_ERC_1155_SANITIZE_BATCH_SIZE=10
# INDEXER_TOKEN_INSTANCE_ERC_721_SANITIZE_CONCURRENCY=2
# INDEXER_TOKEN_INSTANCE_ERC_721_SANITIZE_BATCH_SIZE=10
# INDEXER_TOKEN_INSTANCE_ERC_721_SANITIZE_TOKENS_BATCH_SIZE=100
# TOKEN_INSTANCE_OWNER_MIGRATION_CONCURRENCY=5
# TOKEN_INSTANCE_OWNER_MIGRATION_BATCH_SIZE=50
# INDEXER_COIN_BALANCES_BATCH_SIZE=
