From 480252140f70a7bb3b6377b698b12c3db010248c Mon Sep 17 00:00:00 2001 From: pasqu4le Date: Mon, 23 Sep 2019 18:17:03 +0200 Subject: [PATCH 1/2] Remove internal_transaction block_number setting from blocks runner Problem: Explorer's blocks runner is severely slowed down by doing two operations on internal transactions: - populating their block_number field - removing those in blocks that have lost consensus Solution: The field setting was introduced only to keep it correct in case of a reorg, but those internal_transactions do not pose this problem anymore because they are removed. For this reason we can remove the block_number field setting and simplify the remaining removal operation --- CHANGELOG.md | 1 + .../explorer/chain/import/runner/blocks.ex | 68 ++++++------------- .../import/runner/internal_transactions.ex | 53 +++++++-------- 3 files changed, 45 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b5d5a5f82a..a45836d031 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## Current ### Features +- [#2726](https://github.com/poanetwork/blockscout/pull/2726) - Remove internal_transaction block_number setting from blocks runner - [#2717](https://github.com/poanetwork/blockscout/pull/2717) - Improve speed of nonconsensus data removal - [#2679](https://github.com/poanetwork/blockscout/pull/2679) - added fixed height for card chain blocks and card chain transactions - [#2678](https://github.com/poanetwork/blockscout/pull/2678) - fixed dashboard banner height bug diff --git a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex index 394e21cbaa..7f94d8668c 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex @@ -56,7 +56,7 @@ defmodule Explorer.Chain.Import.Runner.Blocks do # Note, needs to be executed after `lose_consensus` for lock acquisition insert(repo, changes_list, insert_options) 
end) - |> Multi.run(:uncle_fetched_block_second_degree_relations, fn repo, %{blocks: blocks} when is_list(blocks) -> + |> Multi.run(:uncle_fetched_block_second_degree_relations, fn repo, _ -> update_block_second_degree_relations(repo, hashes, %{ timeout: options[Runner.Block.SecondDegreeRelations.option_key()][:timeout] || @@ -86,15 +86,9 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |> Multi.run(:remove_nonconsensus_logs, fn repo, %{derive_transaction_forks: transactions} -> remove_nonconsensus_logs(repo, transactions, insert_options) end) - |> Multi.run(:acquire_internal_transactions, fn repo, %{derive_transaction_forks: transactions} -> - acquire_internal_transactions(repo, hashes, transactions) - end) |> Multi.run(:remove_nonconsensus_internal_transactions, fn repo, %{derive_transaction_forks: transactions} -> remove_nonconsensus_internal_transactions(repo, transactions, insert_options) end) - |> Multi.run(:internal_transaction_transaction_block_number, fn repo, _ -> - update_internal_transaction_block_number(repo, hashes) - end) |> Multi.run(:acquire_contract_address_tokens, fn repo, _ -> acquire_contract_address_tokens(repo, consensus_block_numbers) end) @@ -139,26 +133,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do Tokens.acquire_contract_address_tokens(repo, contract_address_hashes) end - defp acquire_internal_transactions(repo, hashes, forked_transaction_hashes) do - query = - from(internal_transaction in InternalTransaction, - join: transaction in Transaction, - on: internal_transaction.transaction_hash == transaction.hash, - where: transaction.block_hash in ^hashes, - or_where: transaction.hash in ^forked_transaction_hashes, - select: {internal_transaction.transaction_hash, internal_transaction.index}, - # Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md) - order_by: [ - internal_transaction.transaction_hash, - internal_transaction.index - ], - # NOTE: find a better way to know the alias that ecto gives to token - lock: 
"FOR UPDATE OF i0" - ) - - {:ok, repo.all(query)} - end - defp fork_transactions(%{ repo: repo, timeout: timeout, @@ -379,13 +353,27 @@ defmodule Explorer.Chain.Import.Runner.Blocks do defp remove_nonconsensus_internal_transactions(repo, forked_transaction_hashes, %{timeout: timeout}) do query = - from(internal_transaction in InternalTransaction, + from( + internal_transaction in InternalTransaction, where: internal_transaction.transaction_hash in ^forked_transaction_hashes, - select: map(internal_transaction, [:transaction_hash, :index]) + select: %{transaction_hash: internal_transaction.transaction_hash}, + # Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md) + order_by: [ + internal_transaction.transaction_hash, + internal_transaction.index + ], + lock: "FOR UPDATE" + ) + + delete_query = + from( + i in InternalTransaction, + join: s in subquery(query), + on: i.transaction_hash == s.transaction_hash, + select: map(i, [:transaction_hash, :index]) ) - # ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md) - {_count, deleted_internal_transactions} = repo.delete_all(query, timeout: timeout) + {_count, deleted_internal_transactions} = repo.delete_all(delete_query, timeout: timeout) {:ok, deleted_internal_transactions} rescue @@ -645,24 +633,6 @@ defmodule Explorer.Chain.Import.Runner.Blocks do end end - defp update_internal_transaction_block_number(repo, blocks_hashes) when is_list(blocks_hashes) do - query = - from( - internal_transaction in InternalTransaction, - join: transaction in Transaction, - on: internal_transaction.transaction_hash == transaction.hash, - join: block in Block, - on: block.hash == transaction.block_hash, - where: block.hash in ^blocks_hashes, - update: [set: [block_number: block.number]] - ) - - # ShareLocks order already enforced by `acquire_internal_transactions` (see docs: sharelocks.md) - {total, _} = repo.update_all(query, []) - - {:ok, total} - end - defp 
where_forked(blocks_changes) when is_list(blocks_changes) do initial = from(t in Transaction, where: false) diff --git a/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex b/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex index 93e3bdb6b5..67657d6542 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex @@ -52,32 +52,36 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |> Multi.run(:acquire_transactions, fn repo, _ -> acquire_transactions(repo, changes_list) end) - |> Multi.run(:internal_transactions, fn repo, _ -> - insert(repo, changes_list, insert_options) + |> Multi.run(:internal_transactions, fn repo, %{acquire_transactions: transactions} -> + insert(repo, changes_list, transactions, insert_options) end) - |> Multi.run(:internal_transactions_indexed_at_transactions, fn repo, %{acquire_transactions: transaction_hashes} -> - update_transactions(repo, transaction_hashes, update_transactions_options) + |> Multi.run(:internal_transactions_indexed_at_transactions, fn repo, %{acquire_transactions: transactions} -> + update_transactions(repo, transactions, update_transactions_options) end) end @impl Runner def timeout, do: @timeout - @spec insert(Repo.t(), [map], %{ + @spec insert(Repo.t(), [map], [Transaction.t()], %{ optional(:on_conflict) => Runner.on_conflict(), required(:timeout) => timeout, required(:timestamps) => Import.timestamps() }) :: {:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} | {:error, [Changeset.t()]} - defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) + defp insert(repo, changes_list, transactions, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + transactions_map = Map.new(transactions, &{&1.hash, &1}) + # Enforce 
InternalTransaction ShareLocks order (see docs: sharelocks.md) ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) - final_changes_list = reject_pending_transactions(ordered_changes_list, repo) + final_changes_list = + ordered_changes_list + |> reject_pending_transactions(transactions_map) {:ok, internal_transactions} = Import.insert_changes_list( @@ -158,8 +162,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do from( t in Transaction, where: t.hash in ^transaction_hashes, - where: not is_nil(t.block_hash), - select: t.hash, + select: map(t, [:hash, :block_hash, :block_number]), # Enforce Transaction ShareLocks order (see docs: sharelocks.md) order_by: t.hash, lock: "FOR UPDATE" @@ -170,15 +173,19 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do {:ok, hashes} end - defp update_transactions(repo, transaction_hashes, %{ + defp update_transactions(repo, transactions, %{ timeout: timeout, timestamps: timestamps }) - when is_list(transaction_hashes) do + when is_list(transactions) do + transaction_hashes = Enum.map(transactions, & &1.hash) + update_query = from( t in Transaction, where: t.hash in ^transaction_hashes, + # do not try to update pending transactions + where: not is_nil(t.block_hash), # ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md) update: [ set: [ @@ -214,22 +221,12 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do end end - defp reject_pending_transactions(ordered_changes_list, repo) do - transaction_hashes = - ordered_changes_list - |> Enum.map(& &1.transaction_hash) - |> Enum.dedup() - - query = - from(t in Transaction, - where: t.hash in ^transaction_hashes, - where: is_nil(t.block_hash), - select: t.hash - ) - - pending_transactions = repo.all(query) - - ordered_changes_list - |> Enum.reject(fn %{transaction_hash: hash} -> Enum.member?(pending_transactions, hash) end) + defp reject_pending_transactions(ordered_changes_list, 
transactions_map) do + Enum.reject(ordered_changes_list, fn %{transaction_hash: hash} -> + transactions_map + |> Map.fetch!(hash) + |> Map.get(:block_hash) + |> is_nil() + end) end end From 50ad94f47b1046dbc5282f31348fa2f26df1c405 Mon Sep 17 00:00:00 2001 From: pasqu4le Date: Wed, 25 Sep 2019 20:18:59 +0200 Subject: [PATCH 2/2] Avoid internal_transactions import retries on blocks without valid transactions Problem: for some block numbers we fetch and receive internal transactions, but in the database the transactions they are part of either do not exist for that block or are pending. This causes a foreign key error and an endless cycle of import retries. Solution: instead of failing we remove consensus for such problematic blocks, so that they can be refetched with the correct transactions. --- .../import/runner/internal_transactions.ex | 80 ++++++++++++++----- ...internal_transactions_indexed_at_blocks.ex | 2 +- .../runner/internal_transactions_test.exs | 74 ++++++++++++++++- 3 files changed, 133 insertions(+), 23 deletions(-) diff --git a/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex b/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex index 67657d6542..b4704d382b 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex @@ -4,9 +4,10 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do """ require Ecto.Query + require Logger alias Ecto.{Changeset, Multi, Repo} - alias Explorer.Chain.{Hash, Import, InternalTransaction, Transaction} + alias Explorer.Chain.{Block, Hash, Import, InternalTransaction, Transaction} alias Explorer.Chain.Import.Runner import Ecto.Query, only: [from: 2] @@ -58,6 +59,14 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |> Multi.run(:internal_transactions_indexed_at_transactions, fn repo, %{acquire_transactions: transactions} -> update_transactions(repo, 
transactions, update_transactions_options) end) + |> Multi.run( + :remove_consensus_of_missing_transactions_blocks, + fn repo, %{internal_transactions: inserted} = results_map -> + # NOTE: for this to work it has to follow the runner `internal_transactions_indexed_at_blocks` + block_hashes = Map.get(results_map, :internal_transactions_indexed_at_blocks, []) + remove_consensus_of_missing_transactions_blocks(repo, block_hashes, changes_list, inserted) + end + ) end @impl Runner @@ -76,12 +85,11 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do transactions_map = Map.new(transactions, &{&1.hash, &1}) - # Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md) - ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) - final_changes_list = - ordered_changes_list - |> reject_pending_transactions(transactions_map) + changes_list + # Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md) + |> Enum.sort_by(&{&1.transaction_hash, &1.index}) + |> reject_missing_transactions(transactions_map) {:ok, internal_transactions} = Import.insert_changes_list( @@ -90,16 +98,12 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do conflict_target: [:transaction_hash, :index], for: InternalTransaction, on_conflict: on_conflict, - returning: [:transaction_hash, :index], + returning: true, timeout: timeout, timestamps: timestamps ) - {:ok, - for( - internal_transaction <- internal_transactions, - do: Map.take(internal_transaction, [:id, :index, :transaction_hash]) - )} + {:ok, internal_transactions} end defp default_on_conflict do @@ -162,15 +166,15 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do from( t in Transaction, where: t.hash in ^transaction_hashes, + # do not consider pending transactions + where: not is_nil(t.block_hash), select: map(t, [:hash, :block_hash, :block_number]), # Enforce Transaction ShareLocks order (see docs: sharelocks.md) order_by: t.hash, lock: "FOR UPDATE" ) 
- hashes = repo.all(query) - - {:ok, hashes} + {:ok, repo.all(query)} end defp update_transactions(repo, transactions, %{ @@ -183,9 +187,8 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do update_query = from( t in Transaction, + # pending transactions are already excluded by `acquire_transactions` where: t.hash in ^transaction_hashes, - # do not try to update pending transactions - where: not is_nil(t.block_hash), # ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md) update: [ set: [ @@ -221,10 +224,49 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do end end - defp reject_pending_transactions(ordered_changes_list, transactions_map) do + # If not using Parity this is not relevant + defp remove_consensus_of_missing_transactions_blocks(_, [], _, _), do: {:ok, []} + + defp remove_consensus_of_missing_transactions_blocks(repo, block_hashes, changes_list, inserted) do + inserted_block_numbers = MapSet.new(inserted, & &1.block_number) + + missing_transactions_block_numbers = + changes_list + |> MapSet.new(& &1.block_number) + |> MapSet.difference(inserted_block_numbers) + |> MapSet.to_list() + + update_query = + from( + b in Block, + where: b.number in ^missing_transactions_block_numbers, + where: b.hash in ^block_hashes, + # ShareLocks order already enforced by `internal_transactions_indexed_at_blocks` (see docs: sharelocks.md) + update: [set: [consensus: false, internal_transactions_indexed_at: nil]] + ) + + try do + {_num, result} = repo.update_all(update_query, []) + + Logger.debug(fn -> + [ + "consensus removed from blocks with numbers: ", + inspect(missing_transactions_block_numbers), + " because of missing transactions" + ] + end) + + {:ok, result} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, missing_transactions_block_numbers: missing_transactions_block_numbers}} + end + end + + defp reject_missing_transactions(ordered_changes_list, transactions_map) do 
Enum.reject(ordered_changes_list, fn %{transaction_hash: hash} -> transactions_map - |> Map.fetch!(hash) + |> Map.get(hash, %{}) |> Map.get(:block_hash) |> is_nil() end) diff --git a/apps/explorer/lib/explorer/chain/import/runner/internal_transactions_indexed_at_blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/internal_transactions_indexed_at_blocks.ex index 2726670a30..07008e0b72 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/internal_transactions_indexed_at_blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/internal_transactions_indexed_at_blocks.ex @@ -70,7 +70,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactionsIndexedAtBlocks do try do {^block_count, result} = repo.update_all( - from(b in Block, join: s in subquery(query), on: b.hash == s.hash), + from(b in Block, join: s in subquery(query), on: b.hash == s.hash, select: b.hash), [set: [internal_transactions_indexed_at: timestamps.updated_at]], timeout: timeout ) diff --git a/apps/explorer/test/explorer/chain/import/runner/internal_transactions_test.exs b/apps/explorer/test/explorer/chain/import/runner/internal_transactions_test.exs index 66d62ae940..700106ffb5 100644 --- a/apps/explorer/test/explorer/chain/import/runner/internal_transactions_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/internal_transactions_test.exs @@ -2,7 +2,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactionsTest do use Explorer.DataCase alias Ecto.Multi - alias Explorer.Chain.{Data, Wei, Transaction, InternalTransaction} + alias Explorer.Chain.{Block, Data, Wei, Transaction, InternalTransaction} alias Explorer.Chain.Import.Runner.InternalTransactions describe "run/1" do @@ -42,10 +42,78 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactionsTest do assert is_nil(Repo.get(Transaction, pending.hash).block_hash) end + + test "removes consensus to blocks where transactions are missing" do + empty_block = insert(:block) + pending = insert(:transaction) + + 
assert is_nil(pending.block_hash) + + full_block = insert(:block) + inserted = insert(:transaction) |> with_block(full_block) + + assert full_block.hash == inserted.block_hash + + index = 0 + + pending_transaction_changes = + pending.hash + |> make_internal_transaction_changes(index, nil) + |> Map.put(:block_number, empty_block.number) + + transaction_changes = + inserted.hash + |> make_internal_transaction_changes(index, nil) + |> Map.put(:block_number, full_block.number) + + multi = + Multi.new() + |> Multi.run(:internal_transactions_indexed_at_blocks, fn _, _ -> {:ok, [empty_block.hash, full_block.hash]} end) + + assert {:ok, _} = run_internal_transactions([pending_transaction_changes, transaction_changes], multi) + + assert from(i in InternalTransaction, where: i.transaction_hash == ^pending.hash) |> Repo.one() |> is_nil() + + assert %{consensus: false} = Repo.get(Block, empty_block.hash) + + assert from(i in InternalTransaction, where: i.transaction_hash == ^inserted.hash) |> Repo.one() |> is_nil() == + false + + assert %{consensus: true} = Repo.get(Block, full_block.hash) + end + + test "does not remove consensus when block is empty and no transactions are missing" do + empty_block = insert(:block) + + full_block = insert(:block) + inserted = insert(:transaction) |> with_block(full_block) + + assert full_block.hash == inserted.block_hash + + index = 0 + + transaction_changes = + inserted.hash + |> make_internal_transaction_changes(index, nil) + |> Map.put(:block_number, full_block.number) + + multi = + Multi.new() + |> Multi.run(:internal_transactions_indexed_at_blocks, fn _, _ -> {:ok, [empty_block.hash, full_block.hash]} end) + + assert {:ok, _} = run_internal_transactions([transaction_changes], multi) + + assert %{consensus: true} = Repo.get(Block, empty_block.hash) + + assert from(i in InternalTransaction, where: i.transaction_hash == ^inserted.hash) |> Repo.one() |> is_nil() == + false + + assert %{consensus: true} = Repo.get(Block, full_block.hash) + 
end end - defp run_internal_transactions(changes_list) when is_list(changes_list) do - Multi.new() + defp run_internal_transactions(changes_list, multi \\ Multi.new()) when is_list(changes_list) do + multi |> InternalTransactions.run(changes_list, %{ timeout: :infinity, timestamps: %{inserted_at: DateTime.utc_now(), updated_at: DateTime.utc_now()}