Merge pull request #2687 from poanetwork/ab-remove-non-conconsensus-data

remove non-consensus data when inserting new consensus blocks
ab-add-block-number-transactions-index
Victor Baranov 5 years ago committed by GitHub
commit 84737b1417
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      CHANGELOG.md
  2. 116
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  3. 7
      apps/explorer/priv/repo/migrations/20190910170703_create_indexes_for_block_number_in_token_transfers_and_transactions.exs
  4. 90
      apps/explorer/test/explorer/chain/import/runner/blocks_test.exs

@ -1,16 +1,18 @@
## Current ## Current
### Features ### Features
- [#2665](https://github.com/poanetwork/blockscout/pull/2665) - new menu layout for mobile devices - [#2679](https://github.com/poanetwork/blockscout/pull/2679) - added fixed height for card chain blocks and card chain transactions
- [#2679](https://github.com/poanetwork/blockscout/pull/2679) - added fixed height for card chain blocks and card chain transactions
- [#2678](https://github.com/poanetwork/blockscout/pull/2678) - fixed dashboard banner height bug - [#2678](https://github.com/poanetwork/blockscout/pull/2678) - fixed dashboard banner height bug
- [#2672](https://github.com/poanetwork/blockscout/pull/2672) - added new theme for xUSDT - [#2672](https://github.com/poanetwork/blockscout/pull/2672) - added new theme for xUSDT
- [#2665](https://github.com/poanetwork/blockscout/pull/2665) - new menu layout for mobile devices
- [#2663](https://github.com/poanetwork/blockscout/pull/2663) - Fetch address counters in parallel - [#2663](https://github.com/poanetwork/blockscout/pull/2663) - Fetch address counters in parallel
### Fixes ### Fixes
- [#2687](https://github.com/poanetwork/blockscout/pull/2687) - remove non-consensus token transfers, logs when inserting new consensus blocks
- [#2684](https://github.com/poanetwork/blockscout/pull/2684) - do not filter pending logs - [#2684](https://github.com/poanetwork/blockscout/pull/2684) - do not filter pending logs
- [#2682](https://github.com/poanetwork/blockscout/pull/2682) - Use Task.start instead of Task.async in caches - [#2682](https://github.com/poanetwork/blockscout/pull/2682) - Use Task.start instead of Task.async in caches
- [#2671](https://github.com/poanetwork/blockscout/pull/2671) - fixed buttons color at smart contract section - [#2671](https://github.com/poanetwork/blockscout/pull/2671) - fixed buttons color at smart contract section
### Chore ### Chore

@ -9,7 +9,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
alias Ecto.Adapters.SQL alias Ecto.Adapters.SQL
alias Ecto.{Changeset, Multi, Repo} alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.{Address, Block, Hash, Import, InternalTransaction, Transaction} alias Explorer.Chain.{Address, Block, Hash, Import, InternalTransaction, Log, TokenTransfer, Transaction}
alias Explorer.Chain.Block.Reward alias Explorer.Chain.Block.Reward
alias Explorer.Chain.Import.Runner alias Explorer.Chain.Import.Runner
alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances
@ -58,6 +58,31 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
where_forked: where_forked where_forked: where_forked
}) })
end) end)
|> Multi.run(:lose_consensus, fn repo, _ ->
lose_consensus(repo, ordered_consensus_block_numbers, insert_options)
end)
|> Multi.run(:lose_invalid_neighbour_consensus, fn repo, _ ->
lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, insert_options)
end)
|> Multi.run(:remove_nonconsensus_data, fn repo,
%{
lose_consensus: lost_consensus_blocks,
lose_invalid_neighbour_consensus: lost_consensus_neighbours
} ->
nonconsensus_block_numbers =
(lost_consensus_blocks ++ lost_consensus_neighbours)
|> Enum.map(fn %{number: number} ->
number
end)
|> Enum.sort()
|> Enum.dedup()
remove_nonconsensus_data(
repo,
nonconsensus_block_numbers,
insert_options
)
end)
# MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table # MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table
|> Multi.run(:fork_transactions, fn repo, _ -> |> Multi.run(:fork_transactions, fn repo, _ ->
fork_transactions(%{ fork_transactions(%{
@ -67,12 +92,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
where_forked: where_forked where_forked: where_forked
}) })
end) end)
|> Multi.run(:lose_consensus, fn repo, _ ->
lose_consensus(repo, ordered_consensus_block_numbers, insert_options)
end)
|> Multi.run(:lose_invalid_neighbour_consensus, fn repo, _ ->
lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, insert_options)
end)
|> Multi.run(:delete_address_token_balances, fn repo, _ -> |> Multi.run(:delete_address_token_balances, fn repo, _ ->
delete_address_token_balances(repo, ordered_consensus_block_numbers, insert_options) delete_address_token_balances(repo, ordered_consensus_block_numbers, insert_options)
end) end)
@ -342,6 +361,89 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
end end
end end
defp remove_nonconsensus_data(
repo,
nonconsensus_block_numbers,
insert_options
) do
with {:ok, deleted_token_transfers} <-
remove_nonconsensus_token_transfers(repo, nonconsensus_block_numbers, insert_options),
{:ok, deleted_logs} <- remove_nonconsensus_logs(repo, nonconsensus_block_numbers, insert_options) do
{:ok, %{token_transfers: deleted_token_transfers, logs: deleted_logs}}
end
end
defp remove_nonconsensus_token_transfers(repo, nonconsensus_block_numbers, %{timeout: timeout}) do
ordered_token_transfers =
from(token_transfer in TokenTransfer,
where: token_transfer.block_number in ^nonconsensus_block_numbers,
select: map(token_transfer, [:transaction_hash, :log_index]),
# Enforce TokenTransfer ShareLocks order (see docs: sharelocks.md)
order_by: [
token_transfer.transaction_hash,
token_transfer.log_index
],
lock: "FOR UPDATE"
)
query =
from(token_transfer in TokenTransfer,
select: map(token_transfer, [:transaction_hash, :log_index]),
inner_join: ordered_token_transfer in subquery(ordered_token_transfers),
on:
ordered_token_transfer.transaction_hash ==
token_transfer.transaction_hash and
ordered_token_transfer.log_index == token_transfer.log_index
)
try do
{_count, deleted_token_transfers} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_token_transfers}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}}
end
end
defp remove_nonconsensus_logs(repo, nonconsensus_block_numbers, %{timeout: timeout}) do
transaction_query =
from(transaction in Transaction,
where: transaction.block_number in ^nonconsensus_block_numbers,
select: map(transaction, [:hash]),
order_by: transaction.hash
)
ordered_logs =
from(log in Log,
inner_join: transaction in subquery(transaction_query),
on: log.transaction_hash == transaction.hash,
select: map(log, [:transaction_hash, :index]),
# Enforce Log ShareLocks order (see docs: sharelocks.md)
order_by: [
log.transaction_hash,
log.index
],
lock: "FOR UPDATE OF l0"
)
query =
from(log in Log,
select: map(log, [:transaction_hash, :index]),
inner_join: ordered_log in subquery(ordered_logs),
on: ordered_log.transaction_hash == log.transaction_hash and ordered_log.index == log.index
)
try do
{_count, deleted_logs} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_logs}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}}
end
end
defp delete_address_token_balances(_, [], _), do: {:ok, []} defp delete_address_token_balances(_, [], _), do: {:ok, []}
defp delete_address_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do defp delete_address_token_balances(repo, ordered_consensus_block_numbers, %{timeout: timeout}) do

@ -0,0 +1,7 @@
defmodule Explorer.Repo.Migrations.CreateIndexesForBlockNumberInTokenTransfersAndTransactions do
use Ecto.Migration
def change do
create_if_not_exists(index(:token_transfers, [:block_number]))
end
end

@ -7,13 +7,14 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
alias Ecto.Multi alias Ecto.Multi
alias Explorer.Chain.Import.Runner.{Blocks, Transactions} alias Explorer.Chain.Import.Runner.{Blocks, Transactions}
alias Explorer.Chain.{Address, Block, Transaction} alias Explorer.Chain.{Address, Block, Log, Transaction, TokenTransfer}
alias Explorer.Chain alias Explorer.Chain
alias Explorer.Repo alias Explorer.Repo
describe "run/1" do describe "run/1" do
setup do setup do
block = insert(:block, consensus: true) miner = insert(:address)
block = params_for(:block, consensus: true, miner_hash: miner.hash)
timestamp = DateTime.utc_now() timestamp = DateTime.utc_now()
options = %{timestamps: %{inserted_at: timestamp, updated_at: timestamp}} options = %{timestamps: %{inserted_at: timestamp, updated_at: timestamp}}
@ -22,9 +23,11 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
end end
test "derive_transaction_forks replaces hash on conflicting (uncle_hash, index)", %{ test "derive_transaction_forks replaces hash on conflicting (uncle_hash, index)", %{
consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number} = consensus_block, consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number},
options: options options: options
} do } do
consensus_block = insert(:block, %{hash: block_hash, number: block_number})
transaction = transaction =
:transaction :transaction
|> insert() |> insert()
@ -81,7 +84,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
end end
test "delete_address_current_token_balances deletes rows with matching block number when consensus is true", test "delete_address_current_token_balances deletes rows with matching block number when consensus is true",
%{consensus_block: %Block{number: block_number} = block, options: options} do %{consensus_block: %{number: block_number} = block, options: options} do
%Address.CurrentTokenBalance{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} = %Address.CurrentTokenBalance{address_hash: address_hash, token_contract_address_hash: token_contract_address_hash} =
insert(:address_current_token_balance, block_number: block_number) insert(:address_current_token_balance, block_number: block_number)
@ -98,7 +101,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
end end
test "delete_address_current_token_balances does not delete rows with matching block number when consensus is false", test "delete_address_current_token_balances does not delete rows with matching block number when consensus is false",
%{consensus_block: %Block{number: block_number} = block, options: options} do %{consensus_block: %{number: block_number} = block, options: options} do
%Address.CurrentTokenBalance{} = insert(:address_current_token_balance, block_number: block_number) %Address.CurrentTokenBalance{} = insert(:address_current_token_balance, block_number: block_number)
count = 1 count = 1
@ -113,8 +116,69 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
assert count(Address.CurrentTokenBalance) == count assert count(Address.CurrentTokenBalance) == count
end end
test "remove_nonconsensus_data deletes token transfer rows with matching block number when new consensus block is inserted",
%{consensus_block: %{number: block_number} = block, options: options} do
insert(:block, number: block_number, consensus: true)
%TokenTransfer{transaction_hash: transaction_hash, log_index: log_index} =
insert(:token_transfer, block_number: block_number, transaction: insert(:transaction))
assert count(TokenTransfer) == 1
assert {:ok,
%{
remove_nonconsensus_data: %{
token_transfers: [
%{transaction_hash: ^transaction_hash, log_index: ^log_index}
]
}
}} = run_block_consensus_change(block, true, options)
assert count(TokenTransfer) == 0
end
test "remove_nonconsensus_data does not delete token transfer rows with matching block number when new consensus block wasn't inserted",
%{consensus_block: %{number: block_number} = block, options: options} do
insert(:token_transfer, block_number: block_number, transaction: insert(:transaction))
count = 1
assert count(TokenTransfer) == count
assert {:ok,
%{
remove_nonconsensus_data: %{
token_transfers: []
}
}} = run_block_consensus_change(block, false, options)
assert count(TokenTransfer) == count
end
test "remove_nonconsensus_data deletes nonconsensus logs", %{
consensus_block: %{number: block_number} = block,
options: options
} do
old_block = insert(:block, number: block_number, consensus: true)
forked_transaction = :transaction |> insert() |> with_block(old_block)
%Log{transaction_hash: hash, index: index} = insert(:log, transaction: forked_transaction)
assert count(Log) == 1
assert {:ok,
%{
remove_nonconsensus_data: %{
logs: [
%{transaction_hash: ^hash, index: ^index}
]
}
}} = run_block_consensus_change(block, true, options)
assert count(Log) == 0
end
test "derive_address_current_token_balances inserts rows if there is an address_token_balance left for the rows deleted by delete_address_current_token_balances", test "derive_address_current_token_balances inserts rows if there is an address_token_balance left for the rows deleted by delete_address_current_token_balances",
%{consensus_block: %Block{number: block_number} = block, options: options} do %{consensus_block: %{number: block_number} = block, options: options} do
token = insert(:token) token = insert(:token)
token_contract_address_hash = token.contract_address_hash token_contract_address_hash = token.contract_address_hash
@ -172,7 +236,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
end end
test "a non-holder reverting to a holder increases the holder_count", test "a non-holder reverting to a holder increases the holder_count",
%{consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do %{consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do
token = insert(:token) token = insert(:token)
token_contract_address_hash = token.contract_address_hash token_contract_address_hash = token.contract_address_hash
@ -204,7 +268,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
end end
test "a holder reverting to a non-holder decreases the holder_count", test "a holder reverting to a non-holder decreases the holder_count",
%{consensus_block: %Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do %{consensus_block: %{hash: block_hash, miner_hash: miner_hash, number: block_number}, options: options} do
token = insert(:token) token = insert(:token)
token_contract_address_hash = token.contract_address_hash token_contract_address_hash = token.contract_address_hash
@ -236,7 +300,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
end end
test "a non-holder becoming and a holder becoming while a holder becomes a non-holder cancels out and holder_count does not change", test "a non-holder becoming and a holder becoming while a holder becomes a non-holder cancels out and holder_count does not change",
%{consensus_block: %Block{number: block_number} = block, options: options} do %{consensus_block: %{number: block_number} = block, options: options} do
token = insert(:token) token = insert(:token)
token_contract_address_hash = token.contract_address_hash token_contract_address_hash = token.contract_address_hash
@ -262,7 +326,8 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
# Regression test for https://github.com/poanetwork/blockscout/issues/1644 # Regression test for https://github.com/poanetwork/blockscout/issues/1644
test "discards neighbouring blocks if they aren't related to the current one because of reorg and/or import timeout", test "discards neighbouring blocks if they aren't related to the current one because of reorg and/or import timeout",
%{consensus_block: %Block{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do %{consensus_block: %{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do
insert(:block, %{number: block_number, hash: block_hash})
old_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1) old_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1)
new_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1) new_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1)
@ -286,7 +351,8 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
# Regression test for https://github.com/poanetwork/blockscout/issues/1911 # Regression test for https://github.com/poanetwork/blockscout/issues/1911
test "forces block refetch if transaction is re-collated in a different block", test "forces block refetch if transaction is re-collated in a different block",
%{consensus_block: %Block{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do %{consensus_block: %{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do
insert(:block, %{number: block_number, hash: block_hash})
new_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1) new_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1)
new_block2 = params_for(:block, miner_hash: miner_hash, parent_hash: new_block1.hash, number: block_number + 2) new_block2 = params_for(:block, miner_hash: miner_hash, parent_hash: new_block1.hash, number: block_number + 2)
@ -365,7 +431,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
end end
defp run_block_consensus_change( defp run_block_consensus_change(
%Block{hash: block_hash, miner_hash: miner_hash, number: block_number}, %{hash: block_hash, miner_hash: miner_hash, number: block_number},
consensus, consensus,
options options
) do ) do

Loading…
Cancel
Save