fix: Delete incorrect coin balances on reorg (#10879)

pull/10697/head
Qwerty5Uiop 1 month ago committed by GitHub
parent 92e7ce50f5
commit a23e03e872
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      apps/explorer/lib/explorer/chain/import/runner.ex
  2. 4
      apps/explorer/lib/explorer/chain/import/runner/addresses.ex
  3. 79
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  4. 28
      apps/explorer/test/explorer/chain/import/runner/blocks_test.exs

@ -22,7 +22,7 @@ defmodule Explorer.Chain.Import.Runner do
@type changes_list :: [changes]
@type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all | Ecto.Query.t()
@type on_conflict :: :nothing | :replace_all | {:replace, [atom()]} | Ecto.Query.t()
@typedoc """
Runner-specific options under `c:option_key/0` in all options passed to `c:run/3`.

@ -168,8 +168,8 @@ defmodule Explorer.Chain.Import.Runner.Addresses do
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}) :: {:ok, [Address.t()]}
defp insert(repo, ordered_changes_list, %{timeout: timeout, timestamps: timestamps} = options)
when is_list(ordered_changes_list) do
def insert(repo, ordered_changes_list, %{timeout: timeout, timestamps: timestamps} = options)
when is_list(ordered_changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
Import.insert_changes_list(

@ -31,7 +31,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import.Runner
alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances
alias Explorer.Chain.Import.Runner.{TokenInstances, Tokens}
alias Explorer.Chain.Import.Runner.{Addresses, TokenInstances, Tokens}
alias Explorer.Prometheus.Instrumenter
alias Explorer.Utility.MissingRangesManipulator
@ -160,6 +160,23 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
:derive_transaction_forks
)
end)
|> Multi.run(:delete_address_coin_balances, fn repo, %{lose_consensus: non_consensus_blocks} ->
Instrumenter.block_import_stage_runner(
fn -> delete_address_coin_balances(repo, non_consensus_blocks, insert_options) end,
:address_referencing,
:blocks,
:delete_address_coin_balances
)
end)
|> Multi.run(:derive_address_fetched_coin_balances, fn repo,
%{delete_address_coin_balances: delete_address_coin_balances} ->
Instrumenter.block_import_stage_runner(
fn -> derive_address_fetched_coin_balances(repo, delete_address_coin_balances, insert_options) end,
:address_referencing,
:blocks,
:derive_address_fetched_coin_balances
)
end)
|> Multi.run(:delete_address_token_balances, fn repo, %{lose_consensus: non_consensus_blocks} ->
Instrumenter.block_import_stage_runner(
fn -> delete_address_token_balances(repo, non_consensus_blocks, insert_options) end,
@ -464,6 +481,66 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
)
end
defp delete_address_coin_balances(_repo, [], _options), do: {:ok, []}
defp delete_address_coin_balances(repo, non_consensus_blocks, %{timeout: timeout}) do
non_consensus_block_numbers = Enum.map(non_consensus_blocks, fn {number, _hash} -> number end)
ordered_query =
from(cb in Address.CoinBalance,
where: cb.block_number in ^non_consensus_block_numbers,
select: map(cb, [:address_hash, :block_number]),
# Enforce TokenBalance ShareLocks order (see docs: sharelocks.md)
order_by: [cb.address_hash, cb.block_number],
lock: "FOR UPDATE"
)
query =
from(cb in Address.CoinBalance,
select: cb.address_hash,
inner_join: ordered_address_coin_balance in subquery(ordered_query),
on:
ordered_address_coin_balance.address_hash == cb.address_hash and
ordered_address_coin_balance.block_number == cb.block_number
)
try do
{_count, deleted_coin_balances_address_hashes} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_coin_balances_address_hashes}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: non_consensus_block_numbers}}
end
end
defp derive_address_fetched_coin_balances(_repo, [], _options), do: {:ok, []}
defp derive_address_fetched_coin_balances(repo, deleted_balances_address_hashes, options) do
last_balances_query =
from(cb in Address.CoinBalance,
where: cb.address_hash in ^deleted_balances_address_hashes,
where: not is_nil(cb.value),
distinct: cb.address_hash,
order_by: [asc: cb.address_hash, desc: cb.block_number],
select: %{
hash: cb.address_hash,
fetched_coin_balance: cb.value,
fetched_coin_balance_block_number: cb.block_number
}
)
addresses_params =
last_balances_query
|> repo.all()
|> Enum.sort_by(& &1.hash)
addresses_options =
Map.put(options, :on_conflict, {:replace, [:fetched_coin_balance, :fetched_coin_balance_block_number]})
Addresses.insert(repo, addresses_params, addresses_options)
end
defp delete_address_token_balances(_, [], _), do: {:ok, []}
defp delete_address_token_balances(repo, non_consensus_blocks, %{timeout: timeout}) do

@ -203,6 +203,34 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
end)
end
test "coin balances are deleted and new balances are derived if some blocks lost consensus",
%{consensus_block: %{number: block_number} = block, options: options} do
%{hash: address_hash} = address = insert(:address)
prev_block_number = block_number - 1
insert(:address_coin_balance, address: address, block_number: block_number)
%{value: prev_value} = insert(:address_coin_balance, address: address, block_number: prev_block_number)
assert count(Address.CoinBalance) == 2
insert(:block, number: block_number, consensus: true)
assert {:ok,
%{
delete_address_coin_balances: [^address_hash],
derive_address_fetched_coin_balances: [
%{
hash: ^address_hash,
fetched_coin_balance: ^prev_value,
fetched_coin_balance_block_number: ^prev_block_number
}
]
}} = run_block_consensus_change(block, true, options)
assert %{value: ^prev_value, block_number: ^prev_block_number} = Repo.one(Address.CoinBalance)
end
test "delete_address_current_token_balances deletes rows with matching block number when consensus is true",
%{consensus_block: %{number: block_number} = block, options: options} do
%Address.CurrentTokenBalance{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} =

Loading…
Cancel
Save