Chunk pending transactions

Fixes #1253

Ensure pending transaction import without timeout by chunking in case
the node has a lot of pending transactions.
pull/1254/head
Luke Imhoff 6 years ago
parent 575d3dda9a
commit 3592b66340
  1. 36
      apps/indexer/lib/indexer/pending_transaction/fetcher.ex

@ -11,9 +11,12 @@ defmodule Indexer.PendingTransaction.Fetcher do
import EthereumJSONRPC, only: [fetch_pending_transactions: 1] import EthereumJSONRPC, only: [fetch_pending_transactions: 1]
alias Ecto.Changeset
alias Explorer.Chain alias Explorer.Chain
alias Indexer.{AddressExtraction, PendingTransaction} alias Indexer.{AddressExtraction, PendingTransaction}
@chunk_size 250
# milliseconds # milliseconds
@default_interval 1_000 @default_interval 1_000
@ -106,24 +109,43 @@ defmodule Indexer.PendingTransaction.Fetcher do
case fetch_pending_transactions(json_rpc_named_arguments) do case fetch_pending_transactions(json_rpc_named_arguments) do
{:ok, transactions_params} -> {:ok, transactions_params} ->
transactions_params
|> Stream.chunk_every(@chunk_size)
|> Enum.each(&import_chunk/1)
:ignore ->
:ok
{:error, :timeout} ->
Logger.error("timeout")
:ok
end
end
defp import_chunk(transactions_params) do
addresses_params = AddressExtraction.extract_addresses(%{transactions: transactions_params}, pending: true) addresses_params = AddressExtraction.extract_addresses(%{transactions: transactions_params}, pending: true)
# There's no need to queue up fetching the address balance since theses are pending transactions and cannot have # There's no need to queue up fetching the address balance since theses are pending transactions and cannot have
# affected the address balance yet since address balance is a balance at a give block and these transactions are # affected the address balance yet since address balance is a balance at a give block and these transactions are
# blockless. # blockless.
{:ok, _} = case Chain.import(%{
Chain.import(%{ addresses: %{params: addresses_params, on_conflict: :nothing},
addresses: %{params: addresses_params},
broadcast: :realtime, broadcast: :realtime,
transactions: %{params: transactions_params, on_conflict: :nothing} transactions: %{params: transactions_params, on_conflict: :nothing}
}) }) do
{:ok, _} ->
:ok
:ignore -> {:error, [%Changeset{} | _] = changesets} ->
Logger.error(fn -> ["Failed to validate: ", inspect(changesets)] end, step: :import)
:ok :ok
{:error, :timeout} -> {:error, reason} ->
Logger.error("timeout") Logger.error(fn -> inspect(reason) end, step: :import)
{:error, step, failed_value, _changes_so_far} ->
Logger.error(fn -> ["Failed to import: ", inspect(failed_value)] end, step: step)
:ok :ok
end end
end end

Loading…
Cancel
Save