Add owner_address_hash and other fields to token_instances; Move toke… (#8386)

* Add owner_address_hash and other fields to token_instances; Move token instances' insert into synchronous block import stage

* Add TokenInstanceOwnerAddressMigration process

* Add TokenInstanceOwnerAddressMigration.Helper tests

* Add envs to Makefile and .env file

* Process review comments

* Process review comments
pull/8552/head
nikitosing 1 year ago committed by GitHub
parent 2e8d9060c5
commit daed959d8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 2
      apps/explorer/config/config.exs
  3. 2
      apps/explorer/config/runtime/test.exs
  4. 1
      apps/explorer/lib/explorer/application.ex
  5. 61
      apps/explorer/lib/explorer/chain.ex
  6. 172
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  7. 106
      apps/explorer/lib/explorer/chain/import/runner/token_instances.ex
  8. 3
      apps/explorer/lib/explorer/chain/import/stage/block_following.ex
  9. 21
      apps/explorer/lib/explorer/chain/token/instance.ex
  10. 10
      apps/explorer/lib/explorer/chain/token_transfer.ex
  11. 77
      apps/explorer/lib/explorer/token_instance_owner_address_migration/helper.ex
  12. 26
      apps/explorer/lib/explorer/token_instance_owner_address_migration/supervisor.ex
  13. 51
      apps/explorer/lib/explorer/token_instance_owner_address_migration/worker.ex
  14. 13
      apps/explorer/priv/repo/migrations/20230818094455_add_token_ids_to_address_token_balances.exs
  15. 160
      apps/explorer/test/explorer/chain/import/runner/blocks_test.exs
  16. 35
      apps/explorer/test/explorer/chain_test.exs
  17. 121
      apps/explorer/test/explorer/token_instance_owner_address_migration/helper_test.exs
  18. 6
      apps/explorer/test/support/factory.ex
  19. 5
      apps/indexer/lib/indexer/block/fetcher.ex
  20. 58
      apps/indexer/lib/indexer/fetcher/token_instance/legacy_sanitize.ex
  21. 2
      apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex
  22. 4
      apps/indexer/lib/indexer/fetcher/token_instance/sanitize.ex
  23. 2
      apps/indexer/lib/indexer/supervisor.ex
  24. 13
      apps/indexer/lib/indexer/transform/address_token_balances.ex
  25. 88
      apps/indexer/lib/indexer/transform/token_instances.ex
  26. 16
      apps/indexer/test/indexer/transform/address_token_balances_test.exs
  27. 11
      config/runtime.exs
  28. 5
      docker-compose/envs/common-blockscout.env

@ -4,6 +4,7 @@
### Features
- [#8386](https://github.com/blockscout/blockscout/pull/8386) - Add `owner_address_hash` to the `token_instances`
- [#8530](https://github.com/blockscout/blockscout/pull/8530) - Add `block_type` to search results
- [#8180](https://github.com/blockscout/blockscout/pull/8180) - Deposits and Withdrawals for Polygon Edge
- [#7996](https://github.com/blockscout/blockscout/pull/7996) - Add CoinBalance fetcher init query limit

@ -110,6 +110,8 @@ config :explorer, Explorer.Counters.BlockPriorityFeeCounter,
config :explorer, Explorer.TokenTransferTokenIdMigration.Supervisor, enabled: true
config :explorer, Explorer.TokenInstanceOwnerAddressMigration.Supervisor, enabled: true
config :explorer, Explorer.Chain.Fetcher.CheckBytecodeMatchingOnDemand, enabled: true
config :explorer, Explorer.Chain.Fetcher.FetchValidatorInfoOnDemand, enabled: true

@ -30,6 +30,8 @@ config :explorer, Explorer.Tracer, disabled?: false
config :explorer, Explorer.TokenTransferTokenIdMigration.Supervisor, enabled: false
config :explorer, Explorer.TokenInstanceOwnerAddressMigration.Supervisor, enabled: false
config :explorer,
realtime_events_sender: Explorer.Chain.Events.SimpleSender

@ -119,6 +119,7 @@ defmodule Explorer.Application do
configure(TokenTransferTokenIdMigration.Supervisor),
configure(Explorer.Chain.Fetcher.CheckBytecodeMatchingOnDemand),
configure(Explorer.Chain.Fetcher.FetchValidatorInfoOnDemand),
configure(Explorer.TokenInstanceOwnerAddressMigration.Supervisor),
sc_microservice_configure(Explorer.Chain.Fetcher.LookUpSmartContractSourcesOnDemand)
]
|> List.flatten()

@ -567,7 +567,7 @@ defmodule Explorer.Chain do
select: log,
inner_join: block in Block,
on: block.hash == log.block_hash,
where: block.consensus
where: block.consensus == true
)
preloaded_query =
@ -4391,12 +4391,15 @@ defmodule Explorer.Chain do
|> Repo.stream_reduce(initial, reducer)
end
@spec stream_unfetched_token_instances(
@doc """
Finds all token instances (pairs of contract_address_hash and token_id) which was met in token transfers but has no corresponding entry in token_instances table
"""
@spec stream_not_inserted_token_instances(
initial :: accumulator,
reducer :: (entry :: map(), accumulator -> accumulator)
) :: {:ok, accumulator}
when accumulator: term()
def stream_unfetched_token_instances(initial, reducer) when is_function(reducer, 2) do
def stream_not_inserted_token_instances(initial, reducer) when is_function(reducer, 2) do
nft_tokens =
from(
token in Token,
@ -4438,6 +4441,24 @@ defmodule Explorer.Chain do
Repo.stream_reduce(distinct_query, initial, reducer)
end
@doc """
Finds all token instances where metadata never tried to fetch
"""
@spec stream_token_instances_with_unfetched_metadata(
initial :: accumulator,
reducer :: (entry :: map(), accumulator -> accumulator)
) :: {:ok, accumulator}
when accumulator: term()
def stream_token_instances_with_unfetched_metadata(initial, reducer) when is_function(reducer, 2) do
Instance
|> where([instance], is_nil(instance.error) and is_nil(instance.metadata))
|> select([instance], %{
contract_address_hash: instance.token_contract_address_hash,
token_id: instance.token_id
})
|> Repo.stream_reduce(initial, reducer)
end
@spec stream_token_instances_with_error(
initial :: accumulator,
reducer :: (entry :: map(), accumulator -> accumulator),
@ -4696,18 +4717,37 @@ defmodule Explorer.Chain do
end
@doc """
Expects map of change params. Inserts using on_conflict: :replace_all
Expects map of change params. Inserts using on_conflict: `token_instance_metadata_on_conflict/0`
!!! Supposed to be used ONLY for import of `metadata` or `error`.
"""
@spec upsert_token_instance(map()) :: {:ok, Instance.t()} | {:error, Ecto.Changeset.t()}
def upsert_token_instance(params) do
changeset = Instance.changeset(%Instance{}, params)
Repo.insert(changeset,
on_conflict: :replace_all,
on_conflict: token_instance_metadata_on_conflict(),
conflict_target: [:token_id, :token_contract_address_hash]
)
end
defp token_instance_metadata_on_conflict do
from(
token_instance in Instance,
update: [
set: [
metadata: fragment("EXCLUDED.metadata"),
error: fragment("EXCLUDED.error"),
owner_updated_at_block: token_instance.owner_updated_at_block,
owner_updated_at_log_index: token_instance.owner_updated_at_log_index,
owner_address_hash: token_instance.owner_address_hash,
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token_instance.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token_instance.updated_at)
]
],
where: is_nil(token_instance.metadata)
)
end
@doc """
Inserts list of token instances via upsert_token_instance/1.
"""
@ -4809,6 +4849,17 @@ defmodule Explorer.Chain do
select_repo(options).exists?(query)
end
@spec token_instance_with_unfetched_metadata?(non_neg_integer, Hash.Address.t(), [api?]) :: boolean
def token_instance_with_unfetched_metadata?(token_id, token_contract_address, options \\ []) do
Instance
|> where([instance], is_nil(instance.error) and is_nil(instance.metadata))
|> where(
[instance],
instance.token_id == ^token_id and instance.token_contract_address_hash == ^token_contract_address
)
|> select_repo(options).exists?()
end
defp fetch_coin_balances(address, paging_options) do
address.hash
|> CoinBalance.fetch_coin_balances(paging_options)

@ -8,11 +8,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
import Ecto.Query, only: [from: 2, where: 3, subquery: 1]
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.{Address, Block, Import, PendingBlockOperation, Transaction}
alias Explorer.Chain.{
Address,
Block,
Import,
PendingBlockOperation,
Token,
Token.Instance,
TokenTransfer,
Transaction
}
alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import.Runner
alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances
alias Explorer.Chain.Import.Runner.Tokens
alias Explorer.Chain.Import.Runner.{TokenInstances, Tokens}
alias Explorer.Prometheus.Instrumenter
alias Explorer.Repo, as: ExplorerRepo
alias Explorer.Utility.MissingRangesManipulator
@ -177,6 +188,14 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
:derive_address_current_token_balances
)
end)
|> Multi.run(:update_token_instances_owner, fn repo, %{derive_transaction_forks: transactions} ->
Instrumenter.block_import_stage_runner(
fn -> update_token_instances_owner(repo, transactions, insert_options) end,
:address_referencing,
:blocks,
:update_token_instances_owner
)
end)
|> Multi.run(:blocks_update_token_holder_counts, fn repo,
%{
delete_address_current_token_balances: deleted,
@ -577,6 +596,155 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
{:ok, derived_address_current_token_balances}
end
defp update_token_instances_owner(_, [], _), do: {:ok, []}
defp update_token_instances_owner(repo, forked_transaction_hashes, options) do
forked_transaction_hashes
|> forked_token_transfers_query()
|> repo.all()
|> process_forked_token_transfers(repo, options)
end
defp process_forked_token_transfers([], _, _), do: {:ok, []}
defp process_forked_token_transfers(token_transfers, repo, options) do
changes_initial =
Enum.reduce(token_transfers, %{}, fn tt, acc ->
Map.put_new(acc, {tt.token_contract_address_hash, tt.token_id}, %{
token_contract_address_hash: tt.token_contract_address_hash,
token_id: tt.token_id,
owner_address_hash: tt.from,
owner_updated_at_block: -1,
owner_updated_at_log_index: -1
})
end)
non_consensus_block_numbers = token_transfers |> Enum.map(fn tt -> tt.block_number end) |> Enum.uniq()
filtered_query = TokenTransfer.only_consensus_transfers_query()
base_query =
from(token_transfer in subquery(filtered_query),
select: %{
token_contract_address_hash: token_transfer.token_contract_address_hash,
token_id: fragment("(?)[1]", token_transfer.token_ids),
block_number: max(token_transfer.block_number)
},
group_by: [token_transfer.token_contract_address_hash, fragment("(?)[1]", token_transfer.token_ids)]
)
historical_token_transfers_query =
Enum.reduce(token_transfers, base_query, fn tt, acc ->
from(token_transfer in acc,
or_where:
token_transfer.token_contract_address_hash == ^tt.token_contract_address_hash and
fragment("? @> ARRAY[?::decimal]", token_transfer.token_ids, ^tt.token_id) and
token_transfer.block_number < ^tt.block_number and
token_transfer.block_number not in ^non_consensus_block_numbers
)
end)
refs_to_token_transfers =
from(historical_tt in subquery(historical_token_transfers_query),
inner_join: tt in subquery(filtered_query),
on:
tt.token_contract_address_hash == historical_tt.token_contract_address_hash and
tt.block_number == historical_tt.block_number and
fragment("? @> ARRAY[?::decimal]", tt.token_ids, historical_tt.token_id),
select: %{
token_contract_address_hash: tt.token_contract_address_hash,
token_id: historical_tt.token_id,
log_index: max(tt.log_index),
block_number: tt.block_number
},
group_by: [tt.token_contract_address_hash, historical_tt.token_id, tt.block_number]
)
derived_token_transfers_query =
from(tt in filtered_query,
inner_join: tt_1 in subquery(refs_to_token_transfers),
on: tt_1.log_index == tt.log_index and tt_1.block_number == tt.block_number
)
changes =
derived_token_transfers_query
|> repo.all()
|> Enum.reduce(changes_initial, fn tt, acc ->
token_id = List.first(tt.token_ids)
current_key = {tt.token_contract_address_hash, token_id}
params = %{
token_contract_address_hash: tt.token_contract_address_hash,
token_id: token_id,
owner_address_hash: tt.to_address_hash,
owner_updated_at_block: tt.block_number,
owner_updated_at_log_index: tt.log_index
}
Map.put(
acc,
current_key,
Enum.max_by([acc[current_key], params], fn %{
owner_updated_at_block: block_number,
owner_updated_at_log_index: log_index
} ->
{block_number, log_index}
end)
)
end)
|> Map.values()
TokenInstances.insert(
repo,
changes,
options
|> Map.put(:timestamps, Import.timestamps())
|> Map.put(:on_conflict, token_instances_on_conflict())
)
end
defp forked_token_transfers_query(forked_transaction_hashes) do
from(token_transfer in TokenTransfer,
where: token_transfer.transaction_hash in ^forked_transaction_hashes,
inner_join: token in Token,
on: token.contract_address_hash == token_transfer.token_contract_address_hash,
where: token.type == "ERC-721",
inner_join: instance in Instance,
on:
fragment("? @> ARRAY[?::decimal]", token_transfer.token_ids, instance.token_id) and
instance.token_contract_address_hash == token_transfer.token_contract_address_hash,
# per one token instance we will have only one token transfer
where:
token_transfer.block_number == instance.owner_updated_at_block and
token_transfer.log_index == instance.owner_updated_at_log_index,
select: %{
from: token_transfer.from_address_hash,
to: token_transfer.to_address_hash,
token_id: instance.token_id,
token_contract_address_hash: token_transfer.token_contract_address_hash,
block_number: token_transfer.block_number,
log_index: token_transfer.log_index
}
)
end
defp token_instances_on_conflict do
from(
token_instance in Instance,
update: [
set: [
metadata: token_instance.metadata,
error: token_instance.error,
owner_updated_at_block: fragment("EXCLUDED.owner_updated_at_block"),
owner_updated_at_log_index: fragment("EXCLUDED.owner_updated_at_log_index"),
owner_address_hash: fragment("EXCLUDED.owner_address_hash"),
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token_instance.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token_instance.updated_at)
]
]
)
end
defp derive_address_current_token_balances_grouped_query(deleted_address_current_token_balances) do
initial_query =
from(tb in Address.TokenBalance,

@ -0,0 +1,106 @@
defmodule Explorer.Chain.Import.Runner.TokenInstances do
@moduledoc """
Bulk imports `t:Explorer.Chain.TokenInstances.t/0`.
"""
require Ecto.Query
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Import
alias Explorer.Chain.Token.Instance, as: TokenInstance
alias Explorer.Prometheus.Instrumenter
import Ecto.Query, only: [from: 2]
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type imported :: [TokenInstance.t()]
@impl Import.Runner
def ecto_schema_module, do: TokenInstance
@impl Import.Runner
def option_key, do: :token_instances
@impl Import.Runner
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
def run(multi, changes_list, %{timestamps: timestamps} = options) do
insert_options =
options
|> Map.get(option_key(), %{})
|> Map.take(~w(on_conflict timeout)a)
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
Multi.run(multi, :token_instances, fn repo, _ ->
Instrumenter.block_import_stage_runner(
fn -> insert(repo, changes_list, insert_options) end,
:block_referencing,
:token_instances,
:token_instances
)
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert(Repo.t(), [map()], %{
optional(:on_conflict) => Import.Runner.on_conflict(),
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}) ::
{:ok, [TokenInstance.t()]}
| {:error, [Changeset.t()]}
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# Guarantee the same import order to avoid deadlocks
ordered_changes_list = Enum.sort_by(changes_list, &{&1.token_contract_address_hash, &1.token_id})
{:ok, _} =
Import.insert_changes_list(
repo,
ordered_changes_list,
conflict_target: [:token_contract_address_hash, :token_id],
on_conflict: on_conflict,
for: TokenInstance,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
defp default_on_conflict do
from(
token_instance in TokenInstance,
update: [
set: [
metadata: token_instance.metadata,
error: token_instance.error,
owner_updated_at_block: fragment("EXCLUDED.owner_updated_at_block"),
owner_updated_at_log_index: fragment("EXCLUDED.owner_updated_at_log_index"),
owner_address_hash: fragment("EXCLUDED.owner_address_hash"),
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token_instance.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token_instance.updated_at)
]
],
where:
fragment("EXCLUDED.owner_address_hash IS NOT NULL") and fragment("EXCLUDED.owner_updated_at_block IS NOT NULL") and
(fragment("EXCLUDED.owner_updated_at_block > ?", token_instance.owner_updated_at_block) or
(fragment("EXCLUDED.owner_updated_at_block = ?", token_instance.owner_updated_at_block) and
fragment("EXCLUDED.owner_updated_at_log_index >= ?", token_instance.owner_updated_at_log_index)) or
is_nil(token_instance.owner_updated_at_block) or is_nil(token_instance.owner_address_hash))
)
end
end

@ -15,7 +15,8 @@ defmodule Explorer.Chain.Import.Stage.BlockFollowing do
do: [
Runner.Block.SecondDegreeRelations,
Runner.Block.Rewards,
Runner.Address.CurrentTokenBalances
Runner.Address.CurrentTokenBalances,
Runner.TokenInstances
]
@impl Stage

@ -5,7 +5,7 @@ defmodule Explorer.Chain.Token.Instance do
use Explorer.Schema
alias Explorer.Chain.{Address, Hash, Token, TokenTransfer}
alias Explorer.Chain.{Address, Block, Hash, Token, TokenTransfer}
alias Explorer.Chain.Token.Instance
alias Explorer.PagingOptions
@ -20,7 +20,10 @@ defmodule Explorer.Chain.Token.Instance do
token_id: non_neg_integer(),
token_contract_address_hash: Hash.Address.t(),
metadata: map() | nil,
error: String.t()
error: String.t(),
owner_address_hash: Hash.Address.t(),
owner_updated_at_block: Block.block_number(),
owner_updated_at_log_index: non_neg_integer()
}
@primary_key false
@ -28,8 +31,10 @@ defmodule Explorer.Chain.Token.Instance do
field(:token_id, :decimal, primary_key: true)
field(:metadata, :map)
field(:error, :string)
field(:owner_updated_at_block, :integer)
field(:owner_updated_at_log_index, :integer)
belongs_to(:owner, Address, references: :hash, define_field: false)
belongs_to(:owner, Address, foreign_key: :owner_address_hash, references: :hash, type: Hash.Address)
belongs_to(
:token,
@ -45,7 +50,15 @@ defmodule Explorer.Chain.Token.Instance do
def changeset(%Instance{} = instance, params \\ %{}) do
instance
|> cast(params, [:token_id, :metadata, :token_contract_address_hash, :error])
|> cast(params, [
:token_id,
:metadata,
:token_contract_address_hash,
:error,
:owner_address_hash,
:owner_updated_at_block,
:owner_updated_at_log_index
])
|> validate_required([:token_id, :token_contract_address_hash])
|> foreign_key_constraint(:token_contract_address_hash)
end

@ -47,7 +47,7 @@ defmodule Explorer.Chain.TokenTransfer do
* `:token_id` - ID of the token (applicable to ERC-721 tokens)
* `:transaction` - The `t:Explorer.Chain.Transaction.t/0` ledger
* `:transaction_hash` - Transaction foreign key
* `:log_index` - Index of the corresponding `t:Explorer.Chain.Log.t/0` in the transaction.
* `:log_index` - Index of the corresponding `t:Explorer.Chain.Log.t/0` in the block.
* `:amounts` - Tokens transferred amounts in case of batched transfer in ERC-1155
* `:token_ids` - IDs of the tokens (applicable to ERC-1155 tokens)
"""
@ -360,4 +360,12 @@ defmodule Explorer.Chain.TokenTransfer do
end
def filter_by_type(query, _), do: query
def only_consensus_transfers_query do
from(token_transfer in __MODULE__,
inner_join: block in Block,
on: token_transfer.block_hash == block.hash,
where: block.consensus == true
)
end
end

@ -0,0 +1,77 @@
defmodule Explorer.TokenInstanceOwnerAddressMigration.Helper do
@moduledoc """
Auxiliary functions for TokenInstanceOwnerAddressMigration.{Worker and Supervisor}
"""
import Ecto.Query,
only: [
from: 2
]
alias Explorer.{Chain, Repo}
alias Explorer.Chain.Token.Instance
alias Explorer.Chain.{SmartContract, TokenTransfer}
{:ok, burn_address_hash} = Chain.string_to_address_hash(SmartContract.burn_address_hash_string())
@burn_address_hash burn_address_hash
@spec filtered_token_instances_query(non_neg_integer()) :: Ecto.Query.t()
def filtered_token_instances_query(limit) do
from(instance in Instance,
where: is_nil(instance.owner_address_hash),
inner_join: token in assoc(instance, :token),
where: token.type == "ERC-721",
limit: ^limit,
select: %{token_id: instance.token_id, token_contract_address_hash: instance.token_contract_address_hash}
)
end
@spec fetch_and_insert([map]) ::
{:error, :timeout | [map]}
| {:ok,
%{
:token_instances => [Instance.t()]
}}
| {:error, any, any, map}
def fetch_and_insert(batch) do
changes =
Enum.map(batch, fn %{token_id: token_id, token_contract_address_hash: token_contract_address_hash} ->
token_transfer_query =
from(tt in TokenTransfer.only_consensus_transfers_query(),
where:
tt.token_contract_address_hash == ^token_contract_address_hash and
fragment("? @> ARRAY[?::decimal]", tt.token_ids, ^token_id),
order_by: [desc: tt.block_number, desc: tt.log_index],
limit: 1,
select: %{
token_contract_address_hash: tt.token_contract_address_hash,
token_ids: tt.token_ids,
to_address_hash: tt.to_address_hash,
block_number: tt.block_number,
log_index: tt.log_index
}
)
token_transfer =
Repo.one(token_transfer_query) ||
%{to_address_hash: @burn_address_hash, block_number: -1, log_index: -1}
%{
token_contract_address_hash: token_contract_address_hash,
token_id: token_id,
token_type: "ERC-721",
owner_address_hash: token_transfer.to_address_hash,
owner_updated_at_block: token_transfer.block_number,
owner_updated_at_log_index: token_transfer.log_index
}
end)
Chain.import(%{token_instances: %{params: changes}})
end
@spec unfilled_token_instances_exists? :: boolean
def unfilled_token_instances_exists? do
1
|> filtered_token_instances_query()
|> Repo.exists?()
end
end

@ -0,0 +1,26 @@
defmodule Explorer.TokenInstanceOwnerAddressMigration.Supervisor do
@moduledoc """
Supervisor for Explorer.TokenInstanceOwnerAddressMigration.Worker
"""
use Supervisor
alias Explorer.TokenInstanceOwnerAddressMigration.{Helper, Worker}
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
if Helper.unfilled_token_instances_exists?() do
children = [
{Worker, Application.get_env(:explorer, Explorer.TokenInstanceOwnerAddressMigration)}
]
Supervisor.init(children, strategy: :one_for_one)
else
:ignore
end
end
end

@ -0,0 +1,51 @@
defmodule Explorer.TokenInstanceOwnerAddressMigration.Worker do
@moduledoc """
GenServer for filling owner_address_hash, owner_updated_at_block and owner_updated_at_log_index
for ERC-721 token instances. Works in the following way
1. Checks if there are some unprocessed nfts.
- if yes, then go to 2 stage
- if no, then shutdown
2. Fetch `(concurrency * batch_size)` token instances, process them in `concurrency` tasks.
3. Go to step 1
"""
use GenServer, restart: :transient
alias Explorer.Repo
alias Explorer.TokenInstanceOwnerAddressMigration.Helper
def start_link(concurrency: concurrency, batch_size: batch_size) do
GenServer.start_link(__MODULE__, %{concurrency: concurrency, batch_size: batch_size}, name: __MODULE__)
end
@impl true
def init(opts) do
GenServer.cast(__MODULE__, :check_necessity)
{:ok, opts}
end
@impl true
def handle_cast(:check_necessity, state) do
if Helper.unfilled_token_instances_exists?() do
GenServer.cast(__MODULE__, :backfill)
{:noreply, state}
else
{:stop, :normal, state}
end
end
@impl true
def handle_cast(:backfill, %{concurrency: concurrency, batch_size: batch_size} = state) do
(concurrency * batch_size)
|> Helper.filtered_token_instances_query()
|> Repo.all()
|> Enum.chunk_every(batch_size)
|> Enum.map(fn batch -> Task.async(fn -> Helper.fetch_and_insert(batch) end) end)
|> Task.await_many(:infinity)
GenServer.cast(__MODULE__, :check_necessity)
{:noreply, state}
end
end

@ -0,0 +1,13 @@
defmodule Explorer.Repo.Migrations.AddTokenIdsToAddressTokenBalances do
use Ecto.Migration
def change do
alter table(:token_instances) do
add(:owner_address_hash, :bytea, null: true)
add(:owner_updated_at_block, :bigint, null: true)
add(:owner_updated_at_log_index, :integer, null: true)
end
create(index(:token_instances, [:owner_address_hash]))
end
end

@ -368,6 +368,166 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
assert %{block_number: ^number, block_hash: ^hash} = Repo.one(PendingBlockOperation)
end
test "change instance owner if was token transfer in older blocks",
%{consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do
block_number = block_number + 2
consensus_block = insert(:block, %{hash: block_hash, number: block_number})
transaction =
:transaction
|> insert()
|> with_block(consensus_block)
token_address = insert(:contract_address)
insert(:token, contract_address: token_address, type: "ERC-721")
id = Decimal.new(1)
tt =
insert(:token_transfer,
token_ids: [id],
transaction: transaction,
token_contract_address: token_address,
block_number: block_number,
block: consensus_block,
log_index: 123
)
%{hash: hash_1} = params_for(:block, consensus: true, miner_hash: miner_hash)
consensus_block_1 = insert(:block, %{hash: hash_1, number: block_number - 1})
transaction =
:transaction
|> insert()
|> with_block(consensus_block_1)
for _ <- 0..10 do
insert(:token_transfer,
token_ids: [id],
transaction: transaction,
token_contract_address: tt.token_contract_address,
block_number: consensus_block_1.number,
block: consensus_block_1
)
end
tt_1 =
insert(:token_transfer,
token_ids: [id],
transaction: transaction,
token_contract_address: tt.token_contract_address,
block_number: consensus_block_1.number,
block: consensus_block_1
)
%{hash: hash_2} = params_for(:block, consensus: true, miner_hash: miner_hash)
consensus_block_2 = insert(:block, %{hash: hash_2, number: block_number - 2})
for _ <- 0..10 do
tx =
:transaction
|> insert()
|> with_block(consensus_block_2)
insert(:token_transfer,
token_ids: [id],
transaction: tx,
token_contract_address: tt.token_contract_address,
block_number: consensus_block_2.number,
block: consensus_block_2
)
end
instance =
insert(:token_instance,
token_contract_address_hash: token_address.hash,
token_id: id,
owner_updated_at_block: tt.block_number,
owner_updated_at_log_index: tt.log_index,
owner_address_hash: insert(:address).hash
)
block_params =
params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: false)
%Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params)
changes_list = [block_changes]
error = instance.error
block_number = tt_1.block_number
log_index = tt_1.log_index
owner_address_hash = tt_1.to_address_hash
token_address_hash = token_address.hash
assert {:ok,
%{
update_token_instances_owner: [
%Explorer.Chain.Token.Instance{
token_id: ^id,
error: ^error,
owner_updated_at_block: ^block_number,
owner_updated_at_log_index: ^log_index,
owner_address_hash: ^owner_address_hash,
token_contract_address_hash: ^token_address_hash
}
]
}} = Multi.new() |> Blocks.run(changes_list, options) |> Repo.transaction()
end
test "change instance owner if there was no more token transfers",
%{consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do
block_number = block_number + 1
consensus_block = insert(:block, %{hash: block_hash, number: block_number})
transaction =
:transaction
|> insert()
|> with_block(consensus_block)
token_address = insert(:contract_address)
insert(:token, contract_address: token_address, type: "ERC-721")
id = Decimal.new(1)
tt =
insert(:token_transfer,
token_ids: [id],
transaction: transaction,
token_contract_address: token_address,
block_number: block_number,
block: consensus_block
)
instance =
insert(:token_instance,
token_contract_address_hash: token_address.hash,
token_id: id,
owner_updated_at_block: tt.block_number,
owner_updated_at_log_index: tt.log_index,
owner_address_hash: insert(:address).hash
)
block_params =
params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: false)
%Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params)
changes_list = [block_changes]
error = instance.error
owner_address_hash = tt.from_address_hash
token_address_hash = token_address.hash
assert {:ok,
%{
update_token_instances_owner: [
%Explorer.Chain.Token.Instance{
token_id: ^id,
error: ^error,
owner_updated_at_block: -1,
owner_updated_at_log_index: -1,
owner_address_hash: ^owner_address_hash,
token_contract_address_hash: ^token_address_hash
}
]
}} = Multi.new() |> Blocks.run(changes_list, options) |> Repo.transaction()
end
end
describe "lose_consensus/5" do

@ -183,30 +183,6 @@ defmodule Explorer.ChainTest do
assert result.token_contract_address_hash == token.contract_address_hash
end
test "replaces existing token instance record" do
token = insert(:token)
params = %{
token_id: 1,
token_contract_address_hash: token.contract_address_hash,
metadata: %{uri: "http://example.com"}
}
{:ok, _} = Chain.upsert_token_instance(params)
params1 = %{
token_id: 1,
token_contract_address_hash: token.contract_address_hash,
metadata: %{uri: "http://example1.com"}
}
{:ok, result} = Chain.upsert_token_instance(params1)
assert result.token_id == Decimal.new(1)
assert result.metadata == params1.metadata
assert result.token_contract_address_hash == token.contract_address_hash
end
test "fails to import with invalid params" do
params = %{
token_id: 1,
@ -243,7 +219,8 @@ defmodule Explorer.ChainTest do
insert(:token_instance,
token_id: 1,
token_contract_address_hash: token.contract_address_hash,
error: "no uri"
error: "no uri",
metadata: nil
)
params = %{
@ -4903,7 +4880,7 @@ defmodule Explorer.ChainTest do
end
end
describe "stream_unfetched_token_instances/2" do
describe "stream_not_inserted_token_instances/2" do
test "reduces with given reducer and accumulator for ERC-721 token" do
token_contract_address = insert(:contract_address)
token = insert(:token, contract_address: token_contract_address, type: "ERC-721")
@ -4924,7 +4901,7 @@ defmodule Explorer.ChainTest do
token_ids: [11]
)
assert {:ok, [result]} = Chain.stream_unfetched_token_instances([], &[&1 | &2])
assert {:ok, [result]} = Chain.stream_not_inserted_token_instances([], &[&1 | &2])
assert result.token_id == List.first(token_transfer.token_ids)
assert result.contract_address_hash == token_transfer.token_contract_address_hash
end
@ -4948,7 +4925,7 @@ defmodule Explorer.ChainTest do
token_ids: nil
)
assert {:ok, []} = Chain.stream_unfetched_token_instances([], &[&1 | &2])
assert {:ok, []} = Chain.stream_not_inserted_token_instances([], &[&1 | &2])
end
test "do not fetch records with token instances" do
@ -4976,7 +4953,7 @@ defmodule Explorer.ChainTest do
token_contract_address_hash: token_transfer.token_contract_address_hash
)
assert {:ok, []} = Chain.stream_unfetched_token_instances([], &[&1 | &2])
assert {:ok, []} = Chain.stream_not_inserted_token_instances([], &[&1 | &2])
end
end

@ -0,0 +1,121 @@
defmodule Explorer.TokenInstanceOwnerAddressMigration.HelperTest do
use Explorer.DataCase
alias Explorer.{Chain, Repo}
alias Explorer.Chain.Token.Instance
alias Explorer.TokenInstanceOwnerAddressMigration.Helper
{:ok, burn_address_hash} = Chain.string_to_address_hash("0x0000000000000000000000000000000000000000")
@burn_address_hash burn_address_hash
describe "fetch_and_insert/2" do
test "successfully update owner of single token instance" do
token_address = insert(:contract_address)
insert(:token, contract_address: token_address, type: "ERC-721")
instance = insert(:token_instance, token_contract_address_hash: token_address.hash)
transaction =
:transaction
|> insert()
|> with_block()
tt_1 =
insert(:token_transfer,
token_ids: [instance.token_id],
transaction: transaction,
token_contract_address: token_address
)
Helper.fetch_and_insert([
%{token_id: instance.token_id, token_contract_address_hash: instance.token_contract_address_hash}
])
owner_address = tt_1.to_address_hash
block_number = tt_1.block_number
log_index = tt_1.log_index
assert %Instance{
owner_address_hash: ^owner_address,
owner_updated_at_block: ^block_number,
owner_updated_at_log_index: ^log_index
} =
Repo.get_by(Instance,
token_id: instance.token_id,
token_contract_address_hash: instance.token_contract_address_hash
)
end
test "put placeholder value if tt absent in db" do
instance = insert(:token_instance)
Helper.fetch_and_insert([
%{token_id: instance.token_id, token_contract_address_hash: instance.token_contract_address_hash}
])
assert %Instance{
owner_address_hash: @burn_address_hash,
owner_updated_at_block: -1,
owner_updated_at_log_index: -1
} =
Repo.get_by(Instance,
token_id: instance.token_id,
token_contract_address_hash: instance.token_contract_address_hash
)
end
test "update owners of token instances batch" do
instances =
for _ <- 0..5 do
token_address = insert(:contract_address)
insert(:token, contract_address: token_address, type: "ERC-721")
instance = insert(:token_instance, token_contract_address_hash: token_address.hash)
tt =
for _ <- 0..5 do
transaction =
:transaction
|> insert()
|> with_block()
for _ <- 0..5 do
insert(:token_transfer,
token_ids: [instance.token_id],
transaction: transaction,
token_contract_address: token_address
)
end
end
|> Enum.concat()
|> Enum.max_by(fn tt -> {tt.block_number, tt.log_index} end)
%{
token_id: instance.token_id,
token_contract_address_hash: instance.token_contract_address_hash,
owner_address_hash: tt.to_address_hash,
owner_updated_at_block: tt.block_number,
owner_updated_at_log_index: tt.log_index
}
end
Helper.fetch_and_insert(instances)
for ti <- instances do
owner_address = ti.owner_address_hash
block_number = ti.owner_updated_at_block
log_index = ti.owner_updated_at_log_index
assert %Instance{
owner_address_hash: ^owner_address,
owner_updated_at_block: ^block_number,
owner_updated_at_log_index: ^log_index
} =
Repo.get_by(Instance,
token_id: ti.token_id,
token_contract_address_hash: ti.token_contract_address_hash
)
end
end
end
end

@ -863,6 +863,12 @@ defmodule Explorer.Factory do
}
end
def log_index_factory do
%{
log_index: sequence("token_id", & &1)
}
end
def token_balance_factory do
%TokenBalance{
address: build(:address),

@ -37,6 +37,7 @@ defmodule Indexer.Block.Fetcher do
Addresses,
AddressTokenBalances,
MintTransfers,
TokenInstances,
TokenTransfers,
TransactionActions
}
@ -183,6 +184,7 @@ defmodule Indexer.Block.Fetcher do
address_token_balances = AddressTokenBalances.params_set(%{token_transfers_params: token_transfers}),
transaction_actions =
Enum.map(transaction_actions, fn action -> Map.put(action, :data, Map.delete(action.data, :block_number)) end),
token_instances = TokenInstances.params_set(%{token_transfers_params: token_transfers}),
basic_import_options = %{
addresses: %{params: addresses},
address_coin_balances: %{params: coin_balances_params_set},
@ -198,7 +200,8 @@ defmodule Indexer.Block.Fetcher do
token_transfers: %{params: token_transfers},
tokens: %{on_conflict: :nothing, params: tokens},
transactions: %{params: transactions_with_receipts},
withdrawals: %{params: withdrawals_params}
withdrawals: %{params: withdrawals_params},
token_instances: %{params: token_instances}
},
import_options =
(if Application.get_env(:explorer, :chain_type) == "polygon_edge" do

@ -0,0 +1,58 @@
defmodule Indexer.Fetcher.TokenInstance.LegacySanitize do
@moduledoc """
This fetcher is stands for creating token instances which wasn't inserted yet and index meta for them. Legacy is because now we token instances inserted on block import and this fetcher is only for historical and unfetched for some reasons data
"""
use Indexer.Fetcher, restart: :permanent
use Spandex.Decorators
import Indexer.Fetcher.TokenInstance.Helper
alias Explorer.Chain
alias Indexer.BufferedTask
@behaviour BufferedTask
@default_max_batch_size 10
@default_max_concurrency 10
@doc false
def child_spec([init_options, gen_server_options]) do
merged_init_opts =
defaults()
|> Keyword.merge(init_options)
|> Keyword.merge(state: [])
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__)
end
@impl BufferedTask
def init(initial_acc, reducer, _) do
{:ok, acc} =
Chain.stream_not_inserted_token_instances(initial_acc, fn data, acc ->
reducer.(data, acc)
end)
acc
end
@impl BufferedTask
def run(token_instances, _) when is_list(token_instances) do
token_instances
|> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} ->
not Chain.token_instance_exists?(token_id, hash)
end)
|> batch_fetch_instances()
:ok
end
defp defaults do
[
flush_interval: :infinity,
max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency,
max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size,
poll: false,
task_supervisor: __MODULE__.TaskSupervisor
]
end
end

@ -35,7 +35,7 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do
def run(token_instances, _) when is_list(token_instances) do
token_instances
|> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} ->
not Chain.token_instance_exists?(token_id, hash)
Chain.token_instance_with_unfetched_metadata?(token_id, hash)
end)
|> batch_fetch_instances()

@ -28,7 +28,7 @@ defmodule Indexer.Fetcher.TokenInstance.Sanitize do
@impl BufferedTask
def init(initial_acc, reducer, _) do
{:ok, acc} =
Chain.stream_unfetched_token_instances(initial_acc, fn data, acc ->
Chain.stream_token_instances_with_unfetched_metadata(initial_acc, fn data, acc ->
reducer.(data, acc)
end)
@ -39,7 +39,7 @@ defmodule Indexer.Fetcher.TokenInstance.Sanitize do
def run(token_instances, _) when is_list(token_instances) do
token_instances
|> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} ->
not Chain.token_instance_exists?(token_id, hash)
Chain.token_instance_with_unfetched_metadata?(token_id, hash)
end)
|> batch_fetch_instances()

@ -13,6 +13,7 @@ defmodule Indexer.Supervisor do
alias Indexer.Block.Catchup, as: BlockCatchup
alias Indexer.Block.Realtime, as: BlockRealtime
alias Indexer.Fetcher.TokenInstance.LegacySanitize, as: TokenInstanceLegacySanitize
alias Indexer.Fetcher.TokenInstance.Realtime, as: TokenInstanceRealtime
alias Indexer.Fetcher.TokenInstance.Retry, as: TokenInstanceRetry
alias Indexer.Fetcher.TokenInstance.Sanitize, as: TokenInstanceSanitize
@ -115,6 +116,7 @@ defmodule Indexer.Supervisor do
{TokenInstanceRealtime.Supervisor, [[memory_monitor: memory_monitor]]},
{TokenInstanceRetry.Supervisor, [[memory_monitor: memory_monitor]]},
{TokenInstanceSanitize.Supervisor, [[memory_monitor: memory_monitor]]},
{TokenInstanceLegacySanitize.Supervisor, [[memory_monitor: memory_monitor]]},
configure(TransactionAction.Supervisor, [[memory_monitor: memory_monitor]]),
{ContractCode.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},

@ -11,7 +11,6 @@ defmodule Indexer.Transform.AddressTokenBalances do
defp reducer({:token_transfers_params, token_transfers_params}, initial) when is_list(token_transfers_params) do
token_transfers_params
|> ignore_burn_address_transfers_for_token_erc_721()
|> Enum.reduce(initial, fn %{
block_number: block_number,
from_address_hash: from_address_hash,
@ -31,10 +30,6 @@ defmodule Indexer.Transform.AddressTokenBalances do
end)
end
defp ignore_burn_address_transfers_for_token_erc_721(token_transfers_params) do
Enum.filter(token_transfers_params, &do_filter_burn_address/1)
end
defp add_token_balance_address(map_set, unquote(burn_address_hash_string()), _, _, _, _), do: map_set
defp add_token_balance_address(map_set, address, token_contract_address, token_id, token_type, block_number) do
@ -46,12 +41,4 @@ defmodule Indexer.Transform.AddressTokenBalances do
token_type: token_type
})
end
def do_filter_burn_address(%{to_address_hash: unquote(burn_address_hash_string()), token_type: "ERC-721"}) do
false
end
def do_filter_burn_address(_token_balance_param) do
true
end
end

@ -0,0 +1,88 @@
defmodule Indexer.Transform.TokenInstances do
@moduledoc """
Module extracts token instances from token transfers
"""
def params_set(%{} = import_options) do
Enum.reduce(import_options, %{}, &reducer/2)
end
defp reducer({:token_transfers_params, token_transfers_params}, initial) when is_list(token_transfers_params) do
token_transfers_params
|> Enum.reduce(initial, fn
%{
block_number: block_number,
from_address_hash: from_address_hash,
to_address_hash: to_address_hash,
token_contract_address_hash: token_contract_address_hash,
token_ids: [_ | _]
} = tt,
acc
when is_integer(block_number) and
is_binary(from_address_hash) and
is_binary(to_address_hash) and is_binary(token_contract_address_hash) ->
transfer_to_instances(tt, acc)
_, acc ->
acc
end)
|> Map.values()
end
defp transfer_to_instances(
%{
token_type: "ERC-721" = token_type,
to_address_hash: to_address_hash,
token_ids: [token_id],
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
log_index: log_index
},
acc
) do
params = %{
token_contract_address_hash: token_contract_address_hash,
token_id: token_id,
token_type: token_type,
owner_address_hash: to_address_hash,
owner_updated_at_block: block_number,
owner_updated_at_log_index: log_index
}
current_key = {token_contract_address_hash, token_id}
Map.put(
acc,
current_key,
Enum.max_by(
[
params,
acc[current_key] || params
],
fn %{
owner_updated_at_block: owner_updated_at_block,
owner_updated_at_log_index: owner_updated_at_log_index
} ->
{owner_updated_at_block, owner_updated_at_log_index}
end
)
)
end
defp transfer_to_instances(
%{
token_type: _token_type,
token_ids: [_ | _] = token_ids,
token_contract_address_hash: token_contract_address_hash
},
acc
) do
Enum.reduce(token_ids, acc, fn id, sub_acc ->
Map.put(sub_acc, {token_contract_address_hash, id}, %{
token_contract_address_hash: token_contract_address_hash,
token_id: id,
token_type: "ERC-1155"
})
end)
end
end

@ -66,7 +66,7 @@ defmodule Indexer.Transform.AddressTokenBalancesTest do
])
end
test "does not set params when the to_address_hash is the burn address for the Token ERC-721" do
test "does set params when the to_address_hash is the burn address for the Token ERC-721" do
block_number = 1
from_address_hash = "0x5b8410f67eb8040bb1cd1e8a4ff9d5f6ce678a15"
to_address_hash = "0x0000000000000000000000000000000000000000"
@ -77,12 +77,22 @@ defmodule Indexer.Transform.AddressTokenBalancesTest do
from_address_hash: from_address_hash,
to_address_hash: to_address_hash,
token_contract_address_hash: token_contract_address_hash,
token_type: "ERC-721"
token_type: "ERC-721",
token_ids: [1]
}
params_set = AddressTokenBalances.params_set(%{token_transfers_params: [token_transfer_params]})
assert MapSet.size(params_set) == 0
assert params_set ==
MapSet.new([
%{
address_hash: "0x5b8410f67eb8040bb1cd1e8a4ff9d5f6ce678a15",
block_number: 1,
token_contract_address_hash: "0xe18035bf8712672935fdb4e5e431b1a0183d2dfc",
token_id: 1,
token_type: "ERC-721"
}
])
end
end
end

@ -412,6 +412,10 @@ config :explorer, Explorer.Chain.Fetcher.LookUpSmartContractSourcesOnDemand,
config :explorer, Explorer.Chain.Cache.MinMissingBlockNumber,
enabled: !ConfigHelper.parse_bool_env_var("DISABLE_INDEXER")
config :explorer, Explorer.TokenInstanceOwnerAddressMigration,
concurrency: ConfigHelper.parse_integer_env_var("TOKEN_INSTANCE_OWNER_MIGRATION_CONCURRENCY", 5),
batch_size: ConfigHelper.parse_integer_env_var("TOKEN_INSTANCE_OWNER_MIGRATION_BATCH_SIZE", 50)
###############
### Indexer ###
###############
@ -502,6 +506,9 @@ config :indexer, Indexer.Fetcher.TokenInstance.Retry.Supervisor,
config :indexer, Indexer.Fetcher.TokenInstance.Sanitize.Supervisor,
disabled?: ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_TOKEN_INSTANCE_SANITIZE_FETCHER")
config :indexer, Indexer.Fetcher.TokenInstance.LegacySanitize.Supervisor,
disabled?: ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_TOKEN_INSTANCE_LEGACY_SANITIZE_FETCHER", "true")
config :indexer, Indexer.Fetcher.EmptyBlocksSanitizer,
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE", 100)
@ -532,6 +539,10 @@ config :indexer, Indexer.Fetcher.TokenInstance.Sanitize,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY", 10),
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE", 10)
config :indexer, Indexer.Fetcher.TokenInstance.LegacySanitize,
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_CONCURRENCY", 10),
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_BATCH_SIZE", 10)
config :indexer, Indexer.Fetcher.InternalTransaction,
batch_size: ConfigHelper.parse_integer_env_var("INDEXER_INTERNAL_TRANSACTIONS_BATCH_SIZE", 10),
concurrency: ConfigHelper.parse_integer_env_var("INDEXER_INTERNAL_TRANSACTIONS_CONCURRENCY", 4),

@ -100,6 +100,7 @@ DISABLE_REALTIME_INDEXER=false
INDEXER_DISABLE_TOKEN_INSTANCE_REALTIME_FETCHER=false
INDEXER_DISABLE_TOKEN_INSTANCE_RETRY_FETCHER=false
INDEXER_DISABLE_TOKEN_INSTANCE_SANITIZE_FETCHER=false
INDEXER_DISABLE_TOKEN_INSTANCE_LEGACY_SANITIZE_FETCHER=false
INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=false
INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_CATCHUP_BLOCKS_BATCH_SIZE=
@ -113,6 +114,7 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_TOKEN_INSTANCE_RETRY_CONCURRENCY=
# INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY=
# INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY=
# INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_CONCURRENCY=10
# INDEXER_COIN_BALANCES_BATCH_SIZE=
# INDEXER_COIN_BALANCES_CONCURRENCY=
# INDEXER_RECEIPTS_BATCH_SIZE=
@ -231,3 +233,6 @@ EIP_1559_ELASTICITY_MULTIPLIER=2
# INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE=1
# INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE=10
API_V2_ENABLED=true
# INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_BATCH_SIZE=10
# TOKEN_INSTANCE_OWNER_MIGRATION_CONCURRENCY=5
# TOKEN_INSTANCE_OWNER_MIGRATION_BATCH_SIZE=50

Loading…
Cancel
Save