diff --git a/CHANGELOG.md b/CHANGELOG.md index 120b6f856d..0696b53b23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features +- [#8386](https://github.com/blockscout/blockscout/pull/8386) - Add `owner_address_hash` to the `token_instances` - [#8530](https://github.com/blockscout/blockscout/pull/8530) - Add `block_type` to search results - [#8180](https://github.com/blockscout/blockscout/pull/8180) - Deposits and Withdrawals for Polygon Edge - [#7996](https://github.com/blockscout/blockscout/pull/7996) - Add CoinBalance fetcher init query limit diff --git a/apps/explorer/config/config.exs b/apps/explorer/config/config.exs index 8bfc263b53..feb7ca93e8 100644 --- a/apps/explorer/config/config.exs +++ b/apps/explorer/config/config.exs @@ -110,6 +110,8 @@ config :explorer, Explorer.Counters.BlockPriorityFeeCounter, config :explorer, Explorer.TokenTransferTokenIdMigration.Supervisor, enabled: true +config :explorer, Explorer.TokenInstanceOwnerAddressMigration.Supervisor, enabled: true + config :explorer, Explorer.Chain.Fetcher.CheckBytecodeMatchingOnDemand, enabled: true config :explorer, Explorer.Chain.Fetcher.FetchValidatorInfoOnDemand, enabled: true diff --git a/apps/explorer/config/runtime/test.exs b/apps/explorer/config/runtime/test.exs index 0afa030023..bef121b2c1 100644 --- a/apps/explorer/config/runtime/test.exs +++ b/apps/explorer/config/runtime/test.exs @@ -30,6 +30,8 @@ config :explorer, Explorer.Tracer, disabled?: false config :explorer, Explorer.TokenTransferTokenIdMigration.Supervisor, enabled: false +config :explorer, Explorer.TokenInstanceOwnerAddressMigration.Supervisor, enabled: false + config :explorer, realtime_events_sender: Explorer.Chain.Events.SimpleSender diff --git a/apps/explorer/lib/explorer/application.ex b/apps/explorer/lib/explorer/application.ex index beb6f40824..23c5f2548f 100644 --- a/apps/explorer/lib/explorer/application.ex +++ b/apps/explorer/lib/explorer/application.ex @@ -119,6 +119,7 @@ defmodule Explorer.Application do configure(TokenTransferTokenIdMigration.Supervisor), configure(Explorer.Chain.Fetcher.CheckBytecodeMatchingOnDemand), configure(Explorer.Chain.Fetcher.FetchValidatorInfoOnDemand), + configure(Explorer.TokenInstanceOwnerAddressMigration.Supervisor), sc_microservice_configure(Explorer.Chain.Fetcher.LookUpSmartContractSourcesOnDemand) ] |> List.flatten() diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 6aa532de03..e7e493c803 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -567,7 +567,7 @@ defmodule Explorer.Chain do select: log, inner_join: block in Block, on: block.hash == log.block_hash, - where: block.consensus + where: block.consensus == true ) preloaded_query = @@ -4391,12 +4391,15 @@ defmodule Explorer.Chain do |> Repo.stream_reduce(initial, reducer) end - @spec stream_unfetched_token_instances( + @doc """ + Finds all token instances (pairs of contract_address_hash and token_id) which was met in token transfers but has no corresponding entry in token_instances table + """ + @spec stream_not_inserted_token_instances( initial :: accumulator, reducer :: (entry :: map(), accumulator -> accumulator) ) :: {:ok, accumulator} when accumulator: term() - def stream_unfetched_token_instances(initial, reducer) when is_function(reducer, 2) do + def stream_not_inserted_token_instances(initial, reducer) when is_function(reducer, 2) do nft_tokens = from( token in Token, @@ -4438,6 +4441,24 @@ defmodule Explorer.Chain do Repo.stream_reduce(distinct_query, initial, reducer) end + @doc """ + Finds all token instances where metadata never tried to fetch + """ + @spec stream_token_instances_with_unfetched_metadata( + initial :: accumulator, + reducer :: (entry :: map(), accumulator -> accumulator) + ) :: {:ok, accumulator} + when accumulator: term() + def stream_token_instances_with_unfetched_metadata(initial, reducer) when is_function(reducer, 2) do + Instance + |> where([instance], is_nil(instance.error) and is_nil(instance.metadata)) + |> select([instance], %{ + contract_address_hash: instance.token_contract_address_hash, + token_id: instance.token_id + }) + |> Repo.stream_reduce(initial, reducer) + end + @spec stream_token_instances_with_error( initial :: accumulator, reducer :: (entry :: map(), accumulator -> accumulator), @@ -4696,18 +4717,37 @@ defmodule Explorer.Chain do end @doc """ - Expects map of change params. Inserts using on_conflict: :replace_all + Expects map of change params. Inserts using on_conflict: `token_instance_metadata_on_conflict/0` + !!! Supposed to be used ONLY for import of `metadata` or `error`. """ @spec upsert_token_instance(map()) :: {:ok, Instance.t()} | {:error, Ecto.Changeset.t()} def upsert_token_instance(params) do changeset = Instance.changeset(%Instance{}, params) Repo.insert(changeset, - on_conflict: :replace_all, + on_conflict: token_instance_metadata_on_conflict(), conflict_target: [:token_id, :token_contract_address_hash] ) end + defp token_instance_metadata_on_conflict do + from( + token_instance in Instance, + update: [ + set: [ + metadata: fragment("EXCLUDED.metadata"), + error: fragment("EXCLUDED.error"), + owner_updated_at_block: token_instance.owner_updated_at_block, + owner_updated_at_log_index: token_instance.owner_updated_at_log_index, + owner_address_hash: token_instance.owner_address_hash, + inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token_instance.inserted_at), + updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token_instance.updated_at) + ] + ], + where: is_nil(token_instance.metadata) + ) + end + @doc """ Inserts list of token instances via upsert_token_instance/1. """ @@ -4809,6 +4849,17 @@ defmodule Explorer.Chain do select_repo(options).exists?(query) end + @spec token_instance_with_unfetched_metadata?(non_neg_integer, Hash.Address.t(), [api?]) :: boolean + def token_instance_with_unfetched_metadata?(token_id, token_contract_address, options \\ []) do + Instance + |> where([instance], is_nil(instance.error) and is_nil(instance.metadata)) + |> where( + [instance], + instance.token_id == ^token_id and instance.token_contract_address_hash == ^token_contract_address + ) + |> select_repo(options).exists?() + end + defp fetch_coin_balances(address, paging_options) do address.hash |> CoinBalance.fetch_coin_balances(paging_options) diff --git a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex index ea53311901..57fc876023 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex @@ -8,11 +8,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do import Ecto.Query, only: [from: 2, where: 3, subquery: 1] alias Ecto.{Changeset, Multi, Repo} - alias Explorer.Chain.{Address, Block, Import, PendingBlockOperation, Transaction} + + alias Explorer.Chain.{ + Address, + Block, + Import, + PendingBlockOperation, + Token, + Token.Instance, + TokenTransfer, + Transaction + } + alias Explorer.Chain.Block.Reward alias Explorer.Chain.Import.Runner alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances - alias Explorer.Chain.Import.Runner.Tokens + alias Explorer.Chain.Import.Runner.{TokenInstances, Tokens} alias Explorer.Prometheus.Instrumenter alias Explorer.Repo, as: ExplorerRepo alias Explorer.Utility.MissingRangesManipulator @@ -177,6 +188,14 @@ defmodule Explorer.Chain.Import.Runner.Blocks do :derive_address_current_token_balances ) end) + |> Multi.run(:update_token_instances_owner, fn repo, %{derive_transaction_forks: transactions} -> + Instrumenter.block_import_stage_runner( + fn -> update_token_instances_owner(repo, transactions, insert_options) end, + :address_referencing, + :blocks, + :update_token_instances_owner + ) + end) |> Multi.run(:blocks_update_token_holder_counts, fn repo, %{ delete_address_current_token_balances: deleted, @@ -577,6 +596,155 @@ defmodule Explorer.Chain.Import.Runner.Blocks do {:ok, derived_address_current_token_balances} end + defp update_token_instances_owner(_, [], _), do: {:ok, []} + + defp update_token_instances_owner(repo, forked_transaction_hashes, options) do + forked_transaction_hashes + |> forked_token_transfers_query() + |> repo.all() + |> process_forked_token_transfers(repo, options) + end + + defp process_forked_token_transfers([], _, _), do: {:ok, []} + + defp process_forked_token_transfers(token_transfers, repo, options) do + changes_initial = + Enum.reduce(token_transfers, %{}, fn tt, acc -> + Map.put_new(acc, {tt.token_contract_address_hash, tt.token_id}, %{ + token_contract_address_hash: tt.token_contract_address_hash, + token_id: tt.token_id, + owner_address_hash: tt.from, + owner_updated_at_block: -1, + owner_updated_at_log_index: -1 + }) + end) + + non_consensus_block_numbers = token_transfers |> Enum.map(fn tt -> tt.block_number end) |> Enum.uniq() + + filtered_query = TokenTransfer.only_consensus_transfers_query() + + base_query = + from(token_transfer in subquery(filtered_query), + select: %{ + token_contract_address_hash: token_transfer.token_contract_address_hash, + token_id: fragment("(?)[1]", token_transfer.token_ids), + block_number: max(token_transfer.block_number) + }, + group_by: [token_transfer.token_contract_address_hash, fragment("(?)[1]", token_transfer.token_ids)] + ) + + historical_token_transfers_query = + Enum.reduce(token_transfers, base_query, fn tt, acc -> + from(token_transfer in acc, + or_where: + token_transfer.token_contract_address_hash == ^tt.token_contract_address_hash and + fragment("? @> ARRAY[?::decimal]", token_transfer.token_ids, ^tt.token_id) and + token_transfer.block_number < ^tt.block_number and + token_transfer.block_number not in ^non_consensus_block_numbers + ) + end) + + refs_to_token_transfers = + from(historical_tt in subquery(historical_token_transfers_query), + inner_join: tt in subquery(filtered_query), + on: + tt.token_contract_address_hash == historical_tt.token_contract_address_hash and + tt.block_number == historical_tt.block_number and + fragment("? @> ARRAY[?::decimal]", tt.token_ids, historical_tt.token_id), + select: %{ + token_contract_address_hash: tt.token_contract_address_hash, + token_id: historical_tt.token_id, + log_index: max(tt.log_index), + block_number: tt.block_number + }, + group_by: [tt.token_contract_address_hash, historical_tt.token_id, tt.block_number] + ) + + derived_token_transfers_query = + from(tt in filtered_query, + inner_join: tt_1 in subquery(refs_to_token_transfers), + on: tt_1.log_index == tt.log_index and tt_1.block_number == tt.block_number + ) + + changes = + derived_token_transfers_query + |> repo.all() + |> Enum.reduce(changes_initial, fn tt, acc -> + token_id = List.first(tt.token_ids) + current_key = {tt.token_contract_address_hash, token_id} + + params = %{ + token_contract_address_hash: tt.token_contract_address_hash, + token_id: token_id, + owner_address_hash: tt.to_address_hash, + owner_updated_at_block: tt.block_number, + owner_updated_at_log_index: tt.log_index + } + + Map.put( + acc, + current_key, + Enum.max_by([acc[current_key], params], fn %{ + owner_updated_at_block: block_number, + owner_updated_at_log_index: log_index + } -> + {block_number, log_index} + end) + ) + end) + |> Map.values() + + TokenInstances.insert( + repo, + changes, + options + |> Map.put(:timestamps, Import.timestamps()) + |> Map.put(:on_conflict, token_instances_on_conflict()) + ) + end + + defp forked_token_transfers_query(forked_transaction_hashes) do + from(token_transfer in TokenTransfer, + where: token_transfer.transaction_hash in ^forked_transaction_hashes, + inner_join: token in Token, + on: token.contract_address_hash == token_transfer.token_contract_address_hash, + where: token.type == "ERC-721", + inner_join: instance in Instance, + on: + fragment("? @> ARRAY[?::decimal]", token_transfer.token_ids, instance.token_id) and + instance.token_contract_address_hash == token_transfer.token_contract_address_hash, + # per one token instance we will have only one token transfer + where: + token_transfer.block_number == instance.owner_updated_at_block and + token_transfer.log_index == instance.owner_updated_at_log_index, + select: %{ + from: token_transfer.from_address_hash, + to: token_transfer.to_address_hash, + token_id: instance.token_id, + token_contract_address_hash: token_transfer.token_contract_address_hash, + block_number: token_transfer.block_number, + log_index: token_transfer.log_index + } + ) + end + + defp token_instances_on_conflict do + from( + token_instance in Instance, + update: [ + set: [ + metadata: token_instance.metadata, + error: token_instance.error, + owner_updated_at_block: fragment("EXCLUDED.owner_updated_at_block"), + owner_updated_at_log_index: fragment("EXCLUDED.owner_updated_at_log_index"), + owner_address_hash: fragment("EXCLUDED.owner_address_hash"), + inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token_instance.inserted_at), + updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token_instance.updated_at) + ] + ] + ) + end + defp derive_address_current_token_balances_grouped_query(deleted_address_current_token_balances) do initial_query = from(tb in Address.TokenBalance, diff --git a/apps/explorer/lib/explorer/chain/import/runner/token_instances.ex b/apps/explorer/lib/explorer/chain/import/runner/token_instances.ex new file mode 100644 index 0000000000..0fb38962a7 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/runner/token_instances.ex @@ -0,0 +1,106 @@ +defmodule Explorer.Chain.Import.Runner.TokenInstances do + @moduledoc """ + Bulk imports `t:Explorer.Chain.TokenInstances.t/0`. + """ + + require Ecto.Query + + alias Ecto.{Changeset, Multi, Repo} + alias Explorer.Chain.Import + alias Explorer.Chain.Token.Instance, as: TokenInstance + alias Explorer.Prometheus.Instrumenter + + import Ecto.Query, only: [from: 2] + + @behaviour Import.Runner + + # milliseconds + @timeout 60_000 + + @type imported :: [TokenInstance.t()] + + @impl Import.Runner + def ecto_schema_module, do: TokenInstance + + @impl Import.Runner + def option_key, do: :token_instances + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } + end + + @impl Import.Runner + def run(multi, changes_list, %{timestamps: timestamps} = options) do + insert_options = + options + |> Map.get(option_key(), %{}) + |> Map.take(~w(on_conflict timeout)a) + |> Map.put_new(:timeout, @timeout) + |> Map.put(:timestamps, timestamps) + + Multi.run(multi, :token_instances, fn repo, _ -> + Instrumenter.block_import_stage_runner( + fn -> insert(repo, changes_list, insert_options) end, + :block_referencing, + :token_instances, + :token_instances + ) + end) + end + + @impl Import.Runner + def timeout, do: @timeout + + @spec insert(Repo.t(), [map()], %{ + optional(:on_conflict) => Import.Runner.on_conflict(), + required(:timeout) => timeout, + required(:timestamps) => Import.timestamps() + }) :: + {:ok, [TokenInstance.t()]} + | {:error, [Changeset.t()]} + def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + + # Guarantee the same import order to avoid deadlocks + ordered_changes_list = Enum.sort_by(changes_list, &{&1.token_contract_address_hash, &1.token_id}) + + {:ok, _} = + Import.insert_changes_list( + repo, + ordered_changes_list, + conflict_target: [:token_contract_address_hash, :token_id], + on_conflict: on_conflict, + for: TokenInstance, + returning: true, + timeout: timeout, + timestamps: timestamps + ) + end + + defp default_on_conflict do + from( + token_instance in TokenInstance, + update: [ + set: [ + metadata: token_instance.metadata, + error: token_instance.error, + owner_updated_at_block: fragment("EXCLUDED.owner_updated_at_block"), + owner_updated_at_log_index: fragment("EXCLUDED.owner_updated_at_log_index"), + owner_address_hash: fragment("EXCLUDED.owner_address_hash"), + inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", token_instance.inserted_at), + updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", token_instance.updated_at) + ] + ], + where: + fragment("EXCLUDED.owner_address_hash IS NOT NULL") and fragment("EXCLUDED.owner_updated_at_block IS NOT NULL") and + (fragment("EXCLUDED.owner_updated_at_block > ?", token_instance.owner_updated_at_block) or + (fragment("EXCLUDED.owner_updated_at_block = ?", token_instance.owner_updated_at_block) and + fragment("EXCLUDED.owner_updated_at_log_index >= ?", token_instance.owner_updated_at_log_index)) or + is_nil(token_instance.owner_updated_at_block) or is_nil(token_instance.owner_address_hash)) + ) + end +end diff --git a/apps/explorer/lib/explorer/chain/import/stage/block_following.ex b/apps/explorer/lib/explorer/chain/import/stage/block_following.ex index 65a7c07603..8abbf9f79b 100644 --- a/apps/explorer/lib/explorer/chain/import/stage/block_following.ex +++ b/apps/explorer/lib/explorer/chain/import/stage/block_following.ex @@ -15,7 +15,8 @@ defmodule Explorer.Chain.Import.Stage.BlockFollowing do do: [ Runner.Block.SecondDegreeRelations, Runner.Block.Rewards, - Runner.Address.CurrentTokenBalances + Runner.Address.CurrentTokenBalances, + Runner.TokenInstances ] @impl Stage diff --git a/apps/explorer/lib/explorer/chain/token/instance.ex b/apps/explorer/lib/explorer/chain/token/instance.ex index ea91796e32..c1b5bcd5f4 100644 --- a/apps/explorer/lib/explorer/chain/token/instance.ex +++ b/apps/explorer/lib/explorer/chain/token/instance.ex @@ -5,7 +5,7 @@ defmodule Explorer.Chain.Token.Instance do use Explorer.Schema - alias Explorer.Chain.{Address, Hash, Token, TokenTransfer} + alias Explorer.Chain.{Address, Block, Hash, Token, TokenTransfer} alias Explorer.Chain.Token.Instance alias Explorer.PagingOptions @@ -20,7 +20,10 @@ defmodule Explorer.Chain.Token.Instance do token_id: non_neg_integer(), token_contract_address_hash: Hash.Address.t(), metadata: map() | nil, - error: String.t() + error: String.t(), + owner_address_hash: Hash.Address.t(), + owner_updated_at_block: Block.block_number(), + owner_updated_at_log_index: non_neg_integer() } @primary_key false @@ -28,8 +31,10 @@ defmodule Explorer.Chain.Token.Instance do field(:token_id, :decimal, primary_key: true) field(:metadata, :map) field(:error, :string) + field(:owner_updated_at_block, :integer) + field(:owner_updated_at_log_index, :integer) - belongs_to(:owner, Address, references: :hash, define_field: false) + belongs_to(:owner, Address, foreign_key: :owner_address_hash, references: :hash, type: Hash.Address) belongs_to( :token, @@ -45,7 +50,15 @@ defmodule Explorer.Chain.Token.Instance do def changeset(%Instance{} = instance, params \\ %{}) do instance - |> cast(params, [:token_id, :metadata, :token_contract_address_hash, :error]) + |> cast(params, [ + :token_id, + :metadata, + :token_contract_address_hash, + :error, + :owner_address_hash, + :owner_updated_at_block, + :owner_updated_at_log_index + ]) |> validate_required([:token_id, :token_contract_address_hash]) |> foreign_key_constraint(:token_contract_address_hash) end diff --git a/apps/explorer/lib/explorer/chain/token_transfer.ex b/apps/explorer/lib/explorer/chain/token_transfer.ex index ff17c0f768..1c5a15123f 100644 --- a/apps/explorer/lib/explorer/chain/token_transfer.ex +++ b/apps/explorer/lib/explorer/chain/token_transfer.ex @@ -47,7 +47,7 @@ defmodule Explorer.Chain.TokenTransfer do * `:token_id` - ID of the token (applicable to ERC-721 tokens) * `:transaction` - The `t:Explorer.Chain.Transaction.t/0` ledger * `:transaction_hash` - Transaction foreign key - * `:log_index` - Index of the corresponding `t:Explorer.Chain.Log.t/0` in the transaction. + * `:log_index` - Index of the corresponding `t:Explorer.Chain.Log.t/0` in the block. * `:amounts` - Tokens transferred amounts in case of batched transfer in ERC-1155 * `:token_ids` - IDs of the tokens (applicable to ERC-1155 tokens) """ @@ -360,4 +360,12 @@ defmodule Explorer.Chain.TokenTransfer do end def filter_by_type(query, _), do: query + + def only_consensus_transfers_query do + from(token_transfer in __MODULE__, + inner_join: block in Block, + on: token_transfer.block_hash == block.hash, + where: block.consensus == true + ) + end end diff --git a/apps/explorer/lib/explorer/token_instance_owner_address_migration/helper.ex b/apps/explorer/lib/explorer/token_instance_owner_address_migration/helper.ex new file mode 100644 index 0000000000..7b0a092245 --- /dev/null +++ b/apps/explorer/lib/explorer/token_instance_owner_address_migration/helper.ex @@ -0,0 +1,77 @@ +defmodule Explorer.TokenInstanceOwnerAddressMigration.Helper do + @moduledoc """ + Auxiliary functions for TokenInstanceOwnerAddressMigration.{Worker and Supervisor} + """ + import Ecto.Query, + only: [ + from: 2 + ] + + alias Explorer.{Chain, Repo} + alias Explorer.Chain.Token.Instance + alias Explorer.Chain.{SmartContract, TokenTransfer} + + {:ok, burn_address_hash} = Chain.string_to_address_hash(SmartContract.burn_address_hash_string()) + @burn_address_hash burn_address_hash + + @spec filtered_token_instances_query(non_neg_integer()) :: Ecto.Query.t() + def filtered_token_instances_query(limit) do + from(instance in Instance, + where: is_nil(instance.owner_address_hash), + inner_join: token in assoc(instance, :token), + where: token.type == "ERC-721", + limit: ^limit, + select: %{token_id: instance.token_id, token_contract_address_hash: instance.token_contract_address_hash} + ) + end + + @spec fetch_and_insert([map]) :: + {:error, :timeout | [map]} + | {:ok, + %{ + :token_instances => [Instance.t()] + }} + | {:error, any, any, map} + def fetch_and_insert(batch) do + changes = + Enum.map(batch, fn %{token_id: token_id, token_contract_address_hash: token_contract_address_hash} -> + token_transfer_query = + from(tt in TokenTransfer.only_consensus_transfers_query(), + where: + tt.token_contract_address_hash == ^token_contract_address_hash and + fragment("? @> ARRAY[?::decimal]", tt.token_ids, ^token_id), + order_by: [desc: tt.block_number, desc: tt.log_index], + limit: 1, + select: %{ + token_contract_address_hash: tt.token_contract_address_hash, + token_ids: tt.token_ids, + to_address_hash: tt.to_address_hash, + block_number: tt.block_number, + log_index: tt.log_index + } + ) + + token_transfer = + Repo.one(token_transfer_query) || + %{to_address_hash: @burn_address_hash, block_number: -1, log_index: -1} + + %{ + token_contract_address_hash: token_contract_address_hash, + token_id: token_id, + token_type: "ERC-721", + owner_address_hash: token_transfer.to_address_hash, + owner_updated_at_block: token_transfer.block_number, + owner_updated_at_log_index: token_transfer.log_index + } + end) + + Chain.import(%{token_instances: %{params: changes}}) + end + + @spec unfilled_token_instances_exists? :: boolean + def unfilled_token_instances_exists? do + 1 + |> filtered_token_instances_query() + |> Repo.exists?() + end +end diff --git a/apps/explorer/lib/explorer/token_instance_owner_address_migration/supervisor.ex b/apps/explorer/lib/explorer/token_instance_owner_address_migration/supervisor.ex new file mode 100644 index 0000000000..5bb8532fdf --- /dev/null +++ b/apps/explorer/lib/explorer/token_instance_owner_address_migration/supervisor.ex @@ -0,0 +1,26 @@ +defmodule Explorer.TokenInstanceOwnerAddressMigration.Supervisor do + @moduledoc """ + Supervisor for Explorer.TokenInstanceOwnerAddressMigration.Worker + """ + + use Supervisor + + alias Explorer.TokenInstanceOwnerAddressMigration.{Helper, Worker} + + def start_link(init_arg) do + Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__) + end + + @impl true + def init(_init_arg) do + if Helper.unfilled_token_instances_exists?() do + children = [ + {Worker, Application.get_env(:explorer, Explorer.TokenInstanceOwnerAddressMigration)} + ] + + Supervisor.init(children, strategy: :one_for_one) + else + :ignore + end + end +end diff --git a/apps/explorer/lib/explorer/token_instance_owner_address_migration/worker.ex b/apps/explorer/lib/explorer/token_instance_owner_address_migration/worker.ex new file mode 100644 index 0000000000..c0ad7c4879 --- /dev/null +++ b/apps/explorer/lib/explorer/token_instance_owner_address_migration/worker.ex @@ -0,0 +1,51 @@ +defmodule Explorer.TokenInstanceOwnerAddressMigration.Worker do + @moduledoc """ + GenServer for filling owner_address_hash, owner_updated_at_block and owner_updated_at_log_index + for ERC-721 token instances. Works in the following way + 1. Checks if there are some unprocessed nfts. + - if yes, then go to 2 stage + - if no, then shutdown + 2. Fetch `(concurrency * batch_size)` token instances, process them in `concurrency` tasks. + 3. Go to step 1 + """ + + use GenServer, restart: :transient + + alias Explorer.Repo + alias Explorer.TokenInstanceOwnerAddressMigration.Helper + + def start_link(concurrency: concurrency, batch_size: batch_size) do + GenServer.start_link(__MODULE__, %{concurrency: concurrency, batch_size: batch_size}, name: __MODULE__) + end + + @impl true + def init(opts) do + GenServer.cast(__MODULE__, :check_necessity) + + {:ok, opts} + end + + @impl true + def handle_cast(:check_necessity, state) do + if Helper.unfilled_token_instances_exists?() do + GenServer.cast(__MODULE__, :backfill) + {:noreply, state} + else + {:stop, :normal, state} + end + end + + @impl true + def handle_cast(:backfill, %{concurrency: concurrency, batch_size: batch_size} = state) do + (concurrency * batch_size) + |> Helper.filtered_token_instances_query() + |> Repo.all() + |> Enum.chunk_every(batch_size) + |> Enum.map(fn batch -> Task.async(fn -> Helper.fetch_and_insert(batch) end) end) + |> Task.await_many(:infinity) + + GenServer.cast(__MODULE__, :check_necessity) + + {:noreply, state} + end +end diff --git a/apps/explorer/priv/repo/migrations/20230818094455_add_token_ids_to_address_token_balances.exs b/apps/explorer/priv/repo/migrations/20230818094455_add_token_ids_to_address_token_balances.exs new file mode 100644 index 0000000000..a141276ab5 --- /dev/null +++ b/apps/explorer/priv/repo/migrations/20230818094455_add_token_ids_to_address_token_balances.exs @@ -0,0 +1,13 @@ +defmodule Explorer.Repo.Migrations.AddTokenIdsToAddressTokenBalances do + use Ecto.Migration + + def change do + alter table(:token_instances) do + add(:owner_address_hash, :bytea, null: true) + add(:owner_updated_at_block, :bigint, null: true) + add(:owner_updated_at_log_index, :integer, null: true) + end + + create(index(:token_instances, [:owner_address_hash])) + end +end diff --git a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs index ab2d5bc464..7e79c20916 100644 --- a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs @@ -368,6 +368,166 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do assert %{block_number: ^number, block_hash: ^hash} = Repo.one(PendingBlockOperation) end + + test "change instance owner if was token transfer in older blocks", + %{consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do + block_number = block_number + 2 + consensus_block = insert(:block, %{hash: block_hash, number: block_number}) + + transaction = + :transaction + |> insert() + |> with_block(consensus_block) + + token_address = insert(:contract_address) + insert(:token, contract_address: token_address, type: "ERC-721") + id = Decimal.new(1) + + tt = + insert(:token_transfer, + token_ids: [id], + transaction: transaction, + token_contract_address: token_address, + block_number: block_number, + block: consensus_block, + log_index: 123 + ) + + %{hash: hash_1} = params_for(:block, consensus: true, miner_hash: miner_hash) + consensus_block_1 = insert(:block, %{hash: hash_1, number: block_number - 1}) + + transaction = + :transaction + |> insert() + |> with_block(consensus_block_1) + + for _ <- 0..10 do + insert(:token_transfer, + token_ids: [id], + transaction: transaction, + token_contract_address: tt.token_contract_address, + block_number: consensus_block_1.number, + block: consensus_block_1 + ) + end + + tt_1 = + insert(:token_transfer, + token_ids: [id], + transaction: transaction, + token_contract_address: tt.token_contract_address, + block_number: consensus_block_1.number, + block: consensus_block_1 + ) + + %{hash: hash_2} = params_for(:block, consensus: true, miner_hash: miner_hash) + consensus_block_2 = insert(:block, %{hash: hash_2, number: block_number - 2}) + + for _ <- 0..10 do + tx = + :transaction + |> insert() + |> with_block(consensus_block_2) + + insert(:token_transfer, + token_ids: [id], + transaction: tx, + token_contract_address: tt.token_contract_address, + block_number: consensus_block_2.number, + block: consensus_block_2 + ) + end + + instance = + insert(:token_instance, + token_contract_address_hash: token_address.hash, + token_id: id, + owner_updated_at_block: tt.block_number, + owner_updated_at_log_index: tt.log_index, + owner_address_hash: insert(:address).hash + ) + + block_params = + params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: false) + + %Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params) + changes_list = [block_changes] + error = instance.error + block_number = tt_1.block_number + log_index = tt_1.log_index + owner_address_hash = tt_1.to_address_hash + token_address_hash = token_address.hash + + assert {:ok, + %{ + update_token_instances_owner: [ + %Explorer.Chain.Token.Instance{ + token_id: ^id, + error: ^error, + owner_updated_at_block: ^block_number, + owner_updated_at_log_index: ^log_index, + owner_address_hash: ^owner_address_hash, + token_contract_address_hash: ^token_address_hash + } + ] + }} = Multi.new() |> Blocks.run(changes_list, options) |> Repo.transaction() + end + + test "change instance owner if there was no more token transfers", + %{consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do + block_number = block_number + 1 + consensus_block = insert(:block, %{hash: block_hash, number: block_number}) + + transaction = + :transaction + |> insert() + |> with_block(consensus_block) + + token_address = insert(:contract_address) + insert(:token, contract_address: token_address, type: "ERC-721") + id = Decimal.new(1) + + tt = + insert(:token_transfer, + token_ids: [id], + transaction: transaction, + token_contract_address: token_address, + block_number: block_number, + block: consensus_block + ) + + instance = + insert(:token_instance, + token_contract_address_hash: token_address.hash, + token_id: id, + owner_updated_at_block: tt.block_number, + owner_updated_at_log_index: tt.log_index, + owner_address_hash: insert(:address).hash + ) + + block_params = + params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: false) + + %Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params) + changes_list = [block_changes] + error = instance.error + owner_address_hash = tt.from_address_hash + token_address_hash = token_address.hash + + assert {:ok, + %{ + update_token_instances_owner: [ + %Explorer.Chain.Token.Instance{ + token_id: ^id, + error: ^error, + owner_updated_at_block: -1, + owner_updated_at_log_index: -1, + owner_address_hash: ^owner_address_hash, + token_contract_address_hash: ^token_address_hash + } + ] + }} = Multi.new() |> Blocks.run(changes_list, options) |> Repo.transaction() + end end describe "lose_consensus/5" do diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index 728d7adc29..b7472db712 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -183,30 +183,6 @@ defmodule Explorer.ChainTest do assert result.token_contract_address_hash == token.contract_address_hash end - test "replaces existing token instance record" do - token = insert(:token) - - params = %{ - token_id: 1, - token_contract_address_hash: token.contract_address_hash, - metadata: %{uri: "http://example.com"} - } - - {:ok, _} = Chain.upsert_token_instance(params) - - params1 = %{ - token_id: 1, - token_contract_address_hash: token.contract_address_hash, - metadata: %{uri: "http://example1.com"} - } - - {:ok, result} = Chain.upsert_token_instance(params1) - - assert result.token_id == Decimal.new(1) - assert result.metadata == params1.metadata - assert result.token_contract_address_hash == token.contract_address_hash - end - test "fails to import with invalid params" do params = %{ token_id: 1, @@ -243,7 +219,8 @@ defmodule Explorer.ChainTest do insert(:token_instance, token_id: 1, token_contract_address_hash: token.contract_address_hash, - error: "no uri" + error: "no uri", + metadata: nil ) params = %{ @@ -4903,7 +4880,7 @@ defmodule Explorer.ChainTest do end end - describe "stream_unfetched_token_instances/2" do + describe "stream_not_inserted_token_instances/2" do test "reduces with given reducer and accumulator for ERC-721 token" do token_contract_address = insert(:contract_address) token = insert(:token, contract_address: token_contract_address, type: "ERC-721") @@ -4924,7 +4901,7 @@ defmodule Explorer.ChainTest do token_ids: [11] ) - assert {:ok, [result]} = Chain.stream_unfetched_token_instances([], &[&1 | &2]) + assert {:ok, [result]} = Chain.stream_not_inserted_token_instances([], &[&1 | &2]) assert result.token_id == List.first(token_transfer.token_ids) assert result.contract_address_hash == token_transfer.token_contract_address_hash end @@ -4948,7 +4925,7 @@ defmodule Explorer.ChainTest do token_ids: nil ) - assert {:ok, []} = Chain.stream_unfetched_token_instances([], &[&1 | &2]) + assert {:ok, []} = Chain.stream_not_inserted_token_instances([], &[&1 | &2]) end test "do not fetch records with token instances" do @@ -4976,7 +4953,7 @@ defmodule Explorer.ChainTest do token_contract_address_hash: token_transfer.token_contract_address_hash ) - assert {:ok, []} = Chain.stream_unfetched_token_instances([], &[&1 | &2]) + assert {:ok, []} = Chain.stream_not_inserted_token_instances([], &[&1 | &2]) end end diff --git a/apps/explorer/test/explorer/token_instance_owner_address_migration/helper_test.exs b/apps/explorer/test/explorer/token_instance_owner_address_migration/helper_test.exs new file mode 100644 index 0000000000..355d161259 --- /dev/null +++ b/apps/explorer/test/explorer/token_instance_owner_address_migration/helper_test.exs @@ -0,0 +1,121 @@ +defmodule Explorer.TokenInstanceOwnerAddressMigration.HelperTest do + use Explorer.DataCase + + alias Explorer.{Chain, Repo} + alias Explorer.Chain.Token.Instance + alias Explorer.TokenInstanceOwnerAddressMigration.Helper + + {:ok, burn_address_hash} = Chain.string_to_address_hash("0x0000000000000000000000000000000000000000") + @burn_address_hash burn_address_hash + + describe "fetch_and_insert/2" do + test "successfully update owner of single token instance" do + token_address = insert(:contract_address) + insert(:token, contract_address: token_address, type: "ERC-721") + + instance = insert(:token_instance, token_contract_address_hash: token_address.hash) + + transaction = + :transaction + |> insert() + |> with_block() + + tt_1 = + insert(:token_transfer, + token_ids: [instance.token_id], + transaction: transaction, + token_contract_address: token_address + ) + + Helper.fetch_and_insert([ + %{token_id: instance.token_id, token_contract_address_hash: instance.token_contract_address_hash} + ]) + + owner_address = tt_1.to_address_hash + block_number = tt_1.block_number + log_index = tt_1.log_index + + assert %Instance{ + owner_address_hash: ^owner_address, + owner_updated_at_block: ^block_number, + owner_updated_at_log_index: ^log_index + } = + Repo.get_by(Instance, + token_id: instance.token_id, + token_contract_address_hash: instance.token_contract_address_hash + ) + end + + test "put placeholder value if tt absent in db" do + instance = insert(:token_instance) + + Helper.fetch_and_insert([ + %{token_id: instance.token_id, token_contract_address_hash: instance.token_contract_address_hash} + ]) + + assert %Instance{ + owner_address_hash: @burn_address_hash, + owner_updated_at_block: -1, + owner_updated_at_log_index: -1 + } = + Repo.get_by(Instance, + token_id: instance.token_id, + token_contract_address_hash: instance.token_contract_address_hash + ) + end + + test "update owners of token instances batch" do + instances = + for _ <- 0..5 do + token_address = insert(:contract_address) + insert(:token, contract_address: token_address, type: "ERC-721") + + instance = insert(:token_instance, token_contract_address_hash: token_address.hash) + + tt = + for _ <- 0..5 do + transaction = + :transaction + |> insert() + |> with_block() + + for _ <- 0..5 do + insert(:token_transfer, + token_ids: [instance.token_id], + transaction: transaction, + token_contract_address: token_address + ) + end + end + |> Enum.concat() + |> Enum.max_by(fn tt -> {tt.block_number, tt.log_index} end) + + %{ + token_id: instance.token_id, + token_contract_address_hash: instance.token_contract_address_hash, + owner_address_hash: tt.to_address_hash, + owner_updated_at_block: tt.block_number, + owner_updated_at_log_index: tt.log_index + } + end + + Helper.fetch_and_insert(instances) + + for ti <- instances do + owner_address = ti.owner_address_hash + block_number = ti.owner_updated_at_block + log_index = ti.owner_updated_at_log_index + + assert %Instance{ + owner_address_hash: ^owner_address, + owner_updated_at_block: ^block_number, + owner_updated_at_log_index: ^log_index + } = + Repo.get_by(Instance, + token_id: ti.token_id, + token_contract_address_hash: ti.token_contract_address_hash + ) + end + end + end +end diff --git a/apps/explorer/test/support/factory.ex b/apps/explorer/test/support/factory.ex index da2e13f666..b85a0fff6d 100644 --- a/apps/explorer/test/support/factory.ex +++ b/apps/explorer/test/support/factory.ex @@ -863,6 +863,12 @@ defmodule Explorer.Factory do } end + def log_index_factory do + %{ + log_index: sequence("token_id", & &1) + } + end + def token_balance_factory do %TokenBalance{ address: build(:address), diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 4b0cf3aaa0..5b5eaf0b89 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -37,6 +37,7 @@ defmodule Indexer.Block.Fetcher do Addresses, AddressTokenBalances, MintTransfers, + TokenInstances, TokenTransfers, TransactionActions } @@ -183,6 +184,7 @@ defmodule Indexer.Block.Fetcher do address_token_balances = AddressTokenBalances.params_set(%{token_transfers_params: token_transfers}), transaction_actions = Enum.map(transaction_actions, fn action -> Map.put(action, :data, Map.delete(action.data, :block_number)) end), + token_instances = TokenInstances.params_set(%{token_transfers_params: token_transfers}), basic_import_options = %{ addresses: %{params: addresses}, address_coin_balances: %{params: coin_balances_params_set}, @@ -198,7 +200,8 @@ defmodule Indexer.Block.Fetcher do token_transfers: %{params: token_transfers}, tokens: %{on_conflict: :nothing, params: tokens}, transactions: %{params: transactions_with_receipts}, - withdrawals: %{params: withdrawals_params} + withdrawals: %{params: withdrawals_params}, + token_instances: %{params: token_instances} }, import_options = (if Application.get_env(:explorer, :chain_type) == "polygon_edge" do diff --git a/apps/indexer/lib/indexer/fetcher/token_instance/legacy_sanitize.ex b/apps/indexer/lib/indexer/fetcher/token_instance/legacy_sanitize.ex new file mode 100644 index 0000000000..1fe11e1d90 --- /dev/null +++ b/apps/indexer/lib/indexer/fetcher/token_instance/legacy_sanitize.ex @@ -0,0 +1,58 @@ +defmodule Indexer.Fetcher.TokenInstance.LegacySanitize do + @moduledoc """ + This fetcher is stands for creating token instances which wasn't inserted yet and index meta for them. Legacy is because now we token instances inserted on block import and this fetcher is only for historical and unfetched for some reasons data + """ + + use Indexer.Fetcher, restart: :permanent + use Spandex.Decorators + + import Indexer.Fetcher.TokenInstance.Helper + + alias Explorer.Chain + alias Indexer.BufferedTask + + @behaviour BufferedTask + + @default_max_batch_size 10 + @default_max_concurrency 10 + @doc false + def child_spec([init_options, gen_server_options]) do + merged_init_opts = + defaults() + |> Keyword.merge(init_options) + |> Keyword.merge(state: []) + + Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_opts}, gen_server_options]}, id: __MODULE__) + end + + @impl BufferedTask + def init(initial_acc, reducer, _) do + {:ok, acc} = + Chain.stream_not_inserted_token_instances(initial_acc, fn data, acc -> + reducer.(data, acc) + end) + + acc + end + + @impl BufferedTask + def run(token_instances, _) when is_list(token_instances) do + token_instances + |> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} -> + not Chain.token_instance_exists?(token_id, hash) + end) + |> batch_fetch_instances() + + :ok + end + + defp defaults do + [ + flush_interval: :infinity, + max_concurrency: Application.get_env(:indexer, __MODULE__)[:concurrency] || @default_max_concurrency, + max_batch_size: Application.get_env(:indexer, __MODULE__)[:batch_size] || @default_max_batch_size, + poll: false, + task_supervisor: __MODULE__.TaskSupervisor + ] + end +end diff --git a/apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex b/apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex index 804c9258c8..14375b1601 100644 --- a/apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex +++ b/apps/indexer/lib/indexer/fetcher/token_instance/realtime.ex @@ -35,7 +35,7 @@ defmodule Indexer.Fetcher.TokenInstance.Realtime do def run(token_instances, _) when is_list(token_instances) do token_instances |> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} -> - not Chain.token_instance_exists?(token_id, hash) + Chain.token_instance_with_unfetched_metadata?(token_id, hash) end) |> batch_fetch_instances() diff --git a/apps/indexer/lib/indexer/fetcher/token_instance/sanitize.ex b/apps/indexer/lib/indexer/fetcher/token_instance/sanitize.ex index 1bc3a70bd1..f2cc5fa71d 100644 --- a/apps/indexer/lib/indexer/fetcher/token_instance/sanitize.ex +++ b/apps/indexer/lib/indexer/fetcher/token_instance/sanitize.ex @@ -28,7 +28,7 @@ defmodule Indexer.Fetcher.TokenInstance.Sanitize do @impl BufferedTask def init(initial_acc, reducer, _) do {:ok, acc} = - Chain.stream_unfetched_token_instances(initial_acc, fn data, acc -> + Chain.stream_token_instances_with_unfetched_metadata(initial_acc, fn data, acc -> reducer.(data, acc) end) @@ -39,7 +39,7 @@ defmodule Indexer.Fetcher.TokenInstance.Sanitize do def run(token_instances, _) when is_list(token_instances) do token_instances |> Enum.filter(fn %{contract_address_hash: hash, token_id: token_id} -> - not Chain.token_instance_exists?(token_id, hash) + Chain.token_instance_with_unfetched_metadata?(token_id, hash) end) |> batch_fetch_instances() diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index ccbeddca50..732c71727a 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -13,6 +13,7 @@ defmodule Indexer.Supervisor do alias Indexer.Block.Catchup, as: BlockCatchup alias Indexer.Block.Realtime, as: BlockRealtime + alias Indexer.Fetcher.TokenInstance.LegacySanitize, as: TokenInstanceLegacySanitize alias Indexer.Fetcher.TokenInstance.Realtime, as: TokenInstanceRealtime alias Indexer.Fetcher.TokenInstance.Retry, as: TokenInstanceRetry alias Indexer.Fetcher.TokenInstance.Sanitize, as: TokenInstanceSanitize @@ -115,6 +116,7 @@ defmodule Indexer.Supervisor do {TokenInstanceRealtime.Supervisor, [[memory_monitor: memory_monitor]]}, {TokenInstanceRetry.Supervisor, [[memory_monitor: memory_monitor]]}, {TokenInstanceSanitize.Supervisor, [[memory_monitor: memory_monitor]]}, + {TokenInstanceLegacySanitize.Supervisor, [[memory_monitor: memory_monitor]]}, configure(TransactionAction.Supervisor, [[memory_monitor: memory_monitor]]), {ContractCode.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, diff --git a/apps/indexer/lib/indexer/transform/address_token_balances.ex b/apps/indexer/lib/indexer/transform/address_token_balances.ex index cae8b2cbac..291783f6a2 100644 --- a/apps/indexer/lib/indexer/transform/address_token_balances.ex +++ b/apps/indexer/lib/indexer/transform/address_token_balances.ex @@ -11,7 +11,6 @@ defmodule Indexer.Transform.AddressTokenBalances do defp reducer({:token_transfers_params, token_transfers_params}, initial) when is_list(token_transfers_params) do token_transfers_params - |> ignore_burn_address_transfers_for_token_erc_721() |> Enum.reduce(initial, fn %{ block_number: block_number, from_address_hash: from_address_hash, @@ -31,10 +30,6 @@ defmodule Indexer.Transform.AddressTokenBalances do end) end - defp ignore_burn_address_transfers_for_token_erc_721(token_transfers_params) do - Enum.filter(token_transfers_params, &do_filter_burn_address/1) - end - defp add_token_balance_address(map_set, unquote(burn_address_hash_string()), _, _, _, _), do: map_set defp add_token_balance_address(map_set, address, token_contract_address, token_id, token_type, block_number) do @@ -46,12 +41,4 @@ defmodule Indexer.Transform.AddressTokenBalances do token_type: token_type }) end - - def do_filter_burn_address(%{to_address_hash: unquote(burn_address_hash_string()), token_type: "ERC-721"}) do - false - end - - def do_filter_burn_address(_token_balance_param) do - true - end end diff --git a/apps/indexer/lib/indexer/transform/token_instances.ex b/apps/indexer/lib/indexer/transform/token_instances.ex new file mode 100644 index 0000000000..a9fb4372d2 --- /dev/null +++ b/apps/indexer/lib/indexer/transform/token_instances.ex @@ -0,0 +1,88 @@ +defmodule Indexer.Transform.TokenInstances do + @moduledoc """ + Module extracts token instances from token transfers + """ + + def params_set(%{} = import_options) do + Enum.reduce(import_options, %{}, &reducer/2) + end + + defp reducer({:token_transfers_params, token_transfers_params}, initial) when is_list(token_transfers_params) do + token_transfers_params + |> Enum.reduce(initial, fn + %{ + block_number: block_number, + from_address_hash: from_address_hash, + to_address_hash: to_address_hash, + token_contract_address_hash: token_contract_address_hash, + token_ids: [_ | _] + } = tt, + acc + when is_integer(block_number) and + is_binary(from_address_hash) and + is_binary(to_address_hash) and is_binary(token_contract_address_hash) -> + transfer_to_instances(tt, acc) + + _, acc -> + acc + end) + |> Map.values() + end + + defp transfer_to_instances( + %{ + token_type: "ERC-721" = token_type, + to_address_hash: to_address_hash, + token_ids: [token_id], + token_contract_address_hash: token_contract_address_hash, + block_number: block_number, + log_index: log_index + }, + acc + ) do + params = %{ + token_contract_address_hash: token_contract_address_hash, + token_id: token_id, + token_type: token_type, + owner_address_hash: to_address_hash, + owner_updated_at_block: block_number, + owner_updated_at_log_index: log_index + } + + current_key = {token_contract_address_hash, token_id} + + Map.put( + acc, + current_key, + Enum.max_by( + [ + params, + acc[current_key] || params + ], + fn %{ + owner_updated_at_block: owner_updated_at_block, + owner_updated_at_log_index: owner_updated_at_log_index + } -> + {owner_updated_at_block, owner_updated_at_log_index} + end + ) + ) + end + + defp transfer_to_instances( + %{ + token_type: _token_type, + token_ids: [_ | _] = token_ids, + token_contract_address_hash: token_contract_address_hash + }, + acc + ) do + Enum.reduce(token_ids, acc, fn id, sub_acc -> + Map.put(sub_acc, {token_contract_address_hash, id}, %{ + token_contract_address_hash: token_contract_address_hash, + token_id: id, + token_type: "ERC-1155" + }) + end) + end +end diff --git a/apps/indexer/test/indexer/transform/address_token_balances_test.exs b/apps/indexer/test/indexer/transform/address_token_balances_test.exs index 04111ff262..b70e2f84fe 100644 --- a/apps/indexer/test/indexer/transform/address_token_balances_test.exs +++ b/apps/indexer/test/indexer/transform/address_token_balances_test.exs @@ -66,7 +66,7 @@ defmodule Indexer.Transform.AddressTokenBalancesTest do ]) end - test "does not set params when the to_address_hash is the burn address for the Token ERC-721" do + test "does set params when the to_address_hash is the burn address for the Token ERC-721" do block_number = 1 from_address_hash = "0x5b8410f67eb8040bb1cd1e8a4ff9d5f6ce678a15" to_address_hash = "0x0000000000000000000000000000000000000000" @@ -77,12 +77,22 @@ defmodule Indexer.Transform.AddressTokenBalancesTest do from_address_hash: from_address_hash, to_address_hash: to_address_hash, token_contract_address_hash: token_contract_address_hash, - token_type: "ERC-721" + token_type: "ERC-721", + token_ids: [1] } params_set = AddressTokenBalances.params_set(%{token_transfers_params: [token_transfer_params]}) - assert MapSet.size(params_set) == 0 + assert params_set == + MapSet.new([ + %{ + address_hash: "0x5b8410f67eb8040bb1cd1e8a4ff9d5f6ce678a15", + block_number: 1, + token_contract_address_hash: "0xe18035bf8712672935fdb4e5e431b1a0183d2dfc", + token_id: 1, + token_type: "ERC-721" + } + ]) end end end diff --git a/config/runtime.exs b/config/runtime.exs index 2d51abb619..e83d0489d8 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -412,6 +412,10 @@ config :explorer, Explorer.Chain.Fetcher.LookUpSmartContractSourcesOnDemand, config :explorer, Explorer.Chain.Cache.MinMissingBlockNumber, enabled: !ConfigHelper.parse_bool_env_var("DISABLE_INDEXER") +config :explorer, Explorer.TokenInstanceOwnerAddressMigration, + concurrency: ConfigHelper.parse_integer_env_var("TOKEN_INSTANCE_OWNER_MIGRATION_CONCURRENCY", 5), + batch_size: ConfigHelper.parse_integer_env_var("TOKEN_INSTANCE_OWNER_MIGRATION_BATCH_SIZE", 50) + ############### ### Indexer ### ############### @@ -502,6 +506,9 @@ config :indexer, Indexer.Fetcher.TokenInstance.Retry.Supervisor, config :indexer, Indexer.Fetcher.TokenInstance.Sanitize.Supervisor, disabled?: ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_TOKEN_INSTANCE_SANITIZE_FETCHER") +config :indexer, Indexer.Fetcher.TokenInstance.LegacySanitize.Supervisor, + disabled?: ConfigHelper.parse_bool_env_var("INDEXER_DISABLE_TOKEN_INSTANCE_LEGACY_SANITIZE_FETCHER", "true") + config :indexer, Indexer.Fetcher.EmptyBlocksSanitizer, batch_size: ConfigHelper.parse_integer_env_var("INDEXER_EMPTY_BLOCKS_SANITIZER_BATCH_SIZE", 100) @@ -532,6 +539,10 @@ config :indexer, Indexer.Fetcher.TokenInstance.Sanitize, concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY", 10), batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE", 10) +config :indexer, Indexer.Fetcher.TokenInstance.LegacySanitize, + concurrency: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_CONCURRENCY", 10), + batch_size: ConfigHelper.parse_integer_env_var("INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_BATCH_SIZE", 10) + config :indexer, Indexer.Fetcher.InternalTransaction, batch_size: ConfigHelper.parse_integer_env_var("INDEXER_INTERNAL_TRANSACTIONS_BATCH_SIZE", 10), concurrency: ConfigHelper.parse_integer_env_var("INDEXER_INTERNAL_TRANSACTIONS_CONCURRENCY", 4), diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index 5d8c29b3c5..6211113d22 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -100,6 +100,7 @@ DISABLE_REALTIME_INDEXER=false INDEXER_DISABLE_TOKEN_INSTANCE_REALTIME_FETCHER=false INDEXER_DISABLE_TOKEN_INSTANCE_RETRY_FETCHER=false INDEXER_DISABLE_TOKEN_INSTANCE_SANITIZE_FETCHER=false +INDEXER_DISABLE_TOKEN_INSTANCE_LEGACY_SANITIZE_FETCHER=false INDEXER_DISABLE_PENDING_TRANSACTIONS_FETCHER=false INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false # INDEXER_CATCHUP_BLOCKS_BATCH_SIZE= @@ -113,6 +114,7 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false # INDEXER_TOKEN_INSTANCE_RETRY_CONCURRENCY= # INDEXER_TOKEN_INSTANCE_REALTIME_CONCURRENCY= # INDEXER_TOKEN_INSTANCE_SANITIZE_CONCURRENCY= +# INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_CONCURRENCY=10 # INDEXER_COIN_BALANCES_BATCH_SIZE= # INDEXER_COIN_BALANCES_CONCURRENCY= # INDEXER_RECEIPTS_BATCH_SIZE= @@ -230,4 +232,7 @@ EIP_1559_ELASTICITY_MULTIPLIER=2 # INDEXER_TOKEN_INSTANCE_RETRY_BATCH_SIZE=10 # INDEXER_TOKEN_INSTANCE_REALTIME_BATCH_SIZE=1 # INDEXER_TOKEN_INSTANCE_SANITIZE_BATCH_SIZE=10 -API_V2_ENABLED=true \ No newline at end of file +API_V2_ENABLED=true +# INDEXER_TOKEN_INSTANCE_LEGACY_SANITIZE_BATCH_SIZE=10 +# TOKEN_INSTANCE_OWNER_MIGRATION_CONCURRENCY=5 +# TOKEN_INSTANCE_OWNER_MIGRATION_BATCH_SIZE=50