From 0944ea5cdcc186f35ed5d89eebf151e7da9f6d32 Mon Sep 17 00:00:00 2001 From: pasqu4le Date: Fri, 20 Sep 2019 18:01:19 +0200 Subject: [PATCH 1/2] Improve speed of nonconsensus data removal Problem: removal of nonconsensus data is too inefficient and as a result blocks are imported too slow. Solution: reformulation of deletion logic for better performance --- .../explorer/chain/import/runner/blocks.ex | 246 ++++++------------ .../chain/import/runner/blocks_test.exs | 12 +- 2 files changed, 91 insertions(+), 167 deletions(-) diff --git a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex index f893df7684..394e21cbaa 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex @@ -46,32 +46,14 @@ defmodule Explorer.Chain.Import.Runner.Blocks do hashes = Enum.map(changes_list, & &1.hash) consensus_block_numbers = consensus_block_numbers(changes_list) - where_invalid_neighbour = where_invalid_neighbour(changes_list) # Enforce ShareLocks tables order (see docs: sharelocks.md) multi - |> Multi.run(:acquire_blocks, fn repo, _ -> - acquire_blocks(repo, hashes, consensus_block_numbers, where_invalid_neighbour) - end) |> Multi.run(:lose_consensus, fn repo, _ -> - lose_consensus(repo, consensus_block_numbers, insert_options) - end) - |> Multi.run(:lose_invalid_neighbour_consensus, fn repo, _ -> - lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, insert_options) - end) - |> Multi.run(:nonconsensus_block_numbers, fn _repo, - %{ - lose_consensus: lost_consensus_blocks, - lose_invalid_neighbour_consensus: lost_consensus_neighbours - } -> - nonconsensus_block_numbers = - (lost_consensus_blocks ++ lost_consensus_neighbours) - |> Enum.sort() - |> Enum.dedup() - - {:ok, nonconsensus_block_numbers} + lose_consensus(repo, hashes, consensus_block_numbers, changes_list, insert_options) end) |> Multi.run(:blocks, fn repo, _ -> + # Note, needs to be executed after `lose_consensus` for lock acquisition insert(repo, changes_list, insert_options) end) |> Multi.run(:uncle_fetched_block_second_degree_relations, fn repo, %{blocks: blocks} when is_list(blocks) -> @@ -101,27 +83,14 @@ defmodule Explorer.Chain.Import.Runner.Blocks do transactions: transactions }) end) - |> Multi.run(:remove_nonconsensus_logs, fn repo, - %{ - nonconsensus_block_numbers: nonconsensus_block_numbers, - fork_transactions: transactions - } -> - remove_nonconsensus_logs(repo, nonconsensus_block_numbers, transactions, insert_options) + |> Multi.run(:remove_nonconsensus_logs, fn repo, %{derive_transaction_forks: transactions} -> + remove_nonconsensus_logs(repo, transactions, insert_options) end) - |> Multi.run(:acquire_internal_transactions, fn repo, - %{ - nonconsensus_block_numbers: nonconsensus_block_numbers, - fork_transactions: transactions - } -> - acquire_internal_transactions(repo, nonconsensus_block_numbers, hashes, transactions) + |> Multi.run(:acquire_internal_transactions, fn repo, %{derive_transaction_forks: transactions} -> + acquire_internal_transactions(repo, hashes, transactions) end) - |> Multi.run(:remove_nonconsensus_internal_transactions, fn repo, - %{ - nonconsensus_block_numbers: - nonconsensus_block_numbers, - fork_transactions: transactions - } -> - remove_nonconsensus_internal_transactions(repo, nonconsensus_block_numbers, transactions, insert_options) + |> Multi.run(:remove_nonconsensus_internal_transactions, fn repo, %{derive_transaction_forks: transactions} -> + remove_nonconsensus_internal_transactions(repo, transactions, insert_options) end) |> Multi.run(:internal_transaction_transaction_block_number, fn repo, _ -> update_internal_transaction_block_number(repo, hashes) @@ -129,9 +98,8 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |> Multi.run(:acquire_contract_address_tokens, fn repo, _ -> acquire_contract_address_tokens(repo, consensus_block_numbers) end) - |> Multi.run(:remove_nonconsensus_token_transfers, fn repo, - %{nonconsensus_block_numbers: nonconsensus_block_numbers} -> - remove_nonconsensus_token_transfers(repo, nonconsensus_block_numbers, insert_options) + |> Multi.run(:remove_nonconsensus_token_transfers, fn repo, %{derive_transaction_forks: transactions} -> + remove_nonconsensus_token_transfers(repo, transactions, insert_options) end) |> Multi.run(:delete_address_token_balances, fn repo, _ -> delete_address_token_balances(repo, consensus_block_numbers, insert_options) @@ -159,22 +127,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do @impl Runner def timeout, do: @timeout - defp acquire_blocks(repo, hashes, consensus_block_numbers, where_invalid_neighbour) do - query = - from( - block in where_invalid_neighbour, - or_where: block.number in ^consensus_block_numbers, - or_where: block.hash in ^hashes, - select: block.hash, - # Enforce Block ShareLocks order (see docs: sharelocks.md) - order_by: [asc: block.hash], - lock: "FOR UPDATE" - ) - - blocks = repo.all(query) - {:ok, blocks} - end - defp acquire_contract_address_tokens(repo, consensus_block_numbers) do query = from(address_current_token_balance in Address.CurrentTokenBalance, @@ -187,15 +139,12 @@ defmodule Explorer.Chain.Import.Runner.Blocks do Tokens.acquire_contract_address_tokens(repo, contract_address_hashes) end - defp acquire_internal_transactions(repo, nonconsensus_block_numbers, hashes, forked_transactions) do - forked_transaction_hashes = Enum.map(forked_transactions, & &1.hash) - + defp acquire_internal_transactions(repo, hashes, forked_transaction_hashes) do query = from(internal_transaction in InternalTransaction, join: transaction in Transaction, on: internal_transaction.transaction_hash == transaction.hash, - where: transaction.block_number in ^nonconsensus_block_numbers, - or_where: transaction.block_hash in ^hashes, + where: transaction.block_hash in ^hashes, or_where: transaction.hash in ^forked_transaction_hashes, select: {internal_transaction.transaction_hash, internal_transaction.index}, # Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md) @@ -229,14 +178,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do lock: "FOR UPDATE" ) - transactions = repo.all(query) - - hashes = Enum.map(transactions, & &1.hash) - update_query = from( t in Transaction, - where: t.hash in ^hashes, + join: s in subquery(query), + on: t.hash == s.hash, update: [ set: [ block_hash: nil, @@ -250,17 +196,19 @@ defmodule Explorer.Chain.Import.Runner.Blocks do updated_at: ^updated_at ] ], - select: t.hash + select: %{ + block_hash: s.block_hash, + index: s.index, + hash: s.hash + } ) - try do - {_num, _res} = repo.update_all(update_query, [], timeout: timeout) + {_num, transactions} = repo.update_all(update_query, [], timeout: timeout) - {:ok, transactions} - rescue - postgrex_error in Postgrex.Error -> - {:error, %{exception: postgrex_error}} - end + {:ok, transactions} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error}} end defp derive_transaction_forks(%{ @@ -283,7 +231,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do # Enforce Fork ShareLocks order (see docs: sharelocks.md) |> Enum.sort_by(&{&1.uncle_hash, &1.index}) - {_total, result} = + {_total, forked_transaction} = repo.insert_all( Transaction.Fork, transaction_forks, @@ -294,11 +242,11 @@ defmodule Explorer.Chain.Import.Runner.Blocks do update: [set: [hash: fragment("EXCLUDED.hash")]], where: fragment("EXCLUDED.hash <> ?", transaction_fork.hash) ), - returning: [:uncle_hash, :hash], + returning: [:hash], timeout: timeout ) - {:ok, result} + {:ok, Enum.map(forked_transaction, & &1.hash)} end @spec insert(Repo.t(), [map()], %{ @@ -364,47 +312,48 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |> Enum.map(& &1.number) end - defp lose_consensus(_, [], _), do: {:ok, []} - - defp lose_consensus(repo, consensus_block_number, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) - when is_list(consensus_block_number) do - # ShareLocks order already enforced by `acquire_blocks` (see docs: sharelocks.md) - {_, result} = - repo.update_all( - from(block in Block, where: block.number in ^consensus_block_number, select: block.number), - [set: [consensus: false, updated_at: updated_at]], - timeout: timeout - ) - - {:ok, result} - rescue - postgrex_error in Postgrex.Error -> - {:error, %{exception: postgrex_error, consensus_block_numbers: consensus_block_number}} - end - - defp lose_invalid_neighbour_consensus(repo, where_invalid_neighbour, %{ + defp lose_consensus(repo, hashes, consensus_block_numbers, changes_list, %{ timeout: timeout, timestamps: %{updated_at: updated_at} }) do - # ShareLocks order already enforced by `acquire_blocks` (see docs: sharelocks.md) - {_, result} = + acquire_query = + from( + block in where_invalid_neighbour(changes_list), + or_where: block.number in ^consensus_block_numbers, + # we also need to acquire blocks that will be upserted here, for ordering + or_where: block.hash in ^hashes, + select: block.hash, + # Enforce Block ShareLocks order (see docs: sharelocks.md) + order_by: [asc: block.hash], + lock: "FOR UPDATE" + ) + + {_, removed_consensus_block_hashes} = repo.update_all( - from(block in where_invalid_neighbour, select: block.number), + from( + block in Block, + join: s in subquery(acquire_query), + on: block.hash == s.hash, + # we don't want to remove consensus from blocks that will be upserted + where: block.hash not in ^hashes, + select: block.hash + ), [set: [consensus: false, updated_at: updated_at]], timeout: timeout ) - {:ok, result} + {:ok, removed_consensus_block_hashes} rescue postgrex_error in Postgrex.Error -> - {:error, %{exception: postgrex_error, where_invalid_neighbour: where_invalid_neighbour}} + {:error, %{exception: postgrex_error, consensus_block_numbers: consensus_block_numbers}} end - defp remove_nonconsensus_token_transfers(repo, nonconsensus_block_numbers, %{timeout: timeout}) do + defp remove_nonconsensus_token_transfers(repo, forked_transaction_hashes, %{timeout: timeout}) do ordered_token_transfers = - from(token_transfer in TokenTransfer, - where: token_transfer.block_number in ^nonconsensus_block_numbers, - select: map(token_transfer, [:transaction_hash, :log_index]), + from( + token_transfer in TokenTransfer, + where: token_transfer.transaction_hash in ^forked_transaction_hashes, + select: token_transfer.transaction_hash, # Enforce TokenTransfer ShareLocks order (see docs: sharelocks.md) order_by: [ token_transfer.transaction_hash, @@ -417,91 +366,60 @@ defmodule Explorer.Chain.Import.Runner.Blocks do from(token_transfer in TokenTransfer, select: map(token_transfer, [:transaction_hash, :log_index]), inner_join: ordered_token_transfer in subquery(ordered_token_transfers), - on: - ordered_token_transfer.transaction_hash == - token_transfer.transaction_hash and - ordered_token_transfer.log_index == token_transfer.log_index + on: ordered_token_transfer.transaction_hash == token_transfer.transaction_hash ) - try do - {_count, deleted_token_transfers} = repo.delete_all(query, timeout: timeout) + {_count, deleted_token_transfers} = repo.delete_all(query, timeout: timeout) - {:ok, deleted_token_transfers} - rescue - postgrex_error in Postgrex.Error -> - {:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}} - end + {:ok, deleted_token_transfers} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, transactions: forked_transaction_hashes}} end - defp remove_nonconsensus_internal_transactions(repo, nonconsensus_block_numbers, forked_transactions, %{ - timeout: timeout - }) do - forked_transaction_hashes = Enum.map(forked_transactions, & &1.hash) - - transaction_query = - from(transaction in Transaction, - where: transaction.block_number in ^nonconsensus_block_numbers, - or_where: transaction.hash in ^forked_transaction_hashes, - select: map(transaction, [:hash]) - ) - + defp remove_nonconsensus_internal_transactions(repo, forked_transaction_hashes, %{timeout: timeout}) do query = from(internal_transaction in InternalTransaction, - inner_join: transaction in subquery(transaction_query), - on: internal_transaction.transaction_hash == transaction.hash, + where: internal_transaction.transaction_hash in ^forked_transaction_hashes, select: map(internal_transaction, [:transaction_hash, :index]) ) - try do - # ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md) - {_count, deleted_internal_transactions} = repo.delete_all(query, timeout: timeout) + # ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md) + {_count, deleted_internal_transactions} = repo.delete_all(query, timeout: timeout) - {:ok, deleted_internal_transactions} - rescue - postgrex_error in Postgrex.Error -> - {:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}} - end + {:ok, deleted_internal_transactions} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, transactions: forked_transaction_hashes}} end - defp remove_nonconsensus_logs(repo, nonconsensus_block_numbers, forked_transactions, %{timeout: timeout}) do - forked_transaction_hashes = Enum.map(forked_transactions, & &1.hash) - - transaction_query = - from(transaction in Transaction, - where: transaction.block_number in ^nonconsensus_block_numbers, - or_where: transaction.hash in ^forked_transaction_hashes, - select: map(transaction, [:hash]), - order_by: transaction.hash - ) - + defp remove_nonconsensus_logs(repo, forked_transaction_hashes, %{timeout: timeout}) do ordered_logs = - from(log in Log, - inner_join: transaction in subquery(transaction_query), - on: log.transaction_hash == transaction.hash, - select: map(log, [:transaction_hash, :index]), + from( + log in Log, + where: log.transaction_hash in ^forked_transaction_hashes, + select: log.transaction_hash, # Enforce Log ShareLocks order (see docs: sharelocks.md) order_by: [ log.transaction_hash, log.index ], - lock: "FOR UPDATE OF l0" + lock: "FOR UPDATE" ) query = from(log in Log, select: map(log, [:transaction_hash, :index]), inner_join: ordered_log in subquery(ordered_logs), - on: ordered_log.transaction_hash == log.transaction_hash and ordered_log.index == log.index + on: ordered_log.transaction_hash == log.transaction_hash ) - try do - {_count, deleted_logs} = repo.delete_all(query, timeout: timeout) + {_count, deleted_logs} = repo.delete_all(query, timeout: timeout) - {:ok, deleted_logs} - rescue - postgrex_error in Postgrex.Error -> - {:error, %{exception: postgrex_error, block_numbers: nonconsensus_block_numbers}} - end + {:ok, deleted_logs} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, transactions: forked_transaction_hashes}} end defp delete_address_token_balances(_, [], _), do: {:ok, []} diff --git a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs index 370fd65021..3cbd7f4834 100644 --- a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs @@ -117,10 +117,12 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do test "remove_nonconsensus_token_transfers deletes token transfer rows with matching block number when new consensus block is inserted", %{consensus_block: %{number: block_number} = block, options: options} do - insert(:block, number: block_number, consensus: true) + consensus_block = insert(:block, number: block_number, consensus: true) + + transaction = insert(:transaction) |> with_block(consensus_block) %TokenTransfer{transaction_hash: transaction_hash, log_index: log_index} = - insert(:token_transfer, block_number: block_number, transaction: insert(:transaction)) + insert(:token_transfer, block_number: block_number, transaction: transaction) assert count(TokenTransfer) == 1 @@ -136,7 +138,11 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do test "remove_nonconsensus_token_transfers does not delete token transfer rows with matching block number when new consensus block wasn't inserted", %{consensus_block: %{number: block_number} = block, options: options} do - insert(:token_transfer, block_number: block_number, transaction: insert(:transaction)) + consensus_block = insert(:block, number: block_number, consensus: true) + + transaction = insert(:transaction) |> with_block(consensus_block) + + insert(:token_transfer, block_number: block_number, transaction: transaction) count = 1 From bc47fe957e77116f342fa1db37d5a86f1b77abe0 Mon Sep 17 00:00:00 2001 From: pasqu4le Date: Fri, 20 Sep 2019 18:14:05 +0200 Subject: [PATCH 2/2] changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b6402a0acd..a525fc5951 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## Current ### Features +- [#2717](https://github.com/poanetwork/blockscout/pull/2717) - Improve speed of nonconsensus data removal - [#2679](https://github.com/poanetwork/blockscout/pull/2679) - added fixed height for card chain blocks and card chain transactions - [#2678](https://github.com/poanetwork/blockscout/pull/2678) - fixed dashboard banner height bug - [#2672](https://github.com/poanetwork/blockscout/pull/2672) - added new theme for xUSDT