Add temporary indexer worker to remove consensus on blocks with mismatching transactions

After the solution for #1991 it is necessary to remove consensus from blocks whose number of transactions mismatch compared to a node, so that these blocks are refetched.
pull/1917/head
pasqu4le 6 years ago
parent b9cc8a417f
commit 64acef7b71
No known key found for this signature in database
GPG Key ID: 8F3EE01F1DC90687
  1. 1
      CHANGELOG.md
  2. 6
      apps/explorer/lib/explorer/chain/block.ex
  3. 11
      apps/explorer/priv/repo/migrations/20190513134025_add_refetch_needed_to_block.exs
  4. 1
      apps/indexer/README.md
  5. 3
      apps/indexer/lib/indexer/supervisor.ex
  6. 115
      apps/indexer/lib/indexer/temporary/blocks_transactions_mismatch.ex

@ -43,6 +43,7 @@
- [#1898](https://github.com/poanetwork/blockscout/pull/1898) - check if the constructor has arguments before verifying constructor arguments - [#1898](https://github.com/poanetwork/blockscout/pull/1898) - check if the constructor has arguments before verifying constructor arguments
- [#1915](https://github.com/poanetwork/blockscout/pull/1915) - fallback to 2 latest evm versions - [#1915](https://github.com/poanetwork/blockscout/pull/1915) - fallback to 2 latest evm versions
- [#1937](https://github.com/poanetwork/blockscout/pull/1937) - Check the presence of overlap[i] object before retrieving properties from it - [#1937](https://github.com/poanetwork/blockscout/pull/1937) - Check the presence of overlap[i] object before retrieving properties from it
- [#1917](https://github.com/poanetwork/blockscout/pull/1917) - Force block refetch if transaction is re-collated in a different block
### Chore ### Chore

@ -10,7 +10,7 @@ defmodule Explorer.Chain.Block do
alias Explorer.Chain.{Address, Gas, Hash, Transaction} alias Explorer.Chain.{Address, Gas, Hash, Transaction}
alias Explorer.Chain.Block.{Reward, SecondDegreeRelation} alias Explorer.Chain.Block.{Reward, SecondDegreeRelation}
@optional_attrs ~w(internal_transactions_indexed_at size)a @optional_attrs ~w(internal_transactions_indexed_at size refetch_needed)a
@required_attrs ~w(consensus difficulty gas_limit gas_used hash miner_hash nonce number parent_hash timestamp total_difficulty)a @required_attrs ~w(consensus difficulty gas_limit gas_used hash miner_hash nonce number parent_hash timestamp total_difficulty)a
@ -63,7 +63,8 @@ defmodule Explorer.Chain.Block do
timestamp: DateTime.t(), timestamp: DateTime.t(),
total_difficulty: difficulty(), total_difficulty: difficulty(),
transactions: %Ecto.Association.NotLoaded{} | [Transaction.t()], transactions: %Ecto.Association.NotLoaded{} | [Transaction.t()],
internal_transactions_indexed_at: DateTime.t() internal_transactions_indexed_at: DateTime.t(),
refetch_needed: boolean()
} }
@primary_key {:hash, Hash.Full, autogenerate: false} @primary_key {:hash, Hash.Full, autogenerate: false}
@ -78,6 +79,7 @@ defmodule Explorer.Chain.Block do
field(:timestamp, :utc_datetime_usec) field(:timestamp, :utc_datetime_usec)
field(:total_difficulty, :decimal) field(:total_difficulty, :decimal)
field(:internal_transactions_indexed_at, :utc_datetime_usec) field(:internal_transactions_indexed_at, :utc_datetime_usec)
field(:refetch_needed, :boolean)
timestamps() timestamps()

@ -0,0 +1,11 @@
defmodule Explorer.Repo.Migrations.AddRefetchNeededToBlock do
use Ecto.Migration
def change do
alter table(:blocks) do
add(:refetch_needed, :boolean, default: false)
end
execute("UPDATE blocks SET refetch_needed = TRUE;", "")
end
end

@ -92,6 +92,7 @@ After all deployed instances get all needed data, these fetchers should be depre
- `uncataloged_token_transfers`: extracts token transfers from logs, which previously weren't parsed due to unknown format - `uncataloged_token_transfers`: extracts token transfers from logs, which previously weren't parsed due to unknown format
- `uncles_without_index`: adds previously unfetched `index` field for unfetched blocks in `block_second_degree_relations` - `uncles_without_index`: adds previously unfetched `index` field for unfetched blocks in `block_second_degree_relations`
- `blocks_transactions_mismatch`: refetches each block once and revokes consensus to those whose transaction number mismatches with the number currently stored. This is meant to force the correction of a race condition that caused successfully fetched transactions to be overwritten by a following non-consensus block: [#1911](https://github.com/poanetwork/blockscout/issues/1911).
## Memory Usage ## Memory Usage

@ -24,6 +24,7 @@ defmodule Indexer.Supervisor do
} }
alias Indexer.Temporary.{ alias Indexer.Temporary.{
BlocksTransactionsMismatch,
UncatalogedTokenTransfers, UncatalogedTokenTransfers,
UnclesWithoutIndex UnclesWithoutIndex
} }
@ -124,6 +125,8 @@ defmodule Indexer.Supervisor do
# Temporary workers # Temporary workers
{UncatalogedTokenTransfers.Supervisor, [[]]}, {UncatalogedTokenTransfers.Supervisor, [[]]},
{UnclesWithoutIndex.Supervisor, {UnclesWithoutIndex.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]},
{BlocksTransactionsMismatch.Supervisor,
[[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]} [[json_rpc_named_arguments: json_rpc_named_arguments, memory_monitor: memory_monitor]]}
], ],
strategy: :one_for_one strategy: :one_for_one

