fix: Revert the deletion of deriving current token balances (#10811)

pull/10827/head
Qwerty5Uiop 1 month ago committed by GitHub
parent d95332fc6b
commit 3fffd46197
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 86
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  2. 38
      apps/explorer/lib/explorer/migrator/sanitize_missing_token_balances.ex
  3. 7
      apps/explorer/priv/repo/migrations/20240923135258_reset_sanitize_missing_token_balances_migration_status.exs
  4. 96
      apps/explorer/test/explorer/chain/import/runner/blocks_test.exs
  5. 184
      apps/explorer/test/explorer/chain/import_test.exs
  6. 20
      apps/explorer/test/explorer/migrator/sanitize_missing_token_balances_test.exs

@ -530,10 +530,8 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
select:
map(ctb, [
:address_hash,
:block_number,
:token_contract_address_hash,
:token_id,
:token_type,
# Used to determine if `address_hash` was a holder of `token_contract_address_hash` before
# `address_current_token_balance` is deleted in `update_tokens_holder_count`.
@ -566,28 +564,43 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
%{timeout: timeout} = options
)
when is_list(deleted_address_current_token_balances) do
new_current_token_balances_placeholders =
Enum.map(deleted_address_current_token_balances, fn deleted_balance ->
now = DateTime.utc_now()
final_query = derive_address_current_token_balances_grouped_query(deleted_address_current_token_balances)
%{
address_hash: deleted_balance.address_hash,
token_contract_address_hash: deleted_balance.token_contract_address_hash,
token_id: deleted_balance.token_id,
token_type: deleted_balance.token_type,
block_number: deleted_balance.block_number,
value: nil,
value_fetched_at: nil,
inserted_at: now,
updated_at: now
}
end)
new_current_token_balance_query =
from(new_current_token_balance in subquery(final_query),
inner_join: tb in Address.TokenBalance,
on:
tb.address_hash == new_current_token_balance.address_hash and
tb.token_contract_address_hash == new_current_token_balance.token_contract_address_hash and
((is_nil(tb.token_id) and is_nil(new_current_token_balance.token_id)) or
(tb.token_id == new_current_token_balance.token_id and
not is_nil(tb.token_id) and not is_nil(new_current_token_balance.token_id))) and
tb.block_number == new_current_token_balance.block_number,
select: %{
address_hash: new_current_token_balance.address_hash,
token_contract_address_hash: new_current_token_balance.token_contract_address_hash,
token_id: new_current_token_balance.token_id,
token_type: tb.token_type,
block_number: new_current_token_balance.block_number,
value: tb.value,
value_fetched_at: tb.value_fetched_at,
inserted_at: over(min(tb.inserted_at), :w),
updated_at: over(max(tb.updated_at), :w)
},
windows: [
w: [partition_by: [tb.address_hash, tb.token_contract_address_hash, tb.token_id]]
]
)
current_token_balance =
new_current_token_balance_query
|> repo.all()
timestamps = Import.timestamps()
result =
CurrentTokenBalances.insert_changes_list_with_and_without_token_id(
new_current_token_balances_placeholders,
current_token_balance,
repo,
timestamps,
timeout,
@ -812,6 +825,43 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
)
end
defp derive_address_current_token_balances_grouped_query(deleted_address_current_token_balances) do
initial_query =
from(tb in Address.TokenBalance,
select: %{
address_hash: tb.address_hash,
token_contract_address_hash: tb.token_contract_address_hash,
token_id: tb.token_id,
block_number: max(tb.block_number)
},
group_by: [tb.address_hash, tb.token_contract_address_hash, tb.token_id]
)
Enum.reduce(deleted_address_current_token_balances, initial_query, fn %{
address_hash: address_hash,
token_contract_address_hash:
token_contract_address_hash,
token_id: token_id
},
acc_query ->
if token_id do
from(tb in acc_query,
or_where:
tb.address_hash == ^address_hash and
tb.token_contract_address_hash == ^token_contract_address_hash and
tb.token_id == ^token_id
)
else
from(tb in acc_query,
or_where:
tb.address_hash == ^address_hash and
tb.token_contract_address_hash == ^token_contract_address_hash and
is_nil(tb.token_id)
)
end
end)
end
# `block_rewards` are linked to `blocks.hash`, but fetched by `blocks.number`, so when a block with the same number is
# inserted, the old block rewards need to be deleted, so that the old and new rewards aren't combined.
defp delete_rewards(repo, blocks_changes, %{timeout: timeout}) do

@ -1,7 +1,7 @@
defmodule Explorer.Migrator.SanitizeMissingTokenBalances do
@moduledoc """
Set value and value_fetched_at to nil for those token balances that are already filled but their
current token balances are not so the token balance fetcher could re-fetch them.
Deletes empty current token balances if the related highest block_number historical token balance is filled.
Set value and value_fetched_at to nil for those token balances so the token balance fetcher could re-fetch them.
"""
use Explorer.Migrator.FillingMigration
@ -23,7 +23,7 @@ defmodule Explorer.Migrator.SanitizeMissingTokenBalances do
ids =
unprocessed_data_query()
|> select([_ctb, tb], tb.id)
|> select([ctb, tb], {ctb.id, tb.id})
|> limit(^limit)
|> Repo.all(timeout: :infinity)
@ -38,22 +38,34 @@ defmodule Explorer.Migrator.SanitizeMissingTokenBalances do
on:
ctb.address_hash == tb.address_hash and
ctb.token_contract_address_hash == tb.token_contract_address_hash and
ctb.block_number == tb.block_number and
((is_nil(ctb.token_id) and is_nil(tb.token_id)) or ctb.token_id == tb.token_id),
where: is_nil(ctb.value) or is_nil(ctb.value_fetched_at),
where: not is_nil(tb.value) and not is_nil(tb.value_fetched_at)
where: not is_nil(tb.value) and not is_nil(tb.value_fetched_at),
distinct: ctb.id,
order_by: [asc: ctb.id, desc: tb.block_number]
)
end
@impl FillingMigration
def update_batch(token_balance_ids) do
query =
from(tb in TokenBalance,
where: tb.id in ^token_balance_ids,
update: [set: [value: nil, value_fetched_at: nil]]
)
Repo.update_all(query, [], timeout: :infinity)
def update_batch(identifiers) do
{ctb_ids, tb_ids} =
Enum.reduce(identifiers, {[], []}, fn {ctb_id, tb_id}, {ctb_acc, tb_acc} ->
{[ctb_id | ctb_acc], [tb_id | tb_acc]}
end)
Repo.transaction(fn ->
ctb_query = from(ctb in CurrentTokenBalance, where: ctb.id in ^ctb_ids)
Repo.delete_all(ctb_query, timeout: :infinity)
tb_query =
from(tb in TokenBalance,
where: tb.id in ^tb_ids,
update: [set: [value: nil, value_fetched_at: nil]]
)
Repo.update_all(tb_query, [], timeout: :infinity)
end)
end
@impl FillingMigration

@ -0,0 +1,7 @@
defmodule Explorer.Repo.Migrations.ResetSanitizeMissingTokenBalancesMigrationStatus do
use Ecto.Migration
def change do
execute("DELETE FROM migrations_status WHERE migration_name = 'sanitize_missing_token_balances'")
end
end

@ -219,7 +219,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
]
}} = run_block_consensus_change(block, true, options)
assert %{value: nil} = Repo.one(Address.CurrentTokenBalance)
assert count(Address.CurrentTokenBalance) == 0
end
test "delete_address_current_token_balances does not delete rows with matching block number when consensus is false",
@ -238,6 +238,100 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
assert count(Address.CurrentTokenBalance) == count
end
test "derive_address_current_token_balances inserts rows if there is an address_token_balance left for the rows deleted by delete_address_current_token_balances",
%{consensus_block: %{number: block_number} = block, options: options} do
token = insert(:token)
token_contract_address_hash = token.contract_address_hash
%Address{hash: address_hash} =
insert_address_with_token_balances(%{
previous: %{value: 1},
current: %{block_number: block_number, value: 2},
token_contract_address_hash: token_contract_address_hash
})
# Token must exist with non-`nil` `holder_count` for `blocks_update_token_holder_counts` to update
update_holder_count!(token_contract_address_hash, 1)
assert count(Address.TokenBalance) == 2
assert count(Address.CurrentTokenBalance) == 1
previous_block_number = block_number - 1
insert(:block, number: block_number, consensus: true)
assert {:ok,
%{
delete_address_current_token_balances: [
%{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash
}
],
delete_address_token_balances: [
%{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
block_number: ^block_number
}
],
derive_address_current_token_balances: [
%{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
block_number: ^previous_block_number
}
],
# no updates because it both deletes and derives a holder
blocks_update_token_holder_counts: []
}} = run_block_consensus_change(block, true, options)
assert count(Address.TokenBalance) == 1
assert count(Address.CurrentTokenBalance) == 1
previous_value = Decimal.new(1)
assert %Address.CurrentTokenBalance{block_number: ^previous_block_number, value: ^previous_value} =
Repo.get_by(Address.CurrentTokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash
)
end
test "a non-holder reverting to a holder increases the holder_count",
%{consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do
token = insert(:token)
token_contract_address_hash = token.contract_address_hash
non_holder_reverts_to_holder(%{
current: %{block_number: block_number},
token_contract_address_hash: token_contract_address_hash
})
# Token must exist with non-`nil` `holder_count` for `blocks_update_token_holder_counts` to update
update_holder_count!(token_contract_address_hash, 0)
insert(:block, number: block_number, consensus: true)
block_params = params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: true)
%Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params)
changes_list = [block_changes]
assert {:ok,
%{
blocks_update_token_holder_counts: [
%{
contract_address_hash: ^token_contract_address_hash,
holder_count: 1
}
]
}} =
Multi.new()
|> Blocks.run(changes_list, options)
|> Repo.transaction()
end
test "a holder reverting to a non-holder decreases the holder_count",
%{consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do
token = insert(:token)

@ -2145,11 +2145,12 @@ defmodule Explorer.Chain.ImportTest do
}
})
assert %{value: nil} =
assert is_nil(
Repo.get_by(Address.CurrentTokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash
)
)
assert is_nil(
Repo.get_by(Address.TokenBalance,
@ -2159,5 +2160,186 @@ defmodule Explorer.Chain.ImportTest do
)
)
end
test "address_current_token_balances is derived during reorgs" do
%Block{number: block_number} = insert(:block, consensus: true)
previous_block_number = block_number - 1
%Address.TokenBalance{
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
value: previous_value,
block_number: previous_block_number
} = insert(:token_balance, block_number: previous_block_number)
address = Repo.get(Address, address_hash)
%Address.TokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: token_contract_address_hash,
value: current_value,
block_number: ^block_number
} =
insert(:token_balance,
address: address,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
)
refute current_value == previous_value
%Address.CurrentTokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
block_number: ^block_number
} =
insert(:address_current_token_balance,
address: address,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
value: current_value
)
miner_hash_after = address_hash()
from_address_hash_after = address_hash()
block_hash_after = block_hash()
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_after},
%{hash: from_address_hash_after}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 1,
gas_limit: 1,
gas_used: 1,
hash: block_hash_after,
miner_hash: miner_hash_after,
nonce: 1,
number: block_number,
parent_hash: block_hash(),
size: 1,
timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 1
}
]
}
})
assert %Address.CurrentTokenBalance{block_number: ^previous_block_number, value: ^previous_value} =
Repo.get_by(Address.CurrentTokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash
)
assert is_nil(
Repo.get_by(Address.TokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
)
)
end
test "address_token_balances and address_current_token_balances can be replaced during reorgs" do
%Block{number: block_number} = insert(:block, consensus: true)
value_before = Decimal.new(1)
%Address{hash: address_hash} = address = insert(:address)
%Address.TokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: ^block_number
} = insert(:token_balance, address: address, block_number: block_number, value: value_before)
%Address.CurrentTokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
block_number: ^block_number
} =
insert(:address_current_token_balance,
address: address,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
value: value_before
)
miner_hash_after = address_hash()
from_address_hash_after = address_hash()
block_hash_after = block_hash()
value_after = Decimal.add(value_before, 1)
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: address_hash},
%{hash: token_contract_address_hash},
%{hash: miner_hash_after},
%{hash: from_address_hash_after}
]
},
address_token_balances: %{
params: [
%{
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
value: value_after,
token_type: "ERC-20"
}
]
},
address_current_token_balances: %{
params: [
%{
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
value: value_after,
token_type: "ERC-20"
}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 1,
gas_limit: 1,
gas_used: 1,
hash: block_hash_after,
miner_hash: miner_hash_after,
nonce: 1,
number: block_number,
parent_hash: block_hash(),
size: 1,
timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 1
}
]
}
})
assert %Address.CurrentTokenBalance{value: ^value_after} =
Repo.get_by(Address.CurrentTokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash
)
assert %Address.TokenBalance{value: ^value_after} =
Repo.get_by(Address.TokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
)
end
end
end

