Merge pull request #1917 from poanetwork/gs-tx-race-conditions

Force block refetch if transaction is re-collated in a different block
pull/1967/head
Victor Baranov 6 years ago committed by GitHub
commit 8c1187a4d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 2
      apps/explorer/lib/explorer/chain.ex
  3. 6
      apps/explorer/lib/explorer/chain/block.ex
  4. 50
      apps/explorer/lib/explorer/chain/import/runner/transactions.ex
  5. 5
      apps/explorer/lib/explorer/chain/transaction.ex
  6. 12
      apps/explorer/priv/repo/migrations/20190508152922_add_old_block_hash_for_transactions.exs
  7. 11
      apps/explorer/priv/repo/migrations/20190513134025_add_refetch_needed_to_block.exs
  8. 47
      apps/explorer/test/explorer/chain/import/runner/blocks_test.exs
  9. 1
      apps/indexer/README.md
  10. 3
      apps/indexer/lib/indexer/supervisor.ex
  11. 115
      apps/indexer/lib/indexer/temporary/blocks_transactions_mismatch.ex

@ -43,6 +43,7 @@
- [#1898](https://github.com/poanetwork/blockscout/pull/1898) - check if the constructor has arguments before verifying constructor arguments
- [#1915](https://github.com/poanetwork/blockscout/pull/1915) - fallback to 2 latest evm versions
- [#1937](https://github.com/poanetwork/blockscout/pull/1937) - Check the presence of overlap[i] object before retrieving properties from it
- [#1917](https://github.com/poanetwork/blockscout/pull/1917) - Force block refetch if transaction is re-collated in a different block
### Chore

@ -2565,7 +2565,7 @@ defmodule Explorer.Chain do
join: duplicate in subquery(query),
on: duplicate.nonce == pending.nonce,
on: duplicate.from_address_hash == pending.from_address_hash,
where: pending.hash in ^hashes
where: pending.hash in ^hashes and is_nil(pending.block_hash)
)
Repo.update_all(transactions_to_update, [set: [error: "dropped/replaced", status: :error]], timeout: timeout)

@ -10,7 +10,7 @@ defmodule Explorer.Chain.Block do
alias Explorer.Chain.{Address, Gas, Hash, Transaction}
alias Explorer.Chain.Block.{Reward, SecondDegreeRelation}
@optional_attrs ~w(internal_transactions_indexed_at size)a
@optional_attrs ~w(internal_transactions_indexed_at size refetch_needed)a
@required_attrs ~w(consensus difficulty gas_limit gas_used hash miner_hash nonce number parent_hash timestamp total_difficulty)a
@ -63,7 +63,8 @@ defmodule Explorer.Chain.Block do
timestamp: DateTime.t(),
total_difficulty: difficulty(),
transactions: %Ecto.Association.NotLoaded{} | [Transaction.t()],
internal_transactions_indexed_at: DateTime.t()
internal_transactions_indexed_at: DateTime.t(),
refetch_needed: boolean()
}
@primary_key {:hash, Hash.Full, autogenerate: false}
@ -78,6 +79,7 @@ defmodule Explorer.Chain.Block do
field(:timestamp, :utc_datetime_usec)
field(:total_difficulty, :decimal)
field(:internal_transactions_indexed_at, :utc_datetime_usec)
field(:refetch_needed, :boolean)
timestamps()

@ -8,7 +8,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
import Ecto.Query, only: [from: 2]
alias Ecto.{Multi, Repo}
alias Explorer.Chain.{Data, Hash, Import, Transaction}
alias Explorer.Chain.{Block, Data, Hash, Import, Transaction}
alias Explorer.Chain.Import.Runner.TokenTransfers
@behaviour Import.Runner
@ -42,9 +42,13 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
|> Map.put(:timestamps, timestamps)
|> Map.put(:token_transfer_transaction_hash_set, token_transfer_transaction_hash_set(options))
Multi.run(multi, :transactions, fn repo, _ ->
multi
|> Multi.run(:transactions, fn repo, _ ->
insert(repo, changes_list, insert_options)
end)
|> Multi.run(:recollated_transactions, fn repo, %{transactions: transactions} ->
discard_blocks_for_recollated_transactions(repo, transactions, insert_options)
end)
end
@impl Import.Runner
@ -87,7 +91,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
on_conflict: on_conflict,
for: Transaction,
returning:
~w(block_number index hash internal_transactions_indexed_at block_hash nonce from_address_hash created_contract_address_hash)a,
~w(block_number index hash internal_transactions_indexed_at block_hash old_block_hash nonce from_address_hash created_contract_address_hash)a,
timeout: timeout,
timestamps: timestamps
)
@ -99,6 +103,7 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
update: [
set: [
block_hash: fragment("EXCLUDED.block_hash"),
old_block_hash: transaction.block_hash,
block_number: fragment("EXCLUDED.block_number"),
created_contract_address_hash: fragment("EXCLUDED.created_contract_address_hash"),
cumulative_gas_used: fragment("EXCLUDED.cumulative_gas_used"),
@ -179,4 +184,43 @@ defmodule Explorer.Chain.Import.Runner.Transactions do
end
defp put_internal_transactions_indexed_at?(_, _), do: false
# Revokes consensus on every block that used to contain a transaction which
# has just been upserted into a different block.
#
# `transactions` are the rows returned by the preceding upsert; each one is
# matched on its `block_hash` (current) and `old_block_hash` (value held
# before the upsert). Returns `{:ok, []}` when nothing was re-collated,
# `{:ok, result}` with the second element of `repo.update_all/3` otherwise,
# or `{:error, %{exception: _, block_hashes: _}}` if Postgres rejects the
# update.
defp discard_blocks_for_recollated_transactions(repo, transactions, %{
       timeout: timeout,
       timestamps: %{updated_at: updated_at}
     })
     when is_list(transactions) do
  # A transaction counts as re-collated when it previously belonged to a
  # block and that block differs from the one it belongs to now.
  recollated = fn %{block_hash: current_hash, old_block_hash: previous_hash} ->
    previous_hash != nil and current_hash != previous_hash
  end

  # Unique and sorted, so the blocks are always addressed in the same order.
  stale_block_hashes =
    transactions
    |> Enum.filter(recollated)
    |> Enum.map(& &1.old_block_hash)
    |> Enum.uniq()
    |> Enum.sort()

  case stale_block_hashes do
    [] ->
      {:ok, []}

    _ ->
      revoke_consensus =
        from(
          block in Block,
          where: block.hash in ^stale_block_hashes,
          update: [
            set: [
              consensus: false,
              updated_at: ^updated_at
            ]
          ]
        )

      try do
        {_, result} = repo.update_all(revoke_consensus, [], timeout: timeout)
        {:ok, result}
      rescue
        postgrex_error in Postgrex.Error ->
          {:error, %{exception: postgrex_error, block_hashes: stale_block_hashes}}
      end
  end
end
end

@ -205,6 +205,11 @@ defmodule Explorer.Chain.Transaction do
field(:v, :decimal)
field(:value, Wei)
# A transient field for deriving old block hash during transaction upserts.
# Used to force refetch of a block in case a transaction is re-collated
# in a different block. See: https://github.com/poanetwork/blockscout/issues/1911
field(:old_block_hash, Hash.Full)
timestamps()
belongs_to(:block, Block, foreign_key: :block_hash, references: :hash, type: Hash.Full)

@ -0,0 +1,12 @@
defmodule Explorer.Repo.Migrations.AddOldBlockHashForTransactions do
  use Ecto.Migration

  # Adds a nullable `old_block_hash` column to `transactions`.
  #
  # The column is transient: it records the block hash a transaction had
  # before an upsert, so the block can be force-refetched when the transaction
  # is re-collated in a different block.
  # See: https://github.com/poanetwork/blockscout/issues/1911
  def change do
    alter table(:transactions) do
      add :old_block_hash, :bytea, null: true
    end
  end
end

@ -0,0 +1,11 @@
defmodule Explorer.Repo.Migrations.AddRefetchNeededToBlock do
  use Ecto.Migration

  # Adds the `refetch_needed` flag to `blocks`.
  #
  # The column defaults to `false`, but every row that already exists when the
  # migration runs is flagged `TRUE` by the UPDATE below. The rollback
  # counterpart of that statement is an empty string.
  def change do
    alter table(:blocks) do
      add :refetch_needed, :boolean, default: false
    end

    execute "UPDATE blocks SET refetch_needed = TRUE;", ""
  end
end

@ -6,7 +6,7 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
import Explorer.Chain.Import.RunnerCase, only: [insert_address_with_token_balances: 1, update_holder_count!: 2]
alias Ecto.Multi
alias Explorer.Chain.Import.Runner.{Blocks, Transaction}
alias Explorer.Chain.Import.Runner.{Blocks, Transactions}
alias Explorer.Chain.{Address, Block, Transaction}
alias Explorer.Chain
alias Explorer.Repo
@ -283,6 +283,29 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
insert_block(new_block1, options)
assert Chain.missing_block_number_ranges(range) == []
end
# Regression test for https://github.com/poanetwork/blockscout/issues/1911
#
# Scenario: the same transaction hash is first imported as part of one block
# and then imported again as part of a later block. The block that originally
# held it must then show up as missing.
test "forces block refetch if transaction is re-collated in a different block",
%{consensus_block: %Block{number: block_number, hash: block_hash, miner_hash: miner_hash}, options: options} do
# Two consecutive blocks chained on top of the consensus block from the test context.
new_block1 = params_for(:block, miner_hash: miner_hash, parent_hash: block_hash, number: block_number + 1)
new_block2 = params_for(:block, miner_hash: miner_hash, parent_hash: new_block1.hash, number: block_number + 2)
range = block_number..(block_number + 2)
insert_block(new_block1, options)
insert_block(new_block2, options)
# Sanity check: nothing in the range is missing before any transaction is imported.
assert Chain.missing_block_number_ranges(range) == []
trans_hash = transaction_hash()
# First import: the transaction is collated in new_block1.
transaction1 = transaction_params_with_block([hash: trans_hash], new_block1)
insert_transaction(transaction1, options)
assert Chain.missing_block_number_ranges(range) == []
# Second import: the same hash, now collated in new_block2.
transaction2 = transaction_params_with_block([hash: trans_hash], new_block2)
insert_transaction(transaction2, options)
# new_block1 (number block_number + 1) is now reported as missing.
# NOTE(review): presumably because the transactions runner revoked its
# consensus — confirm against Chain.missing_block_number_ranges/1.
assert Chain.missing_block_number_ranges(range) == [(block_number + 1)..(block_number + 1)]
end
end
defp insert_block(block_params, options) do
@ -293,6 +316,28 @@ defmodule Explorer.Chain.Import.Runner.BlocksTest do
|> Repo.transaction()
end
# Builds factory params for a transaction and overlays the fields that place
# it in the block described by `block_params` (hash and number), together
# with fixed gas figures, index 0, no error, and a freshly inserted sender
# address.
defp transaction_params_with_block(transaction_params, block_params) do
  base_params = params_for(:transaction, transaction_params)

  collated_fields = %{
    block_hash: block_params.hash,
    block_number: block_params.number,
    cumulative_gas_used: 50_000,
    error: nil,
    gas_used: 50_000,
    index: 0,
    from_address_hash: insert(:address).hash
  }

  Map.merge(base_params, collated_fields)
end
# Runs the given params through `Transaction.changeset/2` (the match fails if
# the changeset is invalid) and imports the resulting changes with the
# `Transactions` runner inside a single repo transaction.
defp insert_transaction(transaction_params, options) do
  changeset = Transaction.changeset(%Transaction{}, transaction_params)
  %Ecto.Changeset{valid?: true, changes: transaction_changes} = changeset

  import_multi = Transactions.run(Multi.new(), [transaction_changes], options)
  Repo.transaction(import_multi)
end
# Returns the result of `COUNT(*)` selected over `schema`.
defp count(schema) do
  schema
  |> select(fragment("COUNT(*)"))
  |> Repo.one!()
end

@ -92,6 +92,7 @@ After all deployed instances get all needed data, these fetchers should be depre
- `uncataloged_token_transfers`: extracts token transfers from logs, which previously weren't parsed due to unknown format
- `uncles_without_index`: adds previously unfetched `index` field for unfetched blocks in `block_second_degree_relations`
- `blocks_transactions_mismatch`: refetches each block once and revokes consensus from those whose transaction count does not match the count currently stored. This is meant to force the correction of a race condition that caused successfully fetched transactions to be overwritten by a following non-consensus block: [#1911](https://github.com/poanetwork/blockscout/issues/1911).
## Memory Usage

@ -24,6 +24,7 @@ defmodule Indexer.Supervisor do
}
alias Indexer.Temporary.{
BlocksTransactionsMismatch,
UncatalogedTokenTransfers,
UnclesWithoutIndex
}
@ -124,6 +125,8 @@ defmodule Indexer.Supervisor do
# Temporary workers
{UncatalogedTokenTransfers.Supervisor, [[]]},
{UnclesWithoutIndex.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{BlocksTransactionsMismatch.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}
],
strategy: :one_for_one

@ -0,0 +1,115 @@
defmodule Indexer.Temporary.BlocksTransactionsMismatch do
  @moduledoc """
  Fetches `consensus` `t:Explorer.Chain.Block.t/0` and compares their transaction
  count against a node, to revoke `consensus` on mismatch.

  This is meant to fix incorrectly stored transactions that happened as a result
  of a race condition due to the asynchronicity of indexer's components.

  Every block flagged `refetch_needed` is checked once: after the comparison the
  flag is cleared, and `consensus` is kept only when the stored transaction
  count equals the count reported by the node.
  """

  use Indexer.Fetcher

  require Logger

  import Ecto.Query

  alias Ecto.Multi
  alias EthereumJSONRPC.Blocks
  alias Explorer.Chain.Block
  alias Explorer.Repo
  alias Indexer.BufferedTask

  @behaviour BufferedTask

  @defaults [
    flush_interval: :timer.seconds(3),
    max_batch_size: 10,
    max_concurrency: 4,
    task_supervisor: Indexer.Temporary.BlocksTransactionsMismatch.TaskSupervisor,
    metadata: [fetcher: :blocks_transactions_mismatch]
  ]

  @doc false
  def child_spec([init_options, gen_server_options]) when is_list(init_options) do
    # The JSON-RPC arguments become the BufferedTask state handed to `run/2`.
    {state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments)

    unless state do
      raise ArgumentError,
            ":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <>
              "to allow for json_rpc calls when running."
    end

    merged_init_options =
      @defaults
      |> Keyword.merge(mergeable_init_options)
      |> Keyword.put(:state, state)

    Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__)
  end

  @impl BufferedTask
  def init(initial, reducer, _) do
    # A left join keeps consensus blocks that currently have no stored
    # transactions: `count/1` ignores the NULL hash and yields 0 for them.
    # With an inner join such blocks would never be checked and would keep
    # `refetch_needed` set forever.
    query =
      from(block in Block,
        left_join: transactions in assoc(block, :transactions),
        where: block.consensus and block.refetch_needed,
        group_by: block.hash,
        select: {block, count(transactions.hash)}
      )

    {:ok, final} = Repo.stream_reduce(query, initial, &reducer.(&1, &2))

    final
  end

  @impl BufferedTask
  def run(blocks_data, json_rpc_named_arguments) do
    hashes = Enum.map(blocks_data, fn {block, _trans_num} -> block.hash end)

    Logger.debug("fetching")

    case EthereumJSONRPC.fetch_blocks_by_hash(hashes, json_rpc_named_arguments) do
      {:ok, blocks} ->
        run_blocks(blocks, blocks_data)

      {:error, reason} ->
        Logger.error(fn -> ["failed to fetch: ", inspect(reason)] end)
        {:retry, blocks_data}
    end
  end

  # The node returned no blocks at all: retry the whole batch.
  defp run_blocks(%Blocks{blocks_params: []}, blocks_data), do: {:retry, blocks_data}

  # Compares the stored transaction count of each requested block with the
  # count reported by the node. Blocks the node returned are updated in one
  # repo transaction; blocks it did not return are handed back for retry.
  defp run_blocks(
         %Blocks{blocks_params: blocks_params, transactions_params: transactions_params},
         blocks_data
       ) do
    transaction_counts =
      transactions_params
      |> Enum.group_by(&Map.fetch!(&1, :block_hash))
      |> Map.new(fn {block_hash, trans_lst} -> {block_hash, Enum.count(trans_lst)} end)

    # Seed every block the node returned with a count of 0 so that blocks
    # without transactions are recognised as found (and compared against 0)
    # instead of being retried forever as "missing".
    # Relies on each `blocks_params` entry carrying its hash under `:hash` in
    # the same string form as the transactions' `:block_hash`.
    found_blocks_map =
      blocks_params
      |> Map.new(&{Map.fetch!(&1, :hash), 0})
      |> Map.merge(transaction_counts)

    {found_blocks_data, missing_blocks_data} =
      Enum.split_with(blocks_data, fn {block, _trans_num} ->
        Map.has_key?(found_blocks_map, to_string(block.hash))
      end)

    {:ok, _} =
      found_blocks_data
      |> Enum.reduce(Multi.new(), fn {block, trans_num}, multi ->
        changes = %{
          refetch_needed: false,
          consensus: found_blocks_map[to_string(block.hash)] == trans_num
        }

        Multi.update(multi, block.hash, Block.changeset(block, changes))
      end)
      |> Repo.transaction()

    if Enum.empty?(missing_blocks_data) do
      :ok
    else
      {:retry, missing_blocks_data}
    end
  end
end
Loading…
Cancel
Save