From d727ab9728372f15ab51eb5861034153f807a30f Mon Sep 17 00:00:00 2001 From: pasqu4le Date: Thu, 9 May 2019 12:32:02 +0200 Subject: [PATCH 1/4] Test block refetch after transactions overwritten by race condition Regression test for the race condition described in issue #1911 --- .../chain/import/runner/blocks_test.exs | 47 ++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs index ecf1d71dfb..63110cba40 100644 --- a/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/blocks_test.exs @@ -6,7 +6,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do import Explorer.Chain.Import.RunnerCase, only: [insert_address_with_token_balances: 1, update_holder_count!: 2] alias Ecto.Multi - alias Explorer.Chain.Import.Runner.{Blocks, Transaction} + alias Explorer.Chain.Import.Runner.{Blocks, Transactions} alias Explorer.Chain.{Address, Block, Transaction} alias Explorer.Chain alias Explorer.Repo @@ -283,6 +283,29 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do insert_block(new_block1, options) assert Chain.missing_block_number_ranges(range) == [] end + + # Regression test for https://github.com/poanetwork/blockscout/issues/1911 + test "forces block refetch if transaction is re-collated in a different block", + %{consensus_block: %Block{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do + new_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1) + new_block2 = params_for(:block, miner_hash: miner_hash, parent_hash: new_block1.hash, number: block_number + 2) + + range = block_number..(block_number + 2) + + insert_block(new_block1, options) + insert_block(new_block2, options) + assert Chain.missing_block_number_ranges(range) == [] + + trans_hash = transaction_hash() + + transaction1 = transaction_params_with_block([hash: trans_hash], new_block1) + insert_transaction(transaction1, options) + assert Chain.missing_block_number_ranges(range) == [] + + transaction2 = transaction_params_with_block([hash: trans_hash], new_block2) + insert_transaction(transaction2, options) + assert Chain.missing_block_number_ranges(range) == [(block_number + 1)..(block_number + 1)] + end end defp insert_block(block_params, options) do @@ -293,6 +316,28 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do |> Repo.transaction() end + defp transaction_params_with_block(transaction_params, block_params) do + params_for(:transaction, transaction_params) + |> Map.merge(%{ + block_hash: block_params.hash, + block_number: block_params.number, + cumulative_gas_used: 50_000, + error: nil, + gas_used: 50_000, + index: 0, + from_address_hash: insert(:address).hash + }) + end + + defp insert_transaction(transaction_params, options) do + %Ecto.Changeset{valid?: true, changes: transaction_changes} = + Transaction.changeset(%Transaction{}, transaction_params) + + Multi.new() + |> Transactions.run([transaction_changes], options) + |> Repo.transaction() + end + defp count(schema) do Repo.one!(select(schema, fragment("COUNT(*)"))) end From e73b7276285b8b6fb9e39786e3e839a351336ad6 Mon Sep 17 00:00:00 2001 From: goodsoft Date: Wed, 8 May 2019 18:43:53 +0300 Subject: [PATCH 2/4] Fix potential race condition while dropping replaced transactions Don't discard a transaction if it was eventually collated in the period between replaced transaction worker initialization and actual dropping. --- apps/explorer/lib/explorer/chain.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index adb9759638..59258a90d1 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -2565,7 +2565,7 @@ defmodule Explorer.Chain do join: duplicate in subquery(query), on: duplicate.nonce == pending.nonce, on: duplicate.from_address_hash == pending.from_address_hash, - where: pending.hash in ^hashes + where: pending.hash in ^hashes and is_nil(pending.block_hash) ) Repo.update_all(transactions_to_update, [set: [error: "dropped/replaced", status: :error]], timeout: timeout) From b9cc8a417fdf22eb6e3d35643fd2eb2aa4237fb9 Mon Sep 17 00:00:00 2001 From: goodsoft Date: Wed, 8 May 2019 18:48:09 +0300 Subject: [PATCH 3/4] Force block refetch if transaction is recollated in a different block Due to race conditions described in #1911 transactions from a consensus block might get overwritten by the same transactions from a non-consensus block. To prevent this we force a block refetch (by marking it as non-consensus), if a transaction belonging to it gets overwritten by the same transaction from a different block. --- .../chain/import/runner/transactions.ex | 50 +++++++++++++++++-- .../lib/explorer/chain/transaction.ex | 5 ++ ...22_add_old_block_hash_for_transactions.exs | 12 +++++ 3 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 apps/explorer/priv/repo/migrations/20190508152922_add_old_block_hash_for_transactions.exs diff --git a/apps/explorer/lib/explorer/chain/import/runner/transactions.ex b/apps/explorer/lib/explorer/chain/import/runner/transactions.ex index 4f1d2d6fe5..1a9fcc497d 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/transactions.ex @@ -8,7 +8,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do import Ecto.Query, only: [from: 2] alias Ecto.{Multi, Repo} - alias Explorer.Chain.{Data, Hash, Import, Transaction} + alias Explorer.Chain.{Block, Data, Hash, Import, Transaction} alias Explorer.Chain.Import.Runner.TokenTransfers @behaviour Import.Runner @@ -42,9 +42,13 @@ defmodule Explorer.Chain.Import.Runner.Transactions do |> Map.put(:timestamps, timestamps) |> Map.put(:token_transfer_transaction_hash_set, token_transfer_transaction_hash_set(options)) - Multi.run(multi, :transactions, fn repo, _ -> + multi + |> Multi.run(:transactions, fn repo, _ -> insert(repo, changes_list, insert_options) end) + |> Multi.run(:recollated_transactions, fn repo, %{transactions: transactions} -> + discard_blocks_for_recollated_transactions(repo, transactions, insert_options) + end) end @impl Import.Runner @@ -87,7 +91,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do on_conflict: on_conflict, for: Transaction, returning: - ~w(block_number index hash internal_transactions_indexed_at block_hash nonce from_address_hash created_contract_address_hash)a, + ~w(block_number index hash internal_transactions_indexed_at block_hash old_block_hash nonce from_address_hash created_contract_address_hash)a, timeout: timeout, timestamps: timestamps ) @@ -99,6 +103,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do update: [ set: [ block_hash: fragment("EXCLUDED.block_hash"), + old_block_hash: transaction.block_hash, block_number: fragment("EXCLUDED.block_number"), created_contract_address_hash: fragment("EXCLUDED.created_contract_address_hash"), cumulative_gas_used: fragment("EXCLUDED.cumulative_gas_used"), @@ -179,4 +184,43 @@ defmodule Explorer.Chain.Import.Runner.Transactions do end defp put_internal_transactions_indexed_at?(_, _), do: false + + defp discard_blocks_for_recollated_transactions(repo, transactions, %{ + timeout: timeout, + timestamps: %{updated_at: updated_at} + }) + when is_list(transactions) do + ordered_block_hashes = + transactions + |> Enum.filter(fn %{block_hash: block_hash, old_block_hash: old_block_hash} -> + not is_nil(old_block_hash) and block_hash != old_block_hash + end) + |> MapSet.new(& &1.old_block_hash) + |> Enum.sort() + + if Enum.empty?(ordered_block_hashes) do + {:ok, []} + else + query = + from( + block in Block, + where: block.hash in ^ordered_block_hashes, + update: [ + set: [ + consensus: false, + updated_at: ^updated_at + ] + ] + ) + + try do + {_, result} = repo.update_all(query, [], timeout: timeout) + + {:ok, result} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, block_hashes: ordered_block_hashes}} + end + end + end end diff --git a/apps/explorer/lib/explorer/chain/transaction.ex b/apps/explorer/lib/explorer/chain/transaction.ex index 1cf9c8f116..de99198181 100644 --- a/apps/explorer/lib/explorer/chain/transaction.ex +++ b/apps/explorer/lib/explorer/chain/transaction.ex @@ -205,6 +205,11 @@ defmodule Explorer.Chain.Transaction do field(:v, :decimal) field(:value, Wei) + # A transient field for deriving old block hash during transaction upserts. + # Used to force refetch of a block in case a transaction is re-collated + # in a different block. See: https://github.com/poanetwork/blockscout/issues/1911 + field(:old_block_hash, Hash.Full) + timestamps() belongs_to(:block, Block, foreign_key: :block_hash, references: :hash, type: Hash.Full) diff --git a/apps/explorer/priv/repo/migrations/20190508152922_add_old_block_hash_for_transactions.exs b/apps/explorer/priv/repo/migrations/20190508152922_add_old_block_hash_for_transactions.exs new file mode 100644 index 0000000000..daf04c22f3 --- /dev/null +++ b/apps/explorer/priv/repo/migrations/20190508152922_add_old_block_hash_for_transactions.exs @@ -0,0 +1,12 @@ +defmodule Explorer.Repo.Migrations.AddOldBlockHashForTransactions do + use Ecto.Migration + + def change do + alter table(:transactions) do + # A transient field for deriving old block hash during transaction upserts. + # Used to force refetch of a block in case a transaction is re-collated + # in a different block. See: https://github.com/poanetwork/blockscout/issues/1911 + add(:old_block_hash, :bytea, null: true) + end + end +end From 64acef7b7106ab008355fe649ef348211e4c9231 Mon Sep 17 00:00:00 2001 From: pasqu4le Date: Fri, 10 May 2019 19:52:22 +0200 Subject: [PATCH 4/4] Add temporary indexer worker to remove consensus on blocks with mismatching transactions After the solution for #1991 it is necessary to remove consensus from blocks whose number of transactions mismatch compared to a node, so that these blocks are refetched. --- CHANGELOG.md | 1 + apps/explorer/lib/explorer/chain/block.ex | 6 +- ...0513134025_add_refetch_needed_to_block.exs | 11 ++ apps/indexer/README.md | 1 + apps/indexer/lib/indexer/supervisor.ex | 3 + .../temporary/blocks_transactions_mismatch.ex | 115 ++++++++++++++++++ 6 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 apps/explorer/priv/repo/migrations/20190513134025_add_refetch_needed_to_block.exs create mode 100644 apps/indexer/lib/indexer/temporary/blocks_transactions_mismatch.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a7159ec7d..22cca8a536 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ - [#1898](https://github.com/poanetwork/blockscout/pull/1898) - check if the constructor has arguments before verifying constructor arguments - [#1915](https://github.com/poanetwork/blockscout/pull/1915) - fallback to 2 latest evm versions - [#1937](https://github.com/poanetwork/blockscout/pull/1937) - Check the presence of overlap[i] object before retrieving properties from it +- [#1917](https://github.com/poanetwork/blockscout/pull/1917) - Force block refetch if transaction is re-collated in a different block ### Chore diff --git a/apps/explorer/lib/explorer/chain/block.ex b/apps/explorer/lib/explorer/chain/block.ex index 58d187d9d8..9cf86b570b 100644 --- a/apps/explorer/lib/explorer/chain/block.ex +++ b/apps/explorer/lib/explorer/chain/block.ex @@ -10,7 +10,7 @@ defmodule Explorer.Chain.Block do alias Explorer.Chain.{Address, Gas, Hash, Transaction} alias Explorer.Chain.Block.{Reward, SecondDegreeRelation} - @optional_attrs ~w(internal_transactions_indexed_at size)a + @optional_attrs ~w(internal_transactions_indexed_at size refetch_needed)a @required_attrs ~w(consensus difficulty gas_limit gas_used hash miner_hash nonce number parent_hash timestamp total_difficulty)a @@ -63,7 +63,8 @@ defmodule Explorer.Chain.Block do timestamp: DateTime.t(), total_difficulty: difficulty(), transactions: %Ecto.Association.NotLoaded{} | [Transaction.t()], - internal_transactions_indexed_at: DateTime.t() + internal_transactions_indexed_at: DateTime.t(), + refetch_needed: boolean() } @primary_key {:hash, Hash.Full, autogenerate: false} @@ -78,6 +79,7 @@ defmodule Explorer.Chain.Block do field(:timestamp, :utc_datetime_usec) field(:total_difficulty, :decimal) field(:internal_transactions_indexed_at, :utc_datetime_usec) + field(:refetch_needed, :boolean) timestamps() diff --git a/apps/explorer/priv/repo/migrations/20190513134025_add_refetch_needed_to_block.exs b/apps/explorer/priv/repo/migrations/20190513134025_add_refetch_needed_to_block.exs new file mode 100644 index 0000000000..70ddac3e03 --- /dev/null +++ b/apps/explorer/priv/repo/migrations/20190513134025_add_refetch_needed_to_block.exs @@ -0,0 +1,11 @@ +defmodule Explorer.Repo.Migrations.AddRefetchNeededToBlock do + use Ecto.Migration + + def change do + alter table(:blocks) do + add(:refetch_needed, :boolean, default: false) + end + + execute("UPDATE blocks SET refetch_needed = TRUE;", "") + end +end diff --git a/apps/indexer/README.md b/apps/indexer/README.md index 34fb6e6ef3..173df0c6ab 100644 --- a/apps/indexer/README.md +++ b/apps/indexer/README.md @@ -92,6 +92,7 @@ After all deployed instances get all needed data, these fetchers should be depre - `uncataloged_token_transfers`: extracts token transfers from logs, which previously weren't parsed due to unknown format - `uncles_without_index`: adds previously unfetched `index` field for unfetched blocks in `block_second_degree_relations` +- `blocks_transactions_mismatch`: refetches each block once and revokes consensus to those whose transaction number mismatches with the number currently stored. This is meant to force the correction of a race condition that caused successfully fetched transactions to be overwritten by a following non-consensus block: [#1911](https://github.com/poanetwork/blockscout/issues/1911). ## Memory Usage diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index 21dc3637ec..b2c3d19664 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -24,6 +24,7 @@ defmodule Indexer.Supervisor do } alias Indexer.Temporary.{ + BlocksTransactionsMismatch, UncatalogedTokenTransfers, UnclesWithoutIndex } @@ -124,6 +125,8 @@ defmodule Indexer.Supervisor do # Temporary workers {UncatalogedTokenTransfers.Supervisor, [[]]}, {UnclesWithoutIndex.Supervisor, + [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}, + {BlocksTransactionsMismatch.Supervisor, [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]} ], strategy: :one_for_one diff --git a/apps/indexer/lib/indexer/temporary/blocks_transactions_mismatch.ex b/apps/indexer/lib/indexer/temporary/blocks_transactions_mismatch.ex new file mode 100644 index 0000000000..76f4322379 --- /dev/null +++ b/apps/indexer/lib/indexer/temporary/blocks_transactions_mismatch.ex @@ -0,0 +1,115 @@ +defmodule Indexer.Temporary.BlocksTransactionsMismatch do + @moduledoc """ + Fetches `consensus` `t:Explorer.Chain.Block.t/0` and compares their transaction + number against a node, to revoke `consensus` on mismatch. + + This is meant to fix incorrectly strored transactions that happened as a result + of a race condition due to the asynchronicity of indexer's components. + """ + + use Indexer.Fetcher + + require Logger + + import Ecto.Query + + alias Ecto.Multi + alias EthereumJSONRPC.Blocks + alias Explorer.Chain.Block + alias Explorer.Repo + alias Indexer.BufferedTask + + @behaviour BufferedTask + + @defaults [ + flush_interval: :timer.seconds(3), + max_batch_size: 10, + max_concurrency: 4, + task_supervisor: Indexer.Temporary.BlocksTransactionsMismatch.TaskSupervisor, + metadata: [fetcher: :blocks_transactions_mismatch] + ] + + @doc false + def child_spec([init_options, gen_server_options]) when is_list(init_options) do + {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments) + + unless state do + raise ArgumentError, + ":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <> + "to allow for json_rpc calls when running." + end + + merged_init_options = + @defaults + |> Keyword.merge(mergeable_init_options) + |> Keyword.put(:state, state) + + Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__) + end + + @impl BufferedTask + def init(initial, reducer, _) do + query = + from(block in Block, + join: transactions in assoc(block, :transactions), + where: block.consensus and block.refetch_needed, + group_by: block.hash, + select: {block, count(transactions.hash)} + ) + + {:ok, final} = Repo.stream_reduce(query, initial, &reducer.(&1, &2)) + + final + end + + @impl BufferedTask + def run(blocks_data, json_rpc_named_arguments) do + hashes = Enum.map(blocks_data, fn {block, _trans_num} -> block.hash end) + + Logger.debug("fetching") + + case EthereumJSONRPC.fetch_blocks_by_hash(hashes, json_rpc_named_arguments) do + {:ok, blocks} -> + run_blocks(blocks, blocks_data) + + {:error, reason} -> + Logger.error(fn -> ["failed to fetch: ", inspect(reason)] end) + {:retry, blocks_data} + end + end + + defp run_blocks(%Blocks{blocks_params: []}, blocks_data), do: {:retry, blocks_data} + + defp run_blocks( + %Blocks{transactions_params: transactions_params}, + blocks_data + ) do + found_blocks_map = + transactions_params + |> Enum.group_by(&Map.fetch!(&1, :block_hash)) + |> Map.new(fn {block_hash, trans_lst} -> {block_hash, Enum.count(trans_lst)} end) + + {found_blocks_data, missing_blocks_data} = + Enum.split_with(blocks_data, fn {block, _trans_num} -> + Map.has_key?(found_blocks_map, to_string(block.hash)) + end) + + {:ok, _} = + found_blocks_data + |> Enum.reduce(Multi.new(), fn {block, trans_num}, multi -> + changes = %{ + refetch_needed: false, + consensus: found_blocks_map[to_string(block.hash)] == trans_num + } + + Multi.update(multi, block.hash, Block.changeset(block, changes)) + end) + |> Repo.transaction() + + if Enum.empty?(missing_blocks_data) do + :ok + else + {:retry, missing_blocks_data} + end + end +end