fix: Filter WETH transfers in indexer + migration to delete historical incorrect WETH transfers (#10134)
parent
c77180c3ac
commit
80a8e3b464
@ -0,0 +1,145 @@ |
||||
defmodule Explorer.Migrator.SanitizeIncorrectWETHTokenTransfers do |
||||
@moduledoc """ |
||||
This migrator will delete all incorrect WETH token transfers. As incorrect we consider: |
||||
- WETH withdrawals and WETH deposits emitted by tokens which are not in `WHITELISTED_WETH_CONTRACTS` env |
||||
- WETH withdrawal or WETH deposit which has sibling token transfer within the same block and transaction, with the same amount, same from and to addresses, same token contract addresses. (We consider such pairs as duplicates) |
||||
""" |
||||
|
||||
use GenServer, restart: :transient |
||||
|
||||
import Ecto.Query |
||||
|
||||
require Logger |
||||
|
||||
alias Explorer.Chain.{Log, TokenTransfer} |
||||
alias Explorer.Migrator.MigrationStatus |
||||
alias Explorer.Repo |
||||
|
||||
@migration_name "sanitize_incorrect_weth_transfers" |
||||
@default_batch_size 500 |
||||
|
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
@impl true |
||||
def init(_) do |
||||
case MigrationStatus.get_status(@migration_name) do |
||||
"completed" -> |
||||
:ignore |
||||
|
||||
_ -> |
||||
MigrationStatus.set_status(@migration_name, "started") |
||||
schedule_batch_migration() |
||||
{:ok, %{step: :delete_not_whitelisted_weth_transfers}} |
||||
end |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:migrate_batch, %{step: step} = state) do |
||||
case last_unprocessed_identifiers(step) do |
||||
[] -> |
||||
case step do |
||||
:delete_not_whitelisted_weth_transfers -> |
||||
Logger.info( |
||||
"SanitizeIncorrectWETHTokenTransfers deletion of not whitelisted weth transfers finished, continuing with duplicates deletion" |
||||
) |
||||
|
||||
schedule_batch_migration() |
||||
{:noreply, %{step: :delete_duplicates}} |
||||
|
||||
:delete_duplicates -> |
||||
Logger.info("SanitizeIncorrectWETHTokenTransfers migration finished") |
||||
MigrationStatus.set_status(@migration_name, "completed") |
||||
{:stop, :normal, state} |
||||
end |
||||
|
||||
identifiers -> |
||||
identifiers |
||||
|> Enum.chunk_every(batch_size()) |
||||
|> Enum.map(&run_task/1) |
||||
|> Task.await_many(:infinity) |
||||
|
||||
schedule_batch_migration() |
||||
|
||||
{:noreply, state} |
||||
end |
||||
end |
||||
|
||||
defp last_unprocessed_identifiers(step) do |
||||
limit = batch_size() * concurrency() |
||||
|
||||
step |
||||
|> unprocessed_identifiers() |
||||
|> limit(^limit) |
||||
|> Repo.all(timeout: :infinity) |
||||
end |
||||
|
||||
defp unprocessed_identifiers(:delete_duplicates) do |
||||
weth_transfers = |
||||
from( |
||||
tt in TokenTransfer, |
||||
left_join: l in Log, |
||||
on: tt.block_hash == l.block_hash and tt.transaction_hash == l.transaction_hash and tt.log_index == l.index, |
||||
where: |
||||
l.first_topic == ^TokenTransfer.weth_deposit_signature() or |
||||
l.first_topic == ^TokenTransfer.weth_withdrawal_signature() |
||||
) |
||||
|
||||
from( |
||||
weth_tt in subquery(weth_transfers), |
||||
inner_join: tt in TokenTransfer, |
||||
on: weth_tt.block_hash == tt.block_hash and weth_tt.transaction_hash == tt.transaction_hash, |
||||
where: |
||||
weth_tt.log_index != tt.log_index and weth_tt.token_contract_address_hash == tt.token_contract_address_hash and |
||||
weth_tt.to_address_hash == tt.to_address_hash and weth_tt.from_address_hash == tt.from_address_hash and |
||||
weth_tt.amount == tt.amount, |
||||
select: {weth_tt.transaction_hash, weth_tt.block_hash, weth_tt.log_index} |
||||
) |
||||
end |
||||
|
||||
defp unprocessed_identifiers(:delete_not_whitelisted_weth_transfers) do |
||||
from( |
||||
tt in TokenTransfer, |
||||
left_join: l in Log, |
||||
on: tt.block_hash == l.block_hash and tt.transaction_hash == l.transaction_hash and tt.log_index == l.index, |
||||
where: |
||||
(l.first_topic == ^TokenTransfer.weth_deposit_signature() or |
||||
l.first_topic == ^TokenTransfer.weth_withdrawal_signature()) and |
||||
tt.token_contract_address_hash not in ^Application.get_env(:explorer, Explorer.Chain.TokenTransfer)[ |
||||
:whitelisted_weth_contracts |
||||
], |
||||
select: {tt.transaction_hash, tt.block_hash, tt.log_index} |
||||
) |
||||
end |
||||
|
||||
defp run_task(batch), do: Task.async(fn -> handle_batch(batch) end) |
||||
|
||||
defp handle_batch(token_transfer_ids) do |
||||
token_transfer_ids |
||||
|> build_delete_query() |
||||
|> Repo.query!([], timeout: :infinity) |
||||
end |
||||
|
||||
defp schedule_batch_migration do |
||||
Process.send(self(), :migrate_batch, []) |
||||
end |
||||
|
||||
defp batch_size do |
||||
Application.get_env(:explorer, __MODULE__)[:batch_size] || @default_batch_size |
||||
end |
||||
|
||||
defp concurrency do |
||||
default = 4 * System.schedulers_online() |
||||
|
||||
Application.get_env(:explorer, __MODULE__)[:concurrency] || default |
||||
end |
||||
|
||||
defp build_delete_query(token_transfer_ids) do |
||||
""" |
||||
DELETE |
||||
FROM token_transfers tt |
||||
WHERE (tt.transaction_hash, tt.block_hash, tt.log_index) IN #{TokenTransfer.encode_token_transfer_ids(token_transfer_ids)} |
||||
""" |
||||
end |
||||
end |
@ -0,0 +1,161 @@ |
||||
defmodule Explorer.Migrator.SanitizeIncorrectWETHTokenTransfersTest do |
||||
use Explorer.DataCase, async: false |
||||
|
||||
alias Explorer.Chain.TokenTransfer |
||||
alias Explorer.Migrator.{SanitizeIncorrectWETHTokenTransfers, MigrationStatus} |
||||
alias Explorer.Repo |
||||
|
||||
describe "SanitizeIncorrectWETHTokenTransfers" do |
||||
test "Deletes not whitelisted WETH transfers and duplicated WETH transfers" do |
||||
%{contract_address: token_address} = insert(:token, type: "ERC-20") |
||||
block = insert(:block, consensus: true) |
||||
burn_address = insert(:address, hash: "0x0000000000000000000000000000000000000000") |
||||
|
||||
insert(:token_transfer, |
||||
from_address: insert(:address), |
||||
block: block, |
||||
block_number: block.number, |
||||
token_contract_address: token_address, |
||||
token_ids: nil |
||||
) |
||||
|
||||
deposit_log = insert(:log, first_topic: TokenTransfer.weth_deposit_signature()) |
||||
|
||||
insert(:token_transfer, |
||||
from_address: insert(:address), |
||||
token_contract_address: token_address, |
||||
block: deposit_log.block, |
||||
transaction: deposit_log.transaction, |
||||
log_index: deposit_log.index |
||||
) |
||||
|
||||
withdrawal_log = insert(:log, first_topic: TokenTransfer.weth_withdrawal_signature()) |
||||
|
||||
insert(:token_transfer, |
||||
from_address: insert(:address), |
||||
token_contract_address: token_address, |
||||
block: withdrawal_log.block, |
||||
transaction: withdrawal_log.transaction, |
||||
log_index: withdrawal_log.index |
||||
) |
||||
|
||||
%{contract_address: whitelisted_token_address} = insert(:token, type: "ERC-20") |
||||
|
||||
env = Application.get_env(:explorer, Explorer.Chain.TokenTransfer) |
||||
|
||||
Application.put_env( |
||||
:explorer, |
||||
Explorer.Chain.TokenTransfer, |
||||
Keyword.put(env, :whitelisted_weth_contracts, [whitelisted_token_address |> to_string() |> String.downcase()]) |
||||
) |
||||
|
||||
withdrawal_log = insert(:log, first_topic: TokenTransfer.weth_withdrawal_signature()) |
||||
|
||||
insert(:token_transfer, |
||||
from_address: insert(:address), |
||||
token_contract_address: whitelisted_token_address, |
||||
block: withdrawal_log.block, |
||||
transaction: withdrawal_log.transaction, |
||||
log_index: withdrawal_log.index |
||||
) |
||||
|
||||
deposit_log = insert(:log, first_topic: TokenTransfer.weth_deposit_signature()) |
||||
|
||||
insert(:token_transfer, |
||||
from_address: insert(:address), |
||||
token_contract_address: whitelisted_token_address, |
||||
block: deposit_log.block, |
||||
transaction: deposit_log.transaction, |
||||
log_index: deposit_log.index |
||||
) |
||||
|
||||
withdrawal_log_duplicate = insert(:log, first_topic: TokenTransfer.weth_withdrawal_signature()) |
||||
|
||||
tt_withdrawal = |
||||
insert(:token_transfer, |
||||
from_address: burn_address, |
||||
token_contract_address: whitelisted_token_address, |
||||
block: withdrawal_log_duplicate.block, |
||||
transaction: withdrawal_log_duplicate.transaction, |
||||
log_index: withdrawal_log_duplicate.index |
||||
) |
||||
|
||||
insert(:token_transfer, |
||||
from_address: burn_address, |
||||
to_address: tt_withdrawal.to_address, |
||||
token_contract_address: whitelisted_token_address, |
||||
block: withdrawal_log_duplicate.block, |
||||
transaction: withdrawal_log_duplicate.transaction, |
||||
log_index: withdrawal_log_duplicate.index + 1, |
||||
amount: tt_withdrawal.amount |
||||
) |
||||
|
||||
deposit_log_duplicate = insert(:log, first_topic: TokenTransfer.weth_deposit_signature()) |
||||
|
||||
tt_deposit = |
||||
insert(:token_transfer, |
||||
to_address: burn_address, |
||||
token_contract_address: whitelisted_token_address, |
||||
block: deposit_log_duplicate.block, |
||||
transaction: deposit_log_duplicate.transaction, |
||||
log_index: deposit_log_duplicate.index |
||||
) |
||||
|
||||
insert(:token_transfer, |
||||
from_address: tt_deposit.from_address, |
||||
to_address: burn_address, |
||||
token_contract_address: whitelisted_token_address, |
||||
block: deposit_log_duplicate.block, |
||||
transaction: deposit_log_duplicate.transaction, |
||||
log_index: deposit_log_duplicate.index + 1, |
||||
amount: tt_deposit.amount |
||||
) |
||||
|
||||
assert MigrationStatus.get_status("sanitize_incorrect_weth_transfers") == nil |
||||
|
||||
Application.put_env(:explorer, Explorer.Migrator.SanitizeIncorrectWETHTokenTransfers, |
||||
batch_size: 1, |
||||
concurrency: 1 |
||||
) |
||||
|
||||
SanitizeIncorrectWETHTokenTransfers.start_link([]) |
||||
Process.sleep(100) |
||||
|
||||
assert MigrationStatus.get_status("sanitize_incorrect_weth_transfers") == "completed" |
||||
|
||||
token_address_hash = token_address.hash |
||||
whitelisted_token_address_hash = whitelisted_token_address.hash |
||||
|
||||
assert [ |
||||
%{token_contract_address_hash: ^token_address_hash}, |
||||
%{token_contract_address_hash: ^whitelisted_token_address_hash}, |
||||
%{token_contract_address_hash: ^whitelisted_token_address_hash}, |
||||
%{token_contract_address_hash: ^whitelisted_token_address_hash}, |
||||
%{token_contract_address_hash: ^whitelisted_token_address_hash} |
||||
] = transfers = Repo.all(TokenTransfer) |
||||
|
||||
withdrawal = Enum.at(transfers, 1) |
||||
deposit = Enum.at(transfers, 2) |
||||
assert withdrawal.block_hash == withdrawal_log.block_hash |
||||
assert withdrawal.transaction_hash == withdrawal_log.transaction_hash |
||||
assert withdrawal.log_index == withdrawal_log.index |
||||
|
||||
assert deposit.block_hash == deposit_log.block_hash |
||||
assert deposit.transaction_hash == deposit_log.transaction_hash |
||||
assert deposit.log_index == deposit_log.index |
||||
|
||||
withdrawal_analogue = Enum.at(transfers, 3) |
||||
deposit_analogue = Enum.at(transfers, 4) |
||||
|
||||
assert withdrawal_analogue.block_hash == withdrawal_log_duplicate.block_hash |
||||
assert withdrawal_analogue.transaction_hash == withdrawal_log_duplicate.transaction_hash |
||||
assert withdrawal_analogue.log_index == withdrawal_log_duplicate.index + 1 |
||||
|
||||
assert deposit_analogue.block_hash == deposit_log_duplicate.block_hash |
||||
assert deposit_analogue.transaction_hash == deposit_log_duplicate.transaction_hash |
||||
assert deposit_analogue.log_index == deposit_log_duplicate.index + 1 |
||||
|
||||
Application.put_env(:explorer, Explorer.Chain.TokenTransfer, env) |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue