Pending transactions sanitizer

pull/3715/head
Viktor Baranov 4 years ago
parent ae03f9436b
commit b4c049abb1
  1. 1
      CHANGELOG.md
  2. 14
      apps/explorer/lib/explorer/chain.ex
  3. 8
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  4. 5
      apps/indexer/config/dev.exs
  5. 5
      apps/indexer/config/prod.exs
  6. 5
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  7. 124
      apps/indexer/lib/indexer/pending_transactions_sanitizer.ex
  8. 2
      apps/indexer/lib/indexer/supervisor.ex
  9. 3
      config/config.exs

@ -13,6 +13,7 @@
- [#3564](https://github.com/poanetwork/blockscout/pull/3564) - Staking welcome message
### Fixes
- [#3715](https://github.com/poanetwork/blockscout/pull/3715) - Pending transactions sanitizer process
- [#3710](https://github.com/poanetwork/blockscout/pull/3710) - Missing @destination in bridged-tokens template
- [#3707](https://github.com/poanetwork/blockscout/pull/3707) - Fetch bridged token price by address of foreign token, not by symbol
- [#3686](https://github.com/poanetwork/blockscout/pull/3686) - BSC bridged tokens detection fix

@ -1618,6 +1618,10 @@ defmodule Explorer.Chain do
Repo.one!(query, timeout: :infinity) || 0
end
def fetch_block_by_hash(block_hash) do
Repo.get(Block, block_hash)
end
@spec fetch_sum_coin_total_supply_minus_burnt() :: non_neg_integer
def fetch_sum_coin_total_supply_minus_burnt do
{:ok, burn_address_hash} = string_to_address_hash("0x0000000000000000000000000000000000000000")
@ -2748,6 +2752,16 @@ defmodule Explorer.Chain do
)
end
def pending_transactions_list do
query =
from(transaction in Transaction,
where: is_nil(transaction.block_hash) and (is_nil(transaction.error) or transaction.error != "dropped/replaced")
)
query
|> Repo.all()
end
@doc """
The `string` must start with `0x`, then is converted to an integer and then to `t:Explorer.Chain.Hash.Address.t/0`.

@ -273,10 +273,10 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Enum.map(& &1.number)
end
defp lose_consensus(repo, hashes, consensus_block_numbers, changes_list, %{
timeout: timeout,
timestamps: %{updated_at: updated_at}
}) do
def lose_consensus(repo, hashes, consensus_block_numbers, changes_list, %{
timeout: timeout,
timestamps: %{updated_at: updated_at}
}) do
acquire_query =
from(
block in where_invalid_neighbour(changes_list),

@ -21,6 +21,11 @@ config :logger, :addresses_without_code,
path: Path.absname("logs/dev/indexer/addresses_without_code.log"),
metadata_filter: [fetcher: :addresses_without_code]
config :logger, :pending_transactions_to_refetch,
level: :debug,
path: Path.absname("logs/dev/indexer/pending_transactions_to_refetch.log"),
metadata_filter: [fetcher: :pending_transactions_to_refetch]
variant =
if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do
"ganache"

@ -23,6 +23,11 @@ config :logger, :addresses_without_code,
path: Path.absname("logs/prod/indexer/addresses_without_code.log"),
metadata_filter: [fetcher: :addresses_without_code]
config :logger, :pending_transactions_to_refetch,
level: :debug,
path: Path.absname("logs/prod/indexer/pending_transactions_to_refetch.log"),
metadata_filter: [fetcher: :pending_transactions_to_refetch]
variant =
if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do
"parity"

@ -140,6 +140,11 @@ defmodule Indexer.Block.Realtime.Fetcher do
}}
end
# don't handle other messages (e.g. :ssl_closed)
def handle_info(_, state) do
{:noreply, state}
end
defp subscribe_to_new_heads(%__MODULE__{subscription: nil} = state, subscribe_named_arguments)
when is_list(subscribe_named_arguments) do
case EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments) do

@ -0,0 +1,124 @@
defmodule Indexer.PendingTransactionsSanitizer do
@moduledoc """
Periodically checks pending transactions status in order to detect that transaction already included to the block
And we need to re-fetch that block.
"""
use GenServer
require Logger
import EthereumJSONRPC, only: [json_rpc: 2, request: 1]
alias Explorer.{Chain, Repo}
alias Explorer.Chain.Import.Runner.Blocks
@interval :timer.hours(3)
defstruct interval: @interval,
json_rpc_named_arguments: []
def child_spec([init_arguments]) do
child_spec([init_arguments, []])
end
def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do
default = %{
id: __MODULE__,
start: {__MODULE__, :start_link, start_link_arguments}
}
Supervisor.child_spec(default, [])
end
def start_link(init_opts, gen_server_opts \\ []) do
GenServer.start_link(__MODULE__, init_opts, gen_server_opts)
end
def init(opts) when is_list(opts) do
state = %__MODULE__{
json_rpc_named_arguments: Keyword.fetch!(opts, :json_rpc_named_arguments),
interval: opts[:interval] || @interval
}
Process.send_after(self(), :sanitize_pending_transactions, state.interval)
{:ok, state}
end
def handle_info(
:sanitize_pending_transactions,
%{interval: interval, json_rpc_named_arguments: json_rpc_named_arguments} = state
) do
Logger.debug("Start sanitizing of pending transactions",
fetcher: :pending_transactions_to_refetch
)
sanitize_pending_transactions(json_rpc_named_arguments)
Process.send_after(self(), :sanitize_pending_transactions, interval)
{:noreply, state}
end
defp sanitize_pending_transactions(json_rpc_named_arguments) do
pending_transactions_list_from_db = Chain.pending_transactions_list()
pending_transactions_list_from_db
|> Enum.with_index()
|> Enum.each(fn {pending_tx, ind} ->
pending_tx_hash_str = "0x" <> Base.encode16(pending_tx.hash.bytes, case: :lower)
with {:ok, result} <-
%{id: ind, method: "eth_getTransactionReceipt", params: [pending_tx_hash_str]}
|> request()
|> json_rpc(json_rpc_named_arguments) do
if result do
block_hash = Map.get(result, "blockHash")
Logger.debug(
"Transaction #{pending_tx_hash_str} already included into the block #{block_hash}. We should invalidate consensus for it in order to re-fetch transactions",
fetcher: :pending_transactions_to_refetch
)
fetch_block_and_invalidate(block_hash)
end
end
end)
Logger.debug("Pending transactions are sanitized",
fetcher: :pending_transactions_to_refetch
)
end
defp fetch_block_and_invalidate(block_hash) do
case Chain.fetch_block_by_hash(block_hash) do
%{number: number, consensus: consensus} ->
Logger.debug(
"Corresponding number of the block with hash #{block_hash} to invalidate is #{number} and consensus #{
consensus
}",
fetcher: :pending_transactions_to_refetch
)
invalidate_block(number, consensus)
_ ->
Logger.debug(
"Block with hash #{block_hash} is not yet in the DB",
fetcher: :pending_transactions_to_refetch
)
end
end
defp invalidate_block(number, consensus) do
if consensus do
opts = %{
timeout: 60_000,
timestamps: %{updated_at: DateTime.utc_now()}
}
Blocks.lose_consensus(Repo, [], [number], [], opts)
end
end
end

@ -11,6 +11,7 @@ defmodule Indexer.Supervisor do
Block,
CalcLpTokensTotalLiqudity,
PendingOpsCleaner,
PendingTransactionsSanitizer,
SetAmbBridgedMetadataForTokens,
SetOmniBridgedMetadataForTokens
}
@ -128,6 +129,7 @@ defmodule Indexer.Supervisor do
{CoinBalanceOnDemand.Supervisor, [json_rpc_named_arguments]},
{TokenTotalSupplyOnDemand.Supervisor, [json_rpc_named_arguments]},
{TokenBalanceOnDemand.Supervisor, [json_rpc_named_arguments]},
{PendingTransactionsSanitizer, [[json_rpc_named_arguments: json_rpc_named_arguments]]},
# Temporary workers
{UncatalogedTokenTransfers.Supervisor, [[]]},

@ -29,7 +29,8 @@ config :logger,
{LoggerFileBackend, :indexer},
{LoggerFileBackend, :indexer_token_balances},
{LoggerFileBackend, :token_instances},
{LoggerFileBackend, :reading_token_functions}
{LoggerFileBackend, :reading_token_functions},
{LoggerFileBackend, :pending_transactions_to_refetch}
]
config :logger, :console,

Loading…
Cancel
Save