From c6ff374d8ca83742b42f5896886194ce532bccce Mon Sep 17 00:00:00 2001 From: nikitosing <32202610+nikitosing@users.noreply.github.com> Date: Mon, 29 Jul 2024 12:52:35 +0300 Subject: [PATCH] feat: Backfiller for omitted WETH transfers (#10466) * feat: Backfiller for omitted WETH transfers * todo: add token balance update * done RestoreOmittedWETHTransfers migrator * Remove dbg * remove dbg * Fix credo * Process review comments --- apps/explorer/config/config.exs | 1 + apps/explorer/config/runtime/test.exs | 1 + apps/explorer/lib/explorer/application.ex | 2 + apps/explorer/lib/explorer/chain.ex | 16 +- apps/explorer/lib/explorer/chain/log.ex | 28 +- apps/explorer/lib/explorer/helper.ex | 7 + .../restore_omitted_weth_transfers.ex | 269 ++++++++++++++++++ .../lib/indexer/transform/token_transfers.ex | 7 +- config/runtime.exs | 5 + docker-compose/envs/common-blockscout.env | 3 + 10 files changed, 329 insertions(+), 10 deletions(-) create mode 100644 apps/explorer/lib/explorer/migrator/restore_omitted_weth_transfers.ex diff --git a/apps/explorer/config/config.exs b/apps/explorer/config/config.exs index 03977df83b..27a109720e 100644 --- a/apps/explorer/config/config.exs +++ b/apps/explorer/config/config.exs @@ -125,6 +125,7 @@ config :explorer, Explorer.Migrator.TokenTransferTokenType, enabled: true config :explorer, Explorer.Migrator.SanitizeIncorrectWETHTokenTransfers, enabled: true config :explorer, Explorer.Migrator.TransactionBlockConsensus, enabled: true config :explorer, Explorer.Migrator.TokenTransferBlockConsensus, enabled: true +config :explorer, Explorer.Migrator.RestoreOmittedWETHTransfers, enabled: true config :explorer, Explorer.Chain.Fetcher.CheckBytecodeMatchingOnDemand, enabled: true diff --git a/apps/explorer/config/runtime/test.exs b/apps/explorer/config/runtime/test.exs index 56c02a8464..1dae56dfe0 100644 --- a/apps/explorer/config/runtime/test.exs +++ b/apps/explorer/config/runtime/test.exs @@ -47,6 +47,7 @@ config :explorer, Explorer.Migrator.TokenTransferTokenType, enabled: false config :explorer, Explorer.Migrator.SanitizeIncorrectWETHTokenTransfers, enabled: false config :explorer, Explorer.Migrator.TransactionBlockConsensus, enabled: false config :explorer, Explorer.Migrator.TokenTransferBlockConsensus, enabled: false +config :explorer, Explorer.Migrator.RestoreOmittedWETHTransfers, enabled: false config :explorer, realtime_events_sender: Explorer.Chain.Events.SimpleSender diff --git a/apps/explorer/lib/explorer/application.ex b/apps/explorer/lib/explorer/application.ex index c82ebcb201..a92c10a8c1 100644 --- a/apps/explorer/lib/explorer/application.ex +++ b/apps/explorer/lib/explorer/application.ex @@ -57,6 +57,7 @@ defmodule Explorer.Application do Supervisor.child_spec({Task.Supervisor, name: Explorer.LookUpSmartContractSourcesTaskSupervisor}, id: LookUpSmartContractSourcesTaskSupervisor ), + Supervisor.child_spec({Task.Supervisor, name: Explorer.WETHMigratorSupervisor}, id: WETHMigratorSupervisor), Explorer.SmartContract.SolcDownloader, Explorer.SmartContract.VyperDownloader, {Registry, keys: :duplicate, name: Registry.ChainEvents, id: Registry.ChainEvents}, @@ -140,6 +141,7 @@ defmodule Explorer.Application do configure(Explorer.Migrator.SanitizeIncorrectWETHTokenTransfers), configure(Explorer.Migrator.TransactionBlockConsensus), configure(Explorer.Migrator.TokenTransferBlockConsensus), + configure(Explorer.Migrator.RestoreOmittedWETHTransfers), configure_chain_type_dependent_process(Explorer.Chain.Cache.StabilityValidatorsCounters, :stability) ] |> List.flatten() diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 7991bf07f5..391899bb38 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -3810,8 +3810,8 @@ defmodule Explorer.Chain do end end - @spec token_from_address_hash_exists?(Hash.Address.t(), [api?]) :: boolean() - def token_from_address_hash_exists?(%Hash{byte_count: unquote(Hash.Address.byte_count())} = hash, options) do + @spec token_from_address_hash_exists?(Hash.Address.t() | String.t(), [api?]) :: boolean() + def token_from_address_hash_exists?(hash, options) do query = from( t in Token, @@ -4595,6 +4595,18 @@ defmodule Explorer.Chain do |> TypeDecoder.decode_raw(types) end + @spec get_token_types([String.t()]) :: [{Hash.Address.t(), String.t()}] + def get_token_types(hashes) do + query = + from( + token in Token, + where: token.contract_address_hash in ^hashes, + select: {token.contract_address_hash, token.type} + ) + + Repo.all(query) + end + @spec get_token_type(Hash.Address.t()) :: String.t() | nil def get_token_type(hash) do query = diff --git a/apps/explorer/lib/explorer/chain/log.ex b/apps/explorer/lib/explorer/chain/log.ex index 46510f8e34..7147f20986 100644 --- a/apps/explorer/lib/explorer/chain/log.ex +++ b/apps/explorer/lib/explorer/chain/log.ex @@ -6,8 +6,8 @@ defmodule Explorer.Chain.Log do require Logger alias ABI.{Event, FunctionSelector} - alias Explorer.Chain - alias Explorer.Chain.{Address, Block, ContractMethod, Data, Hash, Log, Transaction} + alias Explorer.{Chain, Repo} + alias Explorer.Chain.{Address, Block, ContractMethod, Data, Hash, Log, TokenTransfer, Transaction} alias Explorer.Chain.SmartContract.Proxy alias Explorer.SmartContract.SigProviderInterface @@ -353,4 +353,28 @@ defmodule Explorer.Chain.Log do |> Chain.join_associations(necessity_by_association) |> Chain.select_repo(options).all() end + + @doc """ + Streams unfetched WETH token transfers. + Returns `{:ok, any()} | {:error, any()}` (return spec taken from Ecto.Repo.transaction/2) + Expects each_fun, a function to be called on each fetched log. It should accept log and return anything (return value will be discarded anyway) + """ + @spec stream_unfetched_weth_token_transfers((Log.t() -> any())) :: {:ok, any()} | {:error, any()} + def stream_unfetched_weth_token_transfers(each_fun) do + env = Application.get_env(:explorer, Explorer.Chain.TokenTransfer) + + __MODULE__ + |> where([log], log.address_hash in ^env[:whitelisted_weth_contracts]) + |> where( + [log], + log.first_topic == ^TokenTransfer.weth_deposit_signature() or + log.first_topic == ^TokenTransfer.weth_withdrawal_signature() + ) + |> join(:left, [log], tt in TokenTransfer, + on: log.block_hash == tt.block_hash and log.transaction_hash == tt.transaction_hash and log.index == tt.log_index + ) + |> where([log, tt], is_nil(tt.transaction_hash)) + |> select([log], log) + |> Repo.stream_each(each_fun) + end end diff --git a/apps/explorer/lib/explorer/helper.ex b/apps/explorer/lib/explorer/helper.ex index bc09fc0bc4..5f33572cfa 100644 --- a/apps/explorer/lib/explorer/helper.ex +++ b/apps/explorer/lib/explorer/helper.ex @@ -8,6 +8,7 @@ defmodule Explorer.Helper do alias Explorer.Chain.Data import Ecto.Query, only: [where: 3] + import Explorer.Chain.SmartContract, only: [burn_address_hash_string: 0] @max_safe_integer round(:math.pow(2, 63)) - 1 @@ -181,4 +182,10 @@ defmodule Explorer.Helper do true -> :eq end end + + def truncate_address_hash(nil), do: burn_address_hash_string() + + def truncate_address_hash("0x000000000000000000000000" <> truncated_hash) do + "0x#{truncated_hash}" + end end diff --git a/apps/explorer/lib/explorer/migrator/restore_omitted_weth_transfers.ex b/apps/explorer/lib/explorer/migrator/restore_omitted_weth_transfers.ex new file mode 100644 index 0000000000..edc89f2c50 --- /dev/null +++ b/apps/explorer/lib/explorer/migrator/restore_omitted_weth_transfers.ex @@ -0,0 +1,269 @@ +defmodule Explorer.Migrator.RestoreOmittedWETHTransfers do + @moduledoc """ + Inserts missed WETH token transfers + """ + + use GenServer, restart: :transient + + alias Explorer.{Chain, Helper} + alias Explorer.Chain.{Log, TokenTransfer} + alias Explorer.Migrator.MigrationStatus + + import Explorer.Chain.SmartContract, only: [burn_address_hash_string: 0] + + require Logger + + @enqueue_busy_waiting_timeout 500 + @migration_timeout 250 + @migration_name "restore_omitted_weth_transfers" + + def start_link(_) do + GenServer.start_link(__MODULE__, :ok, name: __MODULE__) + end + + @impl true + def init(_) do + {:ok, %{}, {:continue, :check_env}} + end + + @impl true + def handle_continue(:check_env, state) do + list = Application.get_env(:explorer, Explorer.Chain.TokenTransfer)[:whitelisted_weth_contracts] + + cond do + Enum.empty?(list) -> + {:stop, :normal, state} + + check_token_types(list) -> + {:noreply, %{}, {:continue, :check_migration_status}} + + true -> + Logger.error("Stopping") + {:stop, :normal, state} + end + end + + @impl true + def handle_continue(:check_migration_status, state) do + case MigrationStatus.get_status(@migration_name) do + "completed" -> + {:stop, :normal, state} + + _ -> + MigrationStatus.set_status(@migration_name, "started") + {:noreply, %{}, {:continue, :ok}} + end + end + + @impl true + def handle_continue(:ok, _state) do + %{ref: ref} = + Task.async(fn -> + Log.stream_unfetched_weth_token_transfers(&enqueue_if_queue_is_not_full/1) + end) + + to_insert = + Application.get_env(:explorer, Explorer.Chain.TokenTransfer)[:whitelisted_weth_contracts] + |> Enum.map(fn contract_address_hash_string -> + if !Chain.token_from_address_hash_exists?(contract_address_hash_string, []) do + %{ + contract_address_hash: contract_address_hash_string, + type: "ERC-20" + } + end + end) + |> Enum.reject(&is_nil/1) + + if !Enum.empty?(to_insert) do + Chain.import(%{tokens: %{params: to_insert}}) + end + + Process.send_after(self(), :migrate, @migration_timeout) + + {:noreply, %{queue: [], current_concurrency: 0, stream_ref: ref, stream_is_over: false}} + end + + defp enqueue_if_queue_is_not_full(log) do + if GenServer.call(__MODULE__, :not_full?) do + GenServer.cast(__MODULE__, {:append_to_queue, log}) + else + :timer.sleep(@enqueue_busy_waiting_timeout) + + enqueue_if_queue_is_not_full(log) + end + end + + @impl true + def handle_call(:not_full?, _from, %{queue: queue} = state) do + {:reply, Enum.count(queue) < max_queue_size(), state} + end + + @impl true + def handle_cast({:append_to_queue, log}, %{queue: queue} = state) do + {:noreply, %{state | queue: [log | queue]}} + end + + @impl true + def handle_info(:migrate, %{queue: [], stream_is_over: true, current_concurrency: current_concurrency} = state) do + if current_concurrency > 0 do + {:noreply, state} + else + Logger.info("RestoreOmittedWETHTransfers migration is complete.") + + MigrationStatus.set_status(@migration_name, "completed") + {:stop, :normal, state} + end + end + + # fetch token balances + @impl true + def handle_info(:migrate, %{queue: queue, current_concurrency: current_concurrency} = state) do + if Enum.count(queue) > 0 and current_concurrency < concurrency() do + to_take = batch_size() * (concurrency() - current_concurrency) + {to_process, remainder} = Enum.split(queue, to_take) + + spawned_tasks = + to_process + |> Enum.chunk_every(batch_size()) + |> Enum.map(fn batch -> + run_task(batch) + end) + + if Enum.empty?(remainder) do + Process.send_after(self(), :migrate, migration_timeout()) + else + Process.send(self(), :migrate, []) + end + + {:noreply, %{state | queue: remainder, current_concurrency: current_concurrency + Enum.count(spawned_tasks)}} + else + Process.send_after(self(), :migrate, migration_timeout()) + {:noreply, state} + end + end + + @impl true + def handle_info({ref, _answer}, %{stream_ref: ref} = state) do + {:noreply, %{state | stream_is_over: true}} + end + + @impl true + def handle_info({ref, _answer}, %{current_concurrency: counter} = state) do + Process.demonitor(ref, [:flush]) + Process.send(self(), :migrate, []) + {:noreply, %{state | current_concurrency: counter - 1}} + end + + @impl true + def handle_info({:DOWN, ref, :process, _pid, _reason}, %{stream_ref: ref} = state) do + {:noreply, %{state | stream_is_over: true}} + end + + @impl true + def handle_info({:DOWN, _ref, :process, _pid, _reason}, %{current_concurrency: counter} = state) do + Process.send(self(), :migrate, []) + {:noreply, %{state | current_concurrency: counter - 1}} + end + + defp migrate_batch(batch) do + {token_transfers, token_balances} = + batch + |> Enum.map(fn log -> + with %{second_topic: second_topic, third_topic: nil, fourth_topic: nil, data: data} + when not is_nil(second_topic) <- + log, + [amount] <- Helper.decode_data(data, [{:uint, 256}]) do + {from_address_hash, to_address_hash, balance_address_hash} = + if log.first_topic == TokenTransfer.weth_deposit_signature() do + to_address_hash = Helper.truncate_address_hash(to_string(second_topic)) + {burn_address_hash_string(), to_address_hash, to_address_hash} + else + from_address_hash = Helper.truncate_address_hash(to_string(second_topic)) + {from_address_hash, burn_address_hash_string(), from_address_hash} + end + + token_transfer = %{ + amount: Decimal.new(amount || 0), + block_number: log.block_number, + block_hash: log.block_hash, + log_index: log.index, + from_address_hash: from_address_hash, + to_address_hash: to_address_hash, + token_contract_address_hash: log.address_hash, + transaction_hash: log.transaction_hash, + token_ids: nil, + token_type: "ERC-20" + } + + token_balance = %{ + address_hash: balance_address_hash, + token_contract_address_hash: log.address_hash, + block_number: log.block_number, + token_id: nil, + token_type: "ERC-20" + } + + {token_transfer, token_balance} + else + _ -> + Logger.error( + "Failed to decode log: (tx_hash, block_hash, index) = #{to_string(log.transaction_hash)}, #{to_string(log.block_hash)}, #{to_string(log.index)}" + ) + + nil + end + end) + |> Enum.reject(&is_nil/1) + |> Enum.unzip() + + current_token_balances = + token_balances + |> Enum.group_by(fn %{ + address_hash: address_hash, + token_contract_address_hash: token_contract_address_hash + } -> + {address_hash, token_contract_address_hash} + end) + |> Enum.map(fn {_, grouped_address_token_balances} -> + Enum.max_by(grouped_address_token_balances, fn %{block_number: block_number} -> block_number end) + end) + |> Enum.sort_by(&{&1.token_contract_address_hash, &1.address_hash}) + + if !Enum.empty?(token_transfers) do + Chain.import(%{ + token_transfers: %{params: token_transfers}, + address_token_balances: %{params: token_balances}, + address_current_token_balances: %{ + params: current_token_balances + } + }) + end + end + + defp run_task(batch) do + Task.Supervisor.async_nolink(Explorer.WETHMigratorSupervisor, fn -> + migrate_batch(batch) + end) + end + + defp check_token_types(token_address_hashes) do + token_address_hashes + |> Chain.get_token_types() + |> Enum.reduce(true, fn {token_hash, token_type}, acc -> + if token_type == "ERC-20" do + acc + else + Logger.error("Wrong token type of #{to_string(token_hash)}: #{token_type}") + false + end + end) + end + + def concurrency, do: Application.get_env(:explorer, __MODULE__)[:concurrency] + + def batch_size, do: Application.get_env(:explorer, __MODULE__)[:batch_size] + + def migration_timeout, do: Application.get_env(:explorer, __MODULE__)[:timeout] + + def max_queue_size, do: concurrency() * batch_size() * 2 +end diff --git a/apps/indexer/lib/indexer/transform/token_transfers.ex b/apps/indexer/lib/indexer/transform/token_transfers.ex index 761323d1a4..161e8dfdcb 100644 --- a/apps/indexer/lib/indexer/transform/token_transfers.ex +++ b/apps/indexer/lib/indexer/transform/token_transfers.ex @@ -6,6 +6,7 @@ defmodule Indexer.Transform.TokenTransfers do require Logger import Explorer.Chain.SmartContract, only: [burn_address_hash_string: 0] + import Explorer.Helper, only: [truncate_address_hash: 1] alias Explorer.{Helper, Repo} alias Explorer.Chain.{Hash, Token, TokenTransfer} @@ -483,12 +484,6 @@ defmodule Indexer.Transform.TokenTransfers do end end - defp truncate_address_hash(nil), do: burn_address_hash_string() - - defp truncate_address_hash("0x000000000000000000000000" <> truncated_hash) do - "0x#{truncated_hash}" - end - defp encode_address_hash(binary) do "0x" <> Base.encode16(binary, case: :lower) end diff --git a/config/runtime.exs b/config/runtime.exs index 671cfcd0a0..10941697c3 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -583,6 +583,11 @@ config :explorer, Explorer.Migrator.SanitizeIncorrectWETHTokenTransfers, batch_size: ConfigHelper.parse_integer_env_var("SANITIZE_INCORRECT_WETH_BATCH_SIZE", 100), concurrency: ConfigHelper.parse_integer_env_var("SANITIZE_INCORRECT_WETH_CONCURRENCY", 1) +config :explorer, Explorer.Migrator.RestoreOmittedWETHTransfers, + concurrency: ConfigHelper.parse_integer_env_var("MIGRATION_RESTORE_OMITTED_WETH_TOKEN_TRANSFERS_CONCURRENCY", 5), + batch_size: ConfigHelper.parse_integer_env_var("MIGRATION_RESTORE_OMITTED_WETH_TOKEN_TRANSFERS_BATCH_SIZE", 50), + timeout: ConfigHelper.parse_time_env_var("MIGRATION_RESTORE_OMITTED_WETH_TOKEN_TRANSFERS_TIMEOUT", "250ms") + config :explorer, Explorer.Chain.BridgedToken, eth_omni_bridge_mediator: System.get_env("BRIDGED_TOKENS_ETH_OMNI_BRIDGE_MEDIATOR"), bsc_omni_bridge_mediator: System.get_env("BRIDGED_TOKENS_BSC_OMNI_BRIDGE_MEDIATOR"), diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index 1d7d0f2509..489e32ebfe 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -374,6 +374,9 @@ EIP_1559_ELASTICITY_MULTIPLIER=2 # TOKEN_TRANSFER_TOKEN_TYPE_MIGRATION_CONCURRENCY= # SANITIZE_INCORRECT_NFT_BATCH_SIZE= # SANITIZE_INCORRECT_NFT_CONCURRENCY= +# MIGRATION_RESTORE_OMITTED_WETH_TOKEN_TRANSFERS_CONCURRENCY= +# MIGRATION_RESTORE_OMITTED_WETH_TOKEN_TRANSFERS_BATCH_SIZE= +# MIGRATION_RESTORE_OMITTED_WETH_TOKEN_TRANSFERS_TIMEOUT= SOURCIFY_INTEGRATION_ENABLED=false SOURCIFY_SERVER_URL= SOURCIFY_REPO_URL=