Update address_current_token_balances and address_token_balances for reorgs

Fixes #1320

1. Delete any `address_token_balances` with matching consensus block number.
2. Delete any `address_current_token_balances` with matching consensus
   block number.
3. Derive `address_current_token_balances` for any deleted in (2).
pull/1323/head
Luke Imhoff 6 years ago
parent 51f181db15
commit 8718ca4069
  1. 203
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex

@ -5,11 +5,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
require Ecto.Query
import Ecto.Query, only: [from: 2, select: 2, update: 2]
import Ecto.Query, only: [from: 2, select: 2, subquery: 1, update: 2]
alias Ecto.Adapters.SQL
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.{Block, Import, InternalTransaction, Transaction}
alias Explorer.Chain.{Address, Block, Hash, Import, InternalTransaction, Transaction}
alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import.Runner
@ -43,6 +43,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
ordered_consensus_block_numbers = ordered_consensus_block_numbers(changes_list)
where_forked = where_forked(changes_list)
multi
@ -64,9 +65,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
})
end)
|> Multi.run(:lose_consenus, fn repo, _ ->
lose_consensus(repo, changes_list, insert_options)
lose_consensus(repo, ordered_consensus_block_numbers, insert_options)
end)
|> Multi.run(:remove_rewards, fn repo, _ ->
|> Multi.run(:delete_address_token_balances, fn repo, _ ->
delete_address_token_balances(repo, ordered_consensus_block_numbers, insert_options)
end)
|> Multi.run(:delete_address_current_token_balances, fn repo, _ ->
delete_address_current_token_balances(repo, ordered_consensus_block_numbers, insert_options)
end)
|> Multi.run(:derive_address_current_token_balances, fn repo,
%{
delete_address_current_token_balances:
deleted_address_current_token_balances
} ->
derive_address_current_token_balances(repo, deleted_address_current_token_balances, insert_options)
end)
|> Multi.run(:delete_rewards, fn repo, _ ->
delete_rewards(repo, changes_list, insert_options)
end)
|> Multi.run(:blocks, fn repo, _ ->
@ -247,19 +261,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
)
end
defp lose_consensus(repo, blocks_changes, %{timeout: timeout, timestamps: %{updated_at: updated_at}})
when is_list(blocks_changes) do
ordered_consensus_block_number =
blocks_changes
|> Enum.reduce(MapSet.new(), fn
%{consensus: true, number: number}, acc ->
MapSet.put(acc, number)
defp ordered_consensus_block_numbers(blocks_changes) when is_list(blocks_changes) do
blocks_changes
|> Enum.reduce(MapSet.new(), fn
%{consensus: true, number: number}, acc ->
MapSet.put(acc, number)
%{consensus: false}, acc ->
acc
end)
|> Enum.sort()
%{consensus: false}, acc ->
acc
end)
|> Enum.sort()
end
defp lose_consensus(_, [], _), do: {:ok, []}
defp lose_consensus(repo, ordered_consensus_block_number, %{timeout: timeout, timestamps: %{updated_at: updated_at}})
when is_list(ordered_consensus_block_number) do
query =
from(
block in Block,
@ -283,6 +300,162 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
end
end
defp delete_address_token_balances(_, [], _), do: {:ok, []}
defp delete_address_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do
ordered_query =
from(address_token_balance in Address.TokenBalance,
where: address_token_balance.block_number in ^ordered_consensus_block_numbers,
select: map(address_token_balance, [:address_hash, :token_contract_address_hash, :block_number]),
# MUST match order in `Explorer.Chain.Import.Runner.Address.TokenBalances.insert` to prevent ShareLock ordering deadlocks.
order_by: [
address_token_balance.address_hash,
address_token_balance.token_contract_address_hash,
address_token_balance.block_number
],
# ensures rows remains locked while outer query is joining to it
lock: "FOR UPDATE"
)
query =
from(address_token_balance in Address.TokenBalance,
select: map(address_token_balance, [:address_hash, :token_contract_address_hash, :block_number]),
inner_join: ordered_address_token_balance in subquery(ordered_query),
on:
ordered_address_token_balance.address_hash == address_token_balance.address_hash and
ordered_address_token_balance.token_contract_address_hash ==
address_token_balance.token_contract_address_hash and
ordered_address_token_balance.block_number == address_token_balance.block_number
)
try do
{_count, deleted_address_token_balances} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_address_token_balances}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: ordered_consensus_block_numbers}}
end
end
defp delete_address_current_token_balances(_, [], _), do: {:ok, []}
defp delete_address_current_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do
ordered_query =
from(address_current_token_balance in Address.CurrentTokenBalance,
where: address_current_token_balance.block_number in ^ordered_consensus_block_numbers,
select: map(address_current_token_balance, [:address_hash, :token_contract_address_hash]),
# MUST match order in `Explorer.Chain.Import.Runner.Address.CurrentTokenBalances.insert` to prevent ShareLock ordering deadlocks.
order_by: [
address_current_token_balance.address_hash,
address_current_token_balance.token_contract_address_hash
],
# ensures row remains locked while outer query is joining to it
lock: "FOR UPDATE"
)
query =
from(address_current_token_balance in Address.CurrentTokenBalance,
select: map(address_current_token_balance, [:address_hash, :token_contract_address_hash]),
inner_join: ordered_address_current_token_balance in subquery(ordered_query),
on:
ordered_address_current_token_balance.address_hash == address_current_token_balance.address_hash and
ordered_address_current_token_balance.token_contract_address_hash ==
address_current_token_balance.token_contract_address_hash
)
try do
{_count, deleted_address_current_token_balances} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_address_current_token_balances}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: ordered_consensus_block_numbers}}
end
end
defp derive_address_current_token_balances(_, [], _), do: {:ok, []}
# sobelow_skip ["SQL.Query"]
defp derive_address_current_token_balances(repo, deleted_address_current_token_balances, %{timeout: timeout})
when is_list(deleted_address_current_token_balances) do
initial_query =
from(address_token_balance in Address.TokenBalance,
select: %{
address_hash: address_token_balance.address_hash,
token_contract_address_hash: address_token_balance.token_contract_address_hash,
block_number: max(address_token_balance.block_number)
},
group_by: [address_token_balance.address_hash, address_token_balance.token_contract_address_hash]
)
final_query =
Enum.reduce(deleted_address_current_token_balances, initial_query, fn %{
address_hash: address_hash,
token_contract_address_hash:
token_contract_address_hash
},
acc_query ->
from(address_token_balance in acc_query,
or_where:
address_token_balance.address_hash == ^address_hash and
address_token_balance.token_contract_address_hash == ^token_contract_address_hash
)
end)
new_current_token_balance_query =
from(new_current_token_balance in subquery(final_query),
inner_join: address_token_balance in Address.TokenBalance,
on:
address_token_balance.address_hash == new_current_token_balance.address_hash and
address_token_balance.token_contract_address_hash == new_current_token_balance.token_contract_address_hash and
address_token_balance.block_number == new_current_token_balance.block_number,
select: {
new_current_token_balance.address_hash,
new_current_token_balance.token_contract_address_hash,
new_current_token_balance.block_number,
address_token_balance.value,
over(min(address_token_balance.inserted_at), :w),
over(max(address_token_balance.updated_at), :w)
},
# Prevent ShareLock deadlock by matching order of `Explorer.Chain.Import.Runner.Address.CurrentTokenBalances.insert`
order_by: [new_current_token_balance.address_hash, new_current_token_balance.token_contract_address_hash],
windows: [
w: [partition_by: [address_token_balance.address_hash, address_token_balance.token_contract_address_hash]]
]
)
{select_sql, parameters} = SQL.to_sql(:all, repo, new_current_token_balance_query)
# No `ON CONFLICT` because `delete_address_current_token_balances` should have removed any conflicts.
insert_sql = """
INSERT INTO address_current_token_balances (address_hash, token_contract_address_hash, block_number, value, inserted_at, updated_at)
#{select_sql}
RETURNING address_hash, token_contract_address_hash, block_number
"""
with {:ok,
%Postgrex.Result{
columns: ["address_hash", "token_contract_address_hash", "block_number"],
command: :insert,
rows: rows
}} <- SQL.query(repo, insert_sql, parameters, timeout: timeout) do
derived_address_current_token_balances =
Enum.map(rows, fn [address_hash_bytes, token_contract_address_hash_bytes, block_number] ->
{:ok, address_hash} = Hash.Address.load(address_hash_bytes)
{:ok, token_contract_address_hash} = Hash.Address.load(token_contract_address_hash_bytes)
%{
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
}
end)
{:ok, derived_address_current_token_balances}
end
end
# `block_rewards` are linked to `blocks.hash`, but fetched by `blocks.number`, so when a block with the same number is
# inserted, the old block rewards need to be deleted, so that the old and new rewards aren't combined.
defp delete_rewards(repo, blocks_changes, %{timeout: timeout}) do

Loading…
Cancel
Save