From 3592b66340d8a0d5f22ef9b181f63d3cb53e1149 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 18 Dec 2018 12:05:24 -0600 Subject: [PATCH] Chunk pending transactions Fixes #1253 Ensure pending transaction import without timeout by chunking in case the node has a lot of pending transactions. --- .../indexer/pending_transaction/fetcher.ex | 44 ++++++++++++++----- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/apps/indexer/lib/indexer/pending_transaction/fetcher.ex b/apps/indexer/lib/indexer/pending_transaction/fetcher.ex index 51c379a50d..85844ae7f7 100644 --- a/apps/indexer/lib/indexer/pending_transaction/fetcher.ex +++ b/apps/indexer/lib/indexer/pending_transaction/fetcher.ex @@ -11,9 +11,12 @@ defmodule Indexer.PendingTransaction.Fetcher do import EthereumJSONRPC, only: [fetch_pending_transactions: 1] + alias Ecto.Changeset alias Explorer.Chain alias Indexer.{AddressExtraction, PendingTransaction} + @chunk_size 250 + # milliseconds @default_interval 1_000 @@ -106,17 +109,9 @@ defmodule Indexer.PendingTransaction.Fetcher do case fetch_pending_transactions(json_rpc_named_arguments) do {:ok, transactions_params} -> - addresses_params = AddressExtraction.extract_addresses(%{transactions: transactions_params}, pending: true) - - # There's no need to queue up fetching the address balance since theses are pending transactions and cannot have - # affected the address balance yet since address balance is a balance at a give block and these transactions are - # blockless. - {:ok, _} = - Chain.import(%{ - addresses: %{params: addresses_params}, - broadcast: :realtime, - transactions: %{params: transactions_params, on_conflict: :nothing} - }) + transactions_params + |> Stream.chunk_every(@chunk_size) + |> Enum.each(&import_chunk/1) :ignore -> :ok @@ -127,4 +122,31 @@ defmodule Indexer.PendingTransaction.Fetcher do :ok end end + + defp import_chunk(transactions_params) do + addresses_params = AddressExtraction.extract_addresses(%{transactions: transactions_params}, pending: true) + + # There's no need to queue up fetching the address balance since theses are pending transactions and cannot have + # affected the address balance yet since address balance is a balance at a give block and these transactions are + # blockless. + case Chain.import(%{ + addresses: %{params: addresses_params, on_conflict: :nothing}, + broadcast: :realtime, + transactions: %{params: transactions_params, on_conflict: :nothing} + }) do + {:ok, _} -> + :ok + + {:error, [%Changeset{} | _] = changesets} -> + Logger.error(fn -> ["Failed to validate: ", inspect(changesets)] end, step: :import) + :ok + + {:error, reason} -> + Logger.error(fn -> inspect(reason) end, step: :import) + + {:error, step, failed_value, _changes_so_far} -> + Logger.error(fn -> ["Failed to import: ", inspect(failed_value)] end, step: step) + :ok + end + end end