@ -1,7 +1,7 @@
defmodule Explorer.Migrator.SanitizeMissingTokenBalancesTest do
use Explorer.DataCase, async: false
alias Explorer.Chain.Address.TokenBalance
alias Explorer.Chain.Address.{CurrentTokenBalance, TokenBalance}
alias Explorer.Migrator.{SanitizeMissingTokenBalances, MigrationStatus}
alias Explorer.Repo
@ -10,11 +10,16 @@ defmodule Explorer.Migrator.SanitizeMissingTokenBalancesTest do
Enum.each(0..10, fn _x ->
token_balance = insert(:token_balance)
insert(:token_balance,
address: token_balance.address,
token_contract_address_hash: token_balance.token_contract_address_hash,
token_id: token_balance.token_id
)
insert(:address_current_token_balance,
address: token_balance.address,
token_contract_address_hash: token_balance.token_contract_address_hash,
token_id: token_balance.token_id,
block_number: token_balance.block_number,
value: nil,
value_fetched_at: nil
)
@ -30,10 +35,17 @@ defmodule Explorer.Migrator.SanitizeMissingTokenBalancesTest do
TokenBalance
|> Repo.all()
|> Enum.each(fn tb ->
assert %{value: nil, value_fetched_at: nil} = tb
|> Enum.group_by(&{&1.address_hash, &1.token_contract_address_hash, &1.token_id})
|> Enum.each(fn {_, tbs} ->
assert [%{value: nil, value_fetched_at: nil}, %{value: old_value, value_fetched_at: old_value_fetched_at}] =
Enum.sort_by(tbs, & &1.block_number, &>=/2)
refute is_nil(old_value)
refute is_nil(old_value_fetched_at)
end)
assert Repo.all(CurrentTokenBalance) == []
assert MigrationStatus.get_status("sanitize_missing_token_balances") == "completed"
end
end

Loading…
Cancel
Save