diff --git a/CHANGELOG.md b/CHANGELOG.md index 411c0cdbfb..cda3e2e191 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - [#3564](https://github.com/poanetwork/blockscout/pull/3564) - Staking welcome message ### Fixes +- [#3715](https://github.com/poanetwork/blockscout/pull/3715) - Pending transactions sanitizer process - [#3710](https://github.com/poanetwork/blockscout/pull/3710) - Missing @destination in bridged-tokens template - [#3707](https://github.com/poanetwork/blockscout/pull/3707) - Fetch bridged token price by address of foreign token, not by symbol - [#3686](https://github.com/poanetwork/blockscout/pull/3686) - BSC bridged tokens detection fix diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 99ab68c4cc..b1c074a297 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -1618,6 +1618,10 @@ defmodule Explorer.Chain do Repo.one!(query, timeout: :infinity) || 0 end + def fetch_block_by_hash(block_hash) do + Repo.get(Block, block_hash) + end + @spec fetch_sum_coin_total_supply_minus_burnt() :: non_neg_integer def fetch_sum_coin_total_supply_minus_burnt do {:ok, burn_address_hash} = string_to_address_hash("0x0000000000000000000000000000000000000000") @@ -2748,6 +2752,16 @@ defmodule Explorer.Chain do ) end + def pending_transactions_list do + query = + from(transaction in Transaction, + where: is_nil(transaction.block_hash) and (is_nil(transaction.error) or transaction.error != "dropped/replaced") + ) + + query + |> Repo.all() + end + @doc """ The `string` must start with `0x`, then is converted to an integer and then to `t:Explorer.Chain.Hash.Address.t/0`. diff --git a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex index 6a85041620..b8d67208ce 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex @@ -273,10 +273,10 @@ defmodule Explorer.Chain.Import.Runner.Blocks do |> Enum.map(& &1.number) end - defp lose_consensus(repo, hashes, consensus_block_numbers, changes_list, %{ - timeout: timeout, - timestamps: %{updated_at: updated_at} - }) do + def lose_consensus(repo, hashes, consensus_block_numbers, changes_list, %{ + timeout: timeout, + timestamps: %{updated_at: updated_at} + }) do acquire_query = from( block in where_invalid_neighbour(changes_list), diff --git a/apps/indexer/config/dev.exs b/apps/indexer/config/dev.exs index 8d53da7f69..85c3bb8abe 100644 --- a/apps/indexer/config/dev.exs +++ b/apps/indexer/config/dev.exs @@ -21,6 +21,11 @@ config :logger, :addresses_without_code, path: Path.absname("logs/dev/indexer/addresses_without_code.log"), metadata_filter: [fetcher: :addresses_without_code] +config :logger, :pending_transactions_to_refetch, + level: :debug, + path: Path.absname("logs/dev/indexer/pending_transactions_to_refetch.log"), + metadata_filter: [fetcher: :pending_transactions_to_refetch] + variant = if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do "ganache" diff --git a/apps/indexer/config/prod.exs b/apps/indexer/config/prod.exs index 896b38a979..5929e71942 100644 --- a/apps/indexer/config/prod.exs +++ b/apps/indexer/config/prod.exs @@ -23,6 +23,11 @@ config :logger, :addresses_without_code, path: Path.absname("logs/prod/indexer/addresses_without_code.log"), metadata_filter: [fetcher: :addresses_without_code] +config :logger, :pending_transactions_to_refetch, + level: :debug, + path: Path.absname("logs/prod/indexer/pending_transactions_to_refetch.log"), + metadata_filter: [fetcher: :pending_transactions_to_refetch] + variant = if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do "parity" diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index ac064f4878..2b96f6ffe3 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -140,6 +140,11 @@ defmodule Indexer.Block.Realtime.Fetcher do }} end + # don't handle other messages (e.g. :ssl_closed) + def handle_info(_, state) do + {:noreply, state} + end + defp subscribe_to_new_heads(%__MODULE__{subscription: nil} = state, subscribe_named_arguments) when is_list(subscribe_named_arguments) do case EthereumJSONRPC.subscribe("newHeads", subscribe_named_arguments) do diff --git a/apps/indexer/lib/indexer/pending_transactions_sanitizer.ex b/apps/indexer/lib/indexer/pending_transactions_sanitizer.ex new file mode 100644 index 0000000000..74a47232a9 --- /dev/null +++ b/apps/indexer/lib/indexer/pending_transactions_sanitizer.ex @@ -0,0 +1,124 @@ +defmodule Indexer.PendingTransactionsSanitizer do + @moduledoc """ + Periodically checks pending transactions status in order to detect that transaction already included to the block + And we need to re-fetch that block. + """ + + use GenServer + + require Logger + + import EthereumJSONRPC, only: [json_rpc: 2, request: 1] + + alias Explorer.{Chain, Repo} + alias Explorer.Chain.Import.Runner.Blocks + + @interval :timer.hours(3) + + defstruct interval: @interval, + json_rpc_named_arguments: [] + + def child_spec([init_arguments]) do + child_spec([init_arguments, []]) + end + + def child_spec([_init_arguments, _gen_server_options] = start_link_arguments) do + default = %{ + id: __MODULE__, + start: {__MODULE__, :start_link, start_link_arguments} + } + + Supervisor.child_spec(default, []) + end + + def start_link(init_opts, gen_server_opts \\ []) do + GenServer.start_link(__MODULE__, init_opts, gen_server_opts) + end + + def init(opts) when is_list(opts) do + state = %__MODULE__{ + json_rpc_named_arguments: Keyword.fetch!(opts, :json_rpc_named_arguments), + interval: opts[:interval] || @interval + } + + Process.send_after(self(), :sanitize_pending_transactions, state.interval) + + {:ok, state} + end + + def handle_info( + :sanitize_pending_transactions, + %{interval: interval, json_rpc_named_arguments: json_rpc_named_arguments} = state + ) do + Logger.debug("Start sanitizing of pending transactions", + fetcher: :pending_transactions_to_refetch + ) + + sanitize_pending_transactions(json_rpc_named_arguments) + + Process.send_after(self(), :sanitize_pending_transactions, interval) + + {:noreply, state} + end + + defp sanitize_pending_transactions(json_rpc_named_arguments) do + pending_transactions_list_from_db = Chain.pending_transactions_list() + + pending_transactions_list_from_db + |> Enum.with_index() + |> Enum.each(fn {pending_tx, ind} -> + pending_tx_hash_str = "0x" <> Base.encode16(pending_tx.hash.bytes, case: :lower) + + with {:ok, result} <- + %{id: ind, method: "eth_getTransactionReceipt", params: [pending_tx_hash_str]} + |> request() + |> json_rpc(json_rpc_named_arguments) do + if result do + block_hash = Map.get(result, "blockHash") + + Logger.debug( + "Transaction #{pending_tx_hash_str} already included into the block #{block_hash}. We should invalidate consensus for it in order to re-fetch transactions", + fetcher: :pending_transactions_to_refetch + ) + + fetch_block_and_invalidate(block_hash) + end + end + end) + + Logger.debug("Pending transactions are sanitized", + fetcher: :pending_transactions_to_refetch + ) + end + + defp fetch_block_and_invalidate(block_hash) do + case Chain.fetch_block_by_hash(block_hash) do + %{number: number, consensus: consensus} -> + Logger.debug( + "Corresponding number of the block with hash #{block_hash} to invalidate is #{number} and consensus #{ + consensus + }", + fetcher: :pending_transactions_to_refetch + ) + + invalidate_block(number, consensus) + + _ -> + Logger.debug( + "Block with hash #{block_hash} is not yet in the DB", + fetcher: :pending_transactions_to_refetch + ) + end + end + + defp invalidate_block(number, consensus) do + if consensus do + opts = %{ + timeout: 60_000, + timestamps: %{updated_at: DateTime.utc_now()} + } + + Blocks.lose_consensus(Repo, [], [number], [], opts) + end + end +end diff --git a/apps/indexer/lib/indexer/supervisor.ex b/apps/indexer/lib/indexer/supervisor.ex index d7f8f09ec1..14c2a97a2a 100644 --- a/apps/indexer/lib/indexer/supervisor.ex +++ b/apps/indexer/lib/indexer/supervisor.ex @@ -11,6 +11,7 @@ defmodule Indexer.Supervisor do Block, CalcLpTokensTotalLiqudity, PendingOpsCleaner, + PendingTransactionsSanitizer, SetAmbBridgedMetadataForTokens, SetOmniBridgedMetadataForTokens } @@ -128,6 +129,7 @@ defmodule Indexer.Supervisor do {CoinBalanceOnDemand.Supervisor, [json_rpc_named_arguments]}, {TokenTotalSupplyOnDemand.Supervisor, [json_rpc_named_arguments]}, {TokenBalanceOnDemand.Supervisor, [json_rpc_named_arguments]}, + {PendingTransactionsSanitizer, [[json_rpc_named_arguments: json_rpc_named_arguments]]}, # Temporary workers {UncatalogedTokenTransfers.Supervisor, [[]]}, diff --git a/config/config.exs b/config/config.exs index 93e90bc7fe..6631863bce 100644 --- a/config/config.exs +++ b/config/config.exs @@ -29,7 +29,8 @@ config :logger, {LoggerFileBackend, :indexer}, {LoggerFileBackend, :indexer_token_balances}, {LoggerFileBackend, :token_instances}, - {LoggerFileBackend, :reading_token_functions} + {LoggerFileBackend, :reading_token_functions}, + {LoggerFileBackend, :pending_transactions_to_refetch} ] config :logger, :console,