Improve speed of nonconsensus data removal

Problem: removal of nonconsensus data is too inefficient and as a result blocks are imported too slow.

Solution: reformulation of deletion logic for better performance
pull/2717/head
pasqu4le 5 years ago
parent b544895f0a
commit 0944ea5cdc
No known key found for this signature in database
GPG Key ID: 8F3EE01F1DC90687
  1. 246
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  2. 12
      apps/explorer/test/explorer/chain/import/runner/blocks_test.exs

@ -46,32 +46,14 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
hashes = Enum.map(changes_list, & &1.hash)
consensus_block_numbers = consensus_block_numbers(changes_list)
where_invalid_neighbour = where_invalid_neighbour(changes_list)
# Enforce ShareLocks tables order (see docs: sharelocks.md)
multi
|> Multi.run(:acquire_blocks, fn repo, _ ->
acquire_blocks(repo, hashes, consensus_block_numbers, where_invalid_neighbour)
end)
|> Multi.run(:lose_consensus, fn repo, _ ->
lose_consensus(repo, consensus_block_numbers, insert_options)
end)
|> Multi.run(:lose_invalid_neighbour_consensus, fn repo, _ ->
lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, insert_options)
end)
|> Multi.run(:nonconsensus_block_numbers, fn _repo,
%{
lose_consensus: lost_consensus_blocks,
lose_invalid_neighbour_consensus: lost_consensus_neighbours
} ->
nonconsensus_block_numbers =
(lost_consensus_blocks ++ lost_consensus_neighbours)
|> Enum.sort()
|> Enum.dedup()
{:ok, nonconsensus_block_numbers}
lose_consensus(repo, hashes, consensus_block_numbers, changes_list, insert_options)
end)
|> Multi.run(:blocks, fn repo, _ ->
# Note, needs to be executed after `lose_consensus` for lock acquisition
insert(repo, changes_list, insert_options)
end)
|> Multi.run(:uncle_fetched_block_second_degree_relations, fn repo, %{blocks: blocks} when is_list(blocks) ->
@ -101,27 +83,14 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
transactions: transactions
})
end)
|> Multi.run(:remove_nonconsensus_logs, fn repo,
%{
nonconsensus_block_numbers: nonconsensus_block_numbers,
fork_transactions: transactions
} ->
remove_nonconsensus_logs(repo, nonconsensus_block_numbers, transactions, insert_options)
|> Multi.run(:remove_nonconsensus_logs, fn repo, %{derive_transaction_forks: transactions} ->
remove_nonconsensus_logs(repo, transactions, insert_options)
end)
|> Multi.run(:acquire_internal_transactions, fn repo,
%{
nonconsensus_block_numbers: nonconsensus_block_numbers,
fork_transactions: transactions
} ->
acquire_internal_transactions(repo, nonconsensus_block_numbers, hashes, transactions)
|> Multi.run(:acquire_internal_transactions, fn repo, %{derive_transaction_forks: transactions} ->
acquire_internal_transactions(repo, hashes, transactions)
end)
|> Multi.run(:remove_nonconsensus_internal_transactions, fn repo,
%{
nonconsensus_block_numbers:
nonconsensus_block_numbers,
fork_transactions: transactions
} ->
remove_nonconsensus_internal_transactions(repo, nonconsensus_block_numbers, transactions, insert_options)
|> Multi.run(:remove_nonconsensus_internal_transactions, fn repo, %{derive_transaction_forks: transactions} ->
remove_nonconsensus_internal_transactions(repo, transactions, insert_options)
end)
|> Multi.run(:internal_transaction_transaction_block_number, fn repo, _ ->
update_internal_transaction_block_number(repo, hashes)
@ -129,9 +98,8 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Multi.run(:acquire_contract_address_tokens, fn repo, _ ->
acquire_contract_address_tokens(repo, consensus_block_numbers)
end)
|> Multi.run(:remove_nonconsensus_token_transfers, fn repo,
%{nonconsensus_block_numbers: nonconsensus_block_numbers} ->
remove_nonconsensus_token_transfers(repo, nonconsensus_block_numbers, insert_options)
|> Multi.run(:remove_nonconsensus_token_transfers, fn repo, %{derive_transaction_forks: transactions} ->
remove_nonconsensus_token_transfers(repo, transactions, insert_options)
end)
|> Multi.run(:delete_address_token_balances, fn repo, _ ->
delete_address_token_balances(repo, consensus_block_numbers, insert_options)
@ -159,22 +127,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
@impl Runner
def timeout, do: @timeout
defp acquire_blocks(repo, hashes, consensus_block_numbers, where_invalid_neighbour) do
query =
from(
block in where_invalid_neighbour,
or_where: block.number in ^consensus_block_numbers,
or_where: block.hash in ^hashes,
select: block.hash,
# Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by: [asc: block.hash],
lock: "FOR UPDATE"
)
blocks = repo.all(query)
{:ok, blocks}
end
defp acquire_contract_address_tokens(repo, consensus_block_numbers) do
query =
from(address_current_token_balance in Address.CurrentTokenBalance,
@ -187,15 +139,12 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
Tokens.acquire_contract_address_tokens(repo, contract_address_hashes)
end
defp acquire_internal_transactions(repo, nonconsensus_block_numbers, hashes, forked_transactions) do
forked_transaction_hashes = Enum.map(forked_transactions, & &1.hash)
defp acquire_internal_transactions(repo, hashes, forked_transaction_hashes) do
query =
from(internal_transaction in InternalTransaction,
join: transaction in Transaction,
on: internal_transaction.transaction_hash == transaction.hash,
where: transaction.block_number in ^nonconsensus_block_numbers,
or_where: transaction.block_hash in ^hashes,
where: transaction.block_hash in ^hashes,
or_where: transaction.hash in ^forked_transaction_hashes,
select: {internal_transaction.transaction_hash, internal_transaction.index},
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md)
@ -229,14 +178,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
lock: "FOR UPDATE"
)
transactions = repo.all(query)
hashes = Enum.map(transactions, & &1.hash)
update_query =
from(
t in Transaction,
where: t.hash in ^hashes,
join: s in subquery(query),
on: t.hash == s.hash,
update: [
set: [
block_hash: nil,
@ -250,17 +196,19 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
updated_at: ^updated_at
]
],
select: t.hash
select: %{
block_hash: s.block_hash,
index: s.index,
hash: s.hash
}
)
try do
{_num, _res} = repo.update_all(update_query, [], timeout: timeout)
{_num, transactions} = repo.update_all(update_query, [], timeout: timeout)
{:ok, transactions}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error}}
end
{:ok, transactions}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error}}
end
defp derive_transaction_forks(%{
@ -283,7 +231,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
# Enforce Fork ShareLocks order (see docs: sharelocks.md)
|> Enum.sort_by(&{&1.uncle_hash, &1.index})
{_total, result} =
{_total, forked_transaction} =
repo.insert_all(
Transaction.Fork,
transaction_forks,
@ -294,11 +242,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
update: [set: [hash: fragment("EXCLUDED.hash")]],
where: fragment("EXCLUDED.hash <> ?", transaction_fork.hash)
),
returning: [:uncle_hash, :hash],
returning: [:hash],
timeout: timeout
)
{:ok, result}
{:ok, Enum.map(forked_transaction, & &1.hash)}
end
@spec insert(Repo.t(), [map()], %{
@ -364,47 +312,48 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Enum.map(& &1.number)
end
defp lose_consensus(_, [], _), do: {:ok, []}
defp lose_consensus(repo, consensus_block_number, %{timeout: timeout, timestamps: %{updated_at: updated_at}})
when is_list(consensus_block_number) do
# ShareLocks order already enforced by `acquire_blocks` (see docs: sharelocks.md)
{_, result} =
repo.update_all(
from(block in Block, where: block.number in ^consensus_block_number, select: block.number),
[set: [consensus: false, updated_at: updated_at]],
timeout: timeout
)
{:ok, result}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, consensus_block_numbers: consensus_block_number}}
end
defp lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, %{
defp lose_consensus(repo, hashes, consensus_block_numbers, changes_list, %{
timeout: timeout,
timestamps: %{updated_at: updated_at}
}) do
# ShareLocks order already enforced by `acquire_blocks` (see docs: sharelocks.md)
{_, result} =
acquire_query =
from(
block in where_invalid_neighbour(changes_list),
or_where: block.number in ^consensus_block_numbers,
# we also need to acquire blocks that will be upserted here, for ordering
or_where: block.hash in ^hashes,
select: block.hash,
# Enforce Block ShareLocks order (see docs: sharelocks.md)
order_by: [asc: block.hash],
lock: "FOR UPDATE"
)
{_, removed_consensus_block_hashes} =
repo.update_all(
from(block in where_invalid_neighbour, select: block.number),
from(
block in Block,
join: s in subquery(acquire_query),
on: block.hash == s.hash,
# we don't want to remove consensus from blocks that will be upserted
where: block.hash not in ^hashes,
select: block.hash
),
[set: [consensus: false, updated_at: updated_at]],
timeout: timeout
)
{:ok, result}
{:ok, removed_consensus_block_hashes}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, where_invalid_neighbour: where_invalid_neighbour}}
{:error, %{exception: postgrex_error, consensus_block_numbers: consensus_block_numbers}}
end
defp remove_nonconsensus_token_transfers(repo, nonconsensus_block_numbers, %{timeout: timeout}) do
defp remove_nonconsensus_token_transfers(repo, forked_transaction_hashes, %{timeout: timeout}) do
ordered_token_transfers =
from(token_transfer in TokenTransfer,
where: token_transfer.block_number in ^nonconsensus_block_numbers,
select: map(token_transfer, [:transaction_hash, :log_index]),
from(
token_transfer in TokenTransfer,
where: token_transfer.transaction_hash in ^forked_transaction_hashes,
select: token_transfer.transaction_hash,
# Enforce TokenTransfer ShareLocks order (see docs: sharelocks.md)
order_by: [
token_transfer.transaction_hash,
@ -417,91 +366,60 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
from(token_transfer in TokenTransfer,
select: map(token_transfer, [:transaction_hash, :log_index]),
inner_join: ordered_token_transfer in subquery(ordered_token_transfers),
on:
ordered_token_transfer.transaction_hash ==
token_transfer.transaction_hash and
ordered_token_transfer.log_index == token_transfer.log_index
on: ordered_token_transfer.transaction_hash == token_transfer.transaction_hash
)
try do
{_count, deleted_token_transfers} = repo.delete_all(query, timeout: timeout)
{_count, deleted_token_transfers} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_token_transfers}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}}
end
{:ok, deleted_token_transfers}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, transactions: forked_transaction_hashes}}
end
defp remove_nonconsensus_internal_transactions(repo, nonconsensus_block_numbers, forked_transactions, %{
timeout: timeout
}) do
forked_transaction_hashes = Enum.map(forked_transactions, & &1.hash)
transaction_query =
from(transaction in Transaction,
where: transaction.block_number in ^nonconsensus_block_numbers,
or_where: transaction.hash in ^forked_transaction_hashes,
select: map(transaction, [:hash])
)
defp remove_nonconsensus_internal_transactions(repo, forked_transaction_hashes, %{timeout: timeout}) do
query =
from(internal_transaction in InternalTransaction,
inner_join: transaction in subquery(transaction_query),
on: internal_transaction.transaction_hash == transaction.hash,
where: internal_transaction.transaction_hash in ^forked_transaction_hashes,
select: map(internal_transaction, [:transaction_hash, :index])
)
try do
# ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md)
{_count, deleted_internal_transactions} = repo.delete_all(query, timeout: timeout)
# ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md)
{_count, deleted_internal_transactions} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_internal_transactions}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}}
end
{:ok, deleted_internal_transactions}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, transactions: forked_transaction_hashes}}
end
defp remove_nonconsensus_logs(repo, nonconsensus_block_numbers, forked_transactions, %{timeout: timeout}) do
forked_transaction_hashes = Enum.map(forked_transactions, & &1.hash)
transaction_query =
from(transaction in Transaction,
where: transaction.block_number in ^nonconsensus_block_numbers,
or_where: transaction.hash in ^forked_transaction_hashes,
select: map(transaction, [:hash]),
order_by: transaction.hash
)
defp remove_nonconsensus_logs(repo, forked_transaction_hashes, %{timeout: timeout}) do
ordered_logs =
from(log in Log,
inner_join: transaction in subquery(transaction_query),
on: log.transaction_hash == transaction.hash,
select: map(log, [:transaction_hash, :index]),
from(
log in Log,
where: log.transaction_hash in ^forked_transaction_hashes,
select: log.transaction_hash,
# Enforce Log ShareLocks order (see docs: sharelocks.md)
order_by: [
log.transaction_hash,
log.index
],
lock: "FOR UPDATE OF l0"
lock: "FOR UPDATE"
)
query =
from(log in Log,
select: map(log, [:transaction_hash, :index]),
inner_join: ordered_log in subquery(ordered_logs),
on: ordered_log.transaction_hash == log.transaction_hash and ordered_log.index == log.index
on: ordered_log.transaction_hash == log.transaction_hash
)
try do
{_count, deleted_logs} = repo.delete_all(query, timeout: timeout)
{_count, deleted_logs} = repo.delete_all(query, timeout: timeout)
{:ok, deleted_logs}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}}
end
{:ok, deleted_logs}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, transactions: forked_transaction_hashes}}
end
defp delete_address_token_balances(_, [], _), do: {:ok, []}

@ -117,10 +117,12 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
test "remove_nonconsensus_token_transfers deletes token transfer rows with matching block number when new consensus block is inserted",
%{consensus_block: %{number: block_number} = block, options: options} do
insert(:block, number: block_number, consensus: true)
consensus_block = insert(:block, number: block_number, consensus: true)
transaction = insert(:transaction) |> with_block(consensus_block)
%TokenTransfer{transaction_hash: transaction_hash, log_index: log_index} =
insert(:token_transfer, block_number: block_number, transaction: insert(:transaction))
insert(:token_transfer, block_number: block_number, transaction: transaction)
assert count(TokenTransfer) == 1
@ -136,7 +138,11 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
test "remove_nonconsensus_token_transfers does not delete token transfer rows with matching block number when new consensus block wasn't inserted",
%{consensus_block: %{number: block_number} = block, options: options} do
insert(:token_transfer, block_number: block_number, transaction: insert(:transaction))
consensus_block = insert(:block, number: block_number, consensus: true)
transaction = insert(:transaction) |> with_block(consensus_block)
insert(:token_transfer, block_number: block_number, transaction: transaction)
count = 1

Loading…
Cancel
Save