@ -0,0 +1,115 @@
defmodule Indexer.Temporary.BlocksTransactionsMismatch do
@moduledoc """
Fetches `consensus` `t:Explorer.Chain.Block.t/0` and compares their transaction
number against a node, to revoke `consensus` on mismatch.
This is meant to fix incorrectly strored transactions that happened as a result
of a race condition due to the asynchronicity of indexer's components.
"""
use Indexer.Fetcher
require Logger
import Ecto.Query
alias Ecto.Multi
alias EthereumJSONRPC.Blocks
alias Explorer.Chain.Block
alias Explorer.Repo
alias Indexer.BufferedTask
@behaviour BufferedTask
@defaults [
flush_interval: :timer.seconds(3),
max_batch_size: 10,
max_concurrency: 4,
task_supervisor: Indexer.Temporary.BlocksTransactionsMismatch.TaskSupervisor,
metadata: [fetcher: :blocks_transactions_mismatch]
]
@doc false
def child_spec([init_options, gen_server_options]) when is_list(init_options) do
{state, mergeable_init_options} = Keyword.pop(init_options, :json_rpc_named_arguments)
unless state do
raise ArgumentError,
":json_rpc_named_arguments must be provided to `#{__MODULE__}.child_spec " <>
"to allow for json_rpc calls when running."
end
merged_init_options =
@defaults
|> Keyword.merge(mergeable_init_options)
|> Keyword.put(:state, state)
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__)
end
@impl BufferedTask
def init(initial, reducer, _) do
query =
from(block in Block,
join: transactions in assoc(block, :transactions),
where: block.consensus and block.refetch_needed,
group_by: block.hash,
select: {block, count(transactions.hash)}
)
{:ok, final} = Repo.stream_reduce(query, initial, &reducer.(&1, &2))
final
end
@impl BufferedTask
def run(blocks_data, json_rpc_named_arguments) do
hashes = Enum.map(blocks_data, fn {block, _trans_num} -> block.hash end)
Logger.debug("fetching")
case EthereumJSONRPC.fetch_blocks_by_hash(hashes, json_rpc_named_arguments) do
{:ok, blocks} ->
run_blocks(blocks, blocks_data)
{:error, reason} ->
Logger.error(fn -> ["failed to fetch: ", inspect(reason)] end)
{:retry, blocks_data}
end
end
defp run_blocks(%Blocks{blocks_params: []}, blocks_data), do: {:retry, blocks_data}
defp run_blocks(
%Blocks{transactions_params: transactions_params},
blocks_data
) do
found_blocks_map =
transactions_params
|> Enum.group_by(&Map.fetch!(&1, :block_hash))
|> Map.new(fn {block_hash, trans_lst} -> {block_hash, Enum.count(trans_lst)} end)
{found_blocks_data, missing_blocks_data} =
Enum.split_with(blocks_data, fn {block, _trans_num} ->
Map.has_key?(found_blocks_map, to_string(block.hash))
end)
{:ok, _} =
found_blocks_data
|> Enum.reduce(Multi.new(), fn {block, trans_num}, multi ->
changes = %{
refetch_needed: false,
consensus: found_blocks_map[to_string(block.hash)] == trans_num
}
Multi.update(multi, block.hash, Block.changeset(block, changes))
end)
|> Repo.transaction()
if Enum.empty?(missing_blocks_data) do
:ok
else
{:retry, missing_blocks_data}
end
end
end
Loading…
Cancel
Save