Merge pull request #1323 from poanetwork/1320

Update `address_current_token_balances` and `address_token_balances` to remove those referencing replaced block numbers during reorgs
pull/1327/head
Luke Imhoff 6 years ago committed by GitHub
commit a5147a639d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 203
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  2. 155
      apps/explorer/test/explorer/chain/import/runner/blocks_test.exs
  3. 251
      apps/explorer/test/explorer/chain/import_test.exs

@ -5,11 +5,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
require Ecto.Query require Ecto.Query
import Ecto.Query, only: [from: 2, select: 2, update: 2] import Ecto.Query, only: [from: 2, select: 2, subquery: 1, update: 2]
alias Ecto.Adapters.SQL alias Ecto.Adapters.SQL
alias Ecto.{Changeset, Multi, Repo} alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.{Block, Import, InternalTransaction, Transaction} alias Explorer.Chain.{Address, Block, Hash, Import, InternalTransaction, Transaction}
alias Explorer.Chain.Block.Reward alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import.Runner alias Explorer.Chain.Import.Runner
@ -43,6 +43,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Map.put_new(:timeout, @timeout) |> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps) |> Map.put(:timestamps, timestamps)
ordered_consensus_block_numbers = ordered_consensus_block_numbers(changes_list)
where_forked = where_forked(changes_list) where_forked = where_forked(changes_list)
multi multi
@ -64,9 +65,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
}) })
end) end)
|> Multi.run(:lose_consenus, fn repo, _ -> |> Multi.run(:lose_consenus, fn repo, _ ->
lose_consensus(repo, changes_list, insert_options) lose_consensus(repo, ordered_consensus_block_numbers, insert_options)
end) end)
|> Multi.run(:remove_rewards, fn repo, _ -> |> Multi.run(:delete_address_token_balances, fn repo, _ ->
delete_address_token_balances(repo, ordered_consensus_block_numbers, insert_options)
end)
|> Multi.run(:delete_address_current_token_balances, fn repo, _ ->
delete_address_current_token_balances(repo, ordered_consensus_block_numbers, insert_options)
end)
|> Multi.run(:derive_address_current_token_balances, fn repo,
%{
delete_address_current_token_balances:
deleted_address_current_token_balances
} ->
derive_address_current_token_balances(repo, deleted_address_current_token_balances, insert_options)
end)
|> Multi.run(:delete_rewards, fn repo, _ ->
delete_rewards(repo, changes_list, insert_options) delete_rewards(repo, changes_list, insert_options)
end) end)
|> Multi.run(:blocks, fn repo, _ -> |> Multi.run(:blocks, fn repo, _ ->
@ -247,19 +261,22 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
) )
end end
defp lose_consensus(repo, blocks_changes, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) defp ordered_consensus_block_numbers(blocks_changes) when is_list(blocks_changes) do
when is_list(blocks_changes) do blocks_changes
ordered_consensus_block_number = |> Enum.reduce(MapSet.new(), fn
blocks_changes %{consensus: true, number: number}, acc ->
|> Enum.reduce(MapSet.new(), fn MapSet.put(acc, number)
%{consensus: true, number: number}, acc ->
MapSet.put(acc, number)
%{consensus: false}, acc -> %{consensus: false}, acc ->
acc acc
end) end)
|> Enum.sort() |> Enum.sort()
end
defp lose_consensus(_, [], _), do: {:ok, []}
defp lose_consensus(repo, ordered_consensus_block_number, %{timeout: timeout, timestamps: %{updated_at: updated_at}})
when is_list(ordered_consensus_block_number) do
query = query =
from( from(
block in Block, block in Block,
@ -283,6 +300,162 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
end end
end end
defp delete_address_token_balances(_, [], _), do: {:ok, []}
defp delete_address_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do
ordered_query =
from(address_token_balance in Address.TokenBalance,
where: address_token_balance.block_number in ^ordered_consensus_block_numbers,
select: map(address_token_balance, [:address_hash, :token_contract_address_hash, :block_number]),
# MUST match order in `Explorer.Chain.Import.Runner.Address.TokenBalances.insert` to prevent ShareLock ordering deadlocks.
order_by: [
address_token_balance.address_hash,
address_token_balance.token_contract_address_hash,
address_token_balance.block_number
],
# ensures rows remains locked while outer query is joining to it
lock: "FOR UPDATE"
)
query =
from(address_token_balance in Address.TokenBalance,
select: map(address_token_balance, [:address_hash, :token_contract_address_hash, :block_number]),
inner_join: ordered_address_token_balance in subquery(ordered_query),
on:
ordered_address_token_balance.address_hash == address_token_balance.address_hash and
ordered_address_token_balance.token_contract_address_hash ==
address_token_balance.token_contract_address_hash and
ordered_address_token_balance.block_number == address_token_balance.block_number
)
try do
{_count, deleted_address_token_balances} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_address_token_balances}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: ordered_consensus_block_numbers}}
end
end
defp delete_address_current_token_balances(_, [], _), do: {:ok, []}
defp delete_address_current_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do
ordered_query =
from(address_current_token_balance in Address.CurrentTokenBalance,
where: address_current_token_balance.block_number in ^ordered_consensus_block_numbers,
select: map(address_current_token_balance, [:address_hash, :token_contract_address_hash]),
# MUST match order in `Explorer.Chain.Import.Runner.Address.CurrentTokenBalances.insert` to prevent ShareLock ordering deadlocks.
order_by: [
address_current_token_balance.address_hash,
address_current_token_balance.token_contract_address_hash
],
# ensures row remains locked while outer query is joining to it
lock: "FOR UPDATE"
)
query =
from(address_current_token_balance in Address.CurrentTokenBalance,
select: map(address_current_token_balance, [:address_hash, :token_contract_address_hash]),
inner_join: ordered_address_current_token_balance in subquery(ordered_query),
on:
ordered_address_current_token_balance.address_hash == address_current_token_balance.address_hash and
ordered_address_current_token_balance.token_contract_address_hash ==
address_current_token_balance.token_contract_address_hash
)
try do
{_count, deleted_address_current_token_balances} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_address_current_token_balances}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: ordered_consensus_block_numbers}}
end
end
defp derive_address_current_token_balances(_, [], _), do: {:ok, []}
# sobelow_skip ["SQL.Query"]
defp derive_address_current_token_balances(repo, deleted_address_current_token_balances, %{timeout: timeout})
when is_list(deleted_address_current_token_balances) do
initial_query =
from(address_token_balance in Address.TokenBalance,
select: %{
address_hash: address_token_balance.address_hash,
token_contract_address_hash: address_token_balance.token_contract_address_hash,
block_number: max(address_token_balance.block_number)
},
group_by: [address_token_balance.address_hash, address_token_balance.token_contract_address_hash]
)
final_query =
Enum.reduce(deleted_address_current_token_balances, initial_query, fn %{
address_hash: address_hash,
token_contract_address_hash:
token_contract_address_hash
},
acc_query ->
from(address_token_balance in acc_query,
or_where:
address_token_balance.address_hash == ^address_hash and
address_token_balance.token_contract_address_hash == ^token_contract_address_hash
)
end)
new_current_token_balance_query =
from(new_current_token_balance in subquery(final_query),
inner_join: address_token_balance in Address.TokenBalance,
on:
address_token_balance.address_hash == new_current_token_balance.address_hash and
address_token_balance.token_contract_address_hash == new_current_token_balance.token_contract_address_hash and
address_token_balance.block_number == new_current_token_balance.block_number,
select: {
new_current_token_balance.address_hash,
new_current_token_balance.token_contract_address_hash,
new_current_token_balance.block_number,
address_token_balance.value,
over(min(address_token_balance.inserted_at), :w),
over(max(address_token_balance.updated_at), :w)
},
# Prevent ShareLock deadlock by matching order of `Explorer.Chain.Import.Runner.Address.CurrentTokenBalances.insert`
order_by: [new_current_token_balance.address_hash, new_current_token_balance.token_contract_address_hash],
windows: [
w: [partition_by: [address_token_balance.address_hash, address_token_balance.token_contract_address_hash]]
]
)
{select_sql, parameters} = SQL.to_sql(:all, repo, new_current_token_balance_query)
# No `ON CONFLICT` because `delete_address_current_token_balances` should have removed any conflicts.
insert_sql = """
INSERT INTO address_current_token_balances (address_hash, token_contract_address_hash, block_number, value, inserted_at, updated_at)
#{select_sql}
RETURNING address_hash, token_contract_address_hash, block_number
"""
with {:ok,
%Postgrex.Result{
columns: ["address_hash", "token_contract_address_hash", "block_number"],
command: :insert,
rows: rows
}} <- SQL.query(repo, insert_sql, parameters, timeout: timeout) do
derived_address_current_token_balances =
Enum.map(rows, fn [address_hash_bytes, token_contract_address_hash_bytes, block_number] ->
{:ok, address_hash} = Hash.Address.load(address_hash_bytes)
{:ok, token_contract_address_hash} = Hash.Address.load(token_contract_address_hash_bytes)
%{
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
}
end)
{:ok, derived_address_current_token_balances}
end
end
# `block_rewards` are linked to `blocks.hash`, but fetched by `blocks.number`, so when a block with the same number is # `block_rewards` are linked to `blocks.hash`, but fetched by `blocks.number`, so when a block with the same number is
# inserted, the old block rewards need to be deleted, so that the old and new rewards aren't combined. # inserted, the old block rewards need to be deleted, so that the old and new rewards aren't combined.
defp delete_rewards(repo, blocks_changes, %{timeout: timeout}) do defp delete_rewards(repo, blocks_changes, %{timeout: timeout}) do

@ -5,34 +5,34 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
alias Ecto.Multi alias Ecto.Multi
alias Explorer.Chain.Import.Runner.{Blocks, Transaction} alias Explorer.Chain.Import.Runner.{Blocks, Transaction}
alias Explorer.Chain.{Block, Transaction} alias Explorer.Chain.{Address, Block, Transaction}
alias Explorer.Repo alias Explorer.Repo
describe "run/1" do describe "run/1" do
setup do setup do
block = insert(:block, consensus: true) block = insert(:block, consensus: true)
transaction = timestamp = DateTime.utc_now()
:transaction options = %{timestamps: %{inserted_at: timestamp, updated_at: timestamp}}
|> insert()
|> with_block(block)
%{consensus_block: block, transaction: transaction} %{consensus_block: block, options: options}
end end
test "derive_transaction_forks replaces hash on conflicting (uncle_hash, index)", %{ test "derive_transaction_forks replaces hash on conflicting (uncle_hash, index)", %{
consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number} = consensus_block,
transaction: transaction options: options
} do } do
transaction =
:transaction
|> insert()
|> with_block(consensus_block)
block_params = block_params =
params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: false) params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: false)
%Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params) %Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params)
changes_list = [block_changes] changes_list = [block_changes]
timestamp = DateTime.utc_now()
options = %{timestamps: %{inserted_at: timestamp, updated_at: timestamp}}
assert Repo.aggregate(from(transaction in Transaction, where: is_nil(transaction.block_number)), :count, :hash) == assert Repo.aggregate(from(transaction in Transaction, where: is_nil(transaction.block_number)), :count, :hash) ==
0 0
@ -76,6 +76,139 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
assert Repo.one!(from(transaction_fork in Transaction.Fork, select: "ctid")) == ctid, assert Repo.one!(from(transaction_fork in Transaction.Fork, select: "ctid")) == ctid,
"Tuple was written even though it is not distinct" "Tuple was written even though it is not distinct"
end end
test "delete_address_current_token_balances deletes rows with matching block number when consensus is true",
%{consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do
%Address.CurrentTokenBalance{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} =
insert(:address_current_token_balance, block_number: block_number)
block_params = params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: true)
%Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params)
changes_list = [block_changes]
assert count(Address.CurrentTokenBalance) == 1
assert {:ok,
%{
delete_address_current_token_balances: [
%{address_hash: ^address_hash, token_contract_address_hash: ^token_contract_address_hash}
]
}} =
Multi.new()
|> Blocks.run(changes_list, options)
|> Repo.transaction()
assert count(Address.CurrentTokenBalance) == 0
end
test "delete_address_current_token_balances does not delete rows with matching block number when consensus is false",
%{consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do
%Address.CurrentTokenBalance{} = insert(:address_current_token_balance, block_number: block_number)
block_params =
params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: false)
%Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params)
changes_list = [block_changes]
count = 1
assert count(Address.CurrentTokenBalance) == count
assert {:ok,
%{
delete_address_current_token_balances: []
}} =
Multi.new()
|> Blocks.run(changes_list, options)
|> Repo.transaction()
assert count(Address.CurrentTokenBalance) == count
end
test "derive_address_current_token_balances inserts rows if there is an address_token_balance left for the rows deleted by delete_address_current_token_balances",
%{consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do
%Address.TokenBalance{
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
value: previous_value,
block_number: previous_block_number
} = insert(:token_balance, block_number: block_number - 1)
address = Repo.get(Address, address_hash)
%Address.TokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
value: current_value,
block_number: ^block_number
} =
insert(:token_balance,
address: address,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
)
refute current_value == previous_value
%Address.CurrentTokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
block_number: ^block_number,
value: ^current_value
} =
insert(:address_current_token_balance,
address: address,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
value: current_value
)
block_params = params_for(:block, hash: block_hash, miner_hash: miner_hash, number: block_number, consensus: true)
%Ecto.Changeset{valid?: true, changes: block_changes} = Block.changeset(%Block{}, block_params)
changes_list = [block_changes]
assert count(Address.TokenBalance) == 2
assert count(Address.CurrentTokenBalance) == 1
assert {:ok,
%{
delete_address_current_token_balances: [
%{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash
}
],
delete_address_token_balances: [
%{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
block_number: ^block_number
}
],
derive_address_current_token_balances: [
%{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
block_number: ^previous_block_number
}
]
}} =
Multi.new()
|> Blocks.run(changes_list, options)
|> Repo.transaction()
assert count(Address.TokenBalance) == 1
assert count(Address.CurrentTokenBalance) == 1
assert %Address.CurrentTokenBalance{block_number: ^previous_block_number, value: ^previous_value} =
Repo.get_by(Address.CurrentTokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash
)
end
end end
defp count(schema) do defp count(schema) do

@ -1828,5 +1828,256 @@ defmodule Explorer.Chain.ImportTest do
assert transaction_after.error == nil assert transaction_after.error == nil
assert transaction_after.status == nil assert transaction_after.status == nil
end end
test "address_token_balances and address_current_token_balances are deleted during reorgs" do
%Block{number: block_number} = insert(:block, consensus: true)
value_before = Decimal.new(1)
%Address{hash: address_hash} = address = insert(:address)
%Address.TokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: ^block_number
} = insert(:token_balance, address: address, block_number: block_number, value: value_before)
%Address.CurrentTokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
block_number: ^block_number
} =
insert(:address_current_token_balance,
address: address,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
value: value_before
)
miner_hash_after = address_hash()
from_address_hash_after = address_hash()
block_hash_after = block_hash()
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_after},
%{hash: from_address_hash_after}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 1,
gas_limit: 1,
gas_used: 1,
hash: block_hash_after,
miner_hash: miner_hash_after,
nonce: 1,
number: block_number,
parent_hash: block_hash(),
size: 1,
timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 1
}
]
}
})
assert is_nil(
Repo.get_by(Address.CurrentTokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash
)
)
assert is_nil(
Repo.get_by(Address.TokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
)
)
end
test "address_current_token_balances is derived during reorgs" do
%Block{number: block_number} = insert(:block, consensus: true)
previous_block_number = block_number - 1
%Address.TokenBalance{
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
value: previous_value,
block_number: previous_block_number
} = insert(:token_balance, block_number: previous_block_number)
address = Repo.get(Address, address_hash)
%Address.TokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: token_contract_address_hash,
value: current_value,
block_number: ^block_number
} =
insert(:token_balance,
address: address,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
)
refute current_value == previous_value
%Address.CurrentTokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
block_number: ^block_number
} =
insert(:address_current_token_balance,
address: address,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
value: current_value
)
miner_hash_after = address_hash()
from_address_hash_after = address_hash()
block_hash_after = block_hash()
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: miner_hash_after},
%{hash: from_address_hash_after}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 1,
gas_limit: 1,
gas_used: 1,
hash: block_hash_after,
miner_hash: miner_hash_after,
nonce: 1,
number: block_number,
parent_hash: block_hash(),
size: 1,
timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 1
}
]
}
})
assert %Address.CurrentTokenBalance{block_number: ^previous_block_number, value: ^previous_value} =
Repo.get_by(Address.CurrentTokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash
)
assert is_nil(
Repo.get_by(Address.TokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
)
)
end
test "address_token_balances and address_current_token_balances can be replaced during reorgs" do
%Block{number: block_number} = insert(:block, consensus: true)
value_before = Decimal.new(1)
%Address{hash: address_hash} = address = insert(:address)
%Address.TokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: ^block_number
} = insert(:token_balance, address: address, block_number: block_number, value: value_before)
%Address.CurrentTokenBalance{
address_hash: ^address_hash,
token_contract_address_hash: ^token_contract_address_hash,
block_number: ^block_number
} =
insert(:address_current_token_balance,
address: address,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
value: value_before
)
miner_hash_after = address_hash()
from_address_hash_after = address_hash()
block_hash_after = block_hash()
value_after = Decimal.add(value_before, 1)
assert {:ok, _} =
Import.all(%{
addresses: %{
params: [
%{hash: address_hash},
%{hash: token_contract_address_hash},
%{hash: miner_hash_after},
%{hash: from_address_hash_after}
]
},
address_token_balances: %{
params: [
%{
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
value: value_after
}
]
},
address_current_token_balances: %{
params: [
%{
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number,
value: value_after
}
]
},
blocks: %{
params: [
%{
consensus: true,
difficulty: 1,
gas_limit: 1,
gas_used: 1,
hash: block_hash_after,
miner_hash: miner_hash_after,
nonce: 1,
number: block_number,
parent_hash: block_hash(),
size: 1,
timestamp: Timex.parse!("2019-01-01T02:00:00Z", "{ISO:Extended:Z}"),
total_difficulty: 1
}
]
}
})
assert %Address.CurrentTokenBalance{value: ^value_after} =
Repo.get_by(Address.CurrentTokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash
)
assert %Address.TokenBalance{value: ^value_after} =
Repo.get_by(Address.TokenBalance,
address_hash: address_hash,
token_contract_address_hash: token_contract_address_hash,
block_number: block_number
)
end
end end
end end

Loading…
Cancel
Save