Move Receipts functions out of BlockFetcher

pull/489/head
Luke Imhoff 6 years ago
parent 11735b1289
commit 68229420b4
  1. 42
      apps/indexer/lib/indexer/block_fetcher.ex
  2. 40
      apps/indexer/lib/indexer/block_fetcher/receipts.ex

@ -9,6 +9,7 @@ defmodule Indexer.BlockFetcher do
alias Explorer.Chain
alias Indexer.{AddressExtraction, BalanceFetcher, InternalTransactionFetcher, Sequence}
alias Indexer.BlockFetcher.Receipts
# dialyzer thinks that Logger.debug functions always have no_local_return
@dialyzer {:nowarn_function, import_range: 2}
@ -89,30 +90,6 @@ defmodule Indexer.BlockFetcher do
:ok
end
defp fetch_transaction_receipts(%__MODULE__{} = _state, []), do: {:ok, %{logs: [], receipts: []}}
defp fetch_transaction_receipts(
%__MODULE__{json_rpc_named_arguments: json_rpc_named_arguments} = state,
transaction_params
) do
debug(fn -> "fetching #{length(transaction_params)} transaction receipts" end)
stream_opts = [max_concurrency: state.receipts_concurrency, timeout: :infinity]
transaction_params
|> Enum.chunk_every(state.receipts_batch_size)
|> Task.async_stream(&EthereumJSONRPC.fetch_transaction_receipts(&1, json_rpc_named_arguments), stream_opts)
|> Enum.reduce_while({:ok, %{logs: [], receipts: []}}, fn
{:ok, {:ok, %{logs: logs, receipts: receipts}}}, {:ok, %{logs: acc_logs, receipts: acc_receipts}} ->
{:cont, {:ok, %{logs: acc_logs ++ logs, receipts: acc_receipts ++ receipts}}}
{:ok, {:error, reason}}, {:ok, _acc} ->
{:halt, {:error, reason}}
{:error, reason}, {:ok, _acc} ->
{:halt, {:error, reason}}
end)
end
defp insert(%__MODULE__{broadcast: broadcast, sequence: sequence}, options) when is_list(options) do
{address_hash_to_fetched_balance_block_number, import_options} =
pop_address_hash_to_fetched_balance_block_number(options)
@ -199,10 +176,9 @@ defmodule Indexer.BlockFetcher do
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{blocks: blocks, transactions: transactions_without_receipts} = result,
cap_seq(seq, next, range),
{:receipts, {:ok, receipt_params}} <-
{:receipts, fetch_transaction_receipts(state, transactions_without_receipts)},
{:receipts, {:ok, receipt_params}} <- {:receipts, Receipts.fetch(state, transactions_without_receipts)},
%{logs: logs, receipts: receipts} = receipt_params,
transactions_with_receipts = put_receipts(transactions_without_receipts, receipts) do
transactions_with_receipts = Receipts.put(transactions_without_receipts, receipts) do
addresses =
AddressExtraction.extract_addresses(%{
blocks: blocks,
@ -231,16 +207,4 @@ defmodule Indexer.BlockFetcher do
{:error, step, reason}
end
end
defp put_receipts(transactions_params, receipts_params)
when is_list(transactions_params) and is_list(receipts_params) do
transaction_hash_to_receipt_params =
Enum.into(receipts_params, %{}, fn %{transaction_hash: transaction_hash} = receipt_params ->
{transaction_hash, receipt_params}
end)
Enum.map(transactions_params, fn %{hash: transaction_hash} = transaction_params ->
Map.merge(transaction_params, Map.fetch!(transaction_hash_to_receipt_params, transaction_hash))
end)
end
end

@ -0,0 +1,40 @@
defmodule Indexer.BlockFetcher.Receipts do
import Indexer, only: [debug: 1]
alias Indexer.BlockFetcher
def fetch(%BlockFetcher{} = _state, []), do: {:ok, %{logs: [], receipts: []}}
def fetch(
%BlockFetcher{json_rpc_named_arguments: json_rpc_named_arguments} = state,
transaction_params
) do
debug(fn -> "fetching #{length(transaction_params)} transaction receipts" end)
stream_opts = [max_concurrency: state.receipts_concurrency, timeout: :infinity]
transaction_params
|> Enum.chunk_every(state.receipts_batch_size)
|> Task.async_stream(&EthereumJSONRPC.fetch_transaction_receipts(&1, json_rpc_named_arguments), stream_opts)
|> Enum.reduce_while({:ok, %{logs: [], receipts: []}}, fn
{:ok, {:ok, %{logs: logs, receipts: receipts}}}, {:ok, %{logs: acc_logs, receipts: acc_receipts}} ->
{:cont, {:ok, %{logs: acc_logs ++ logs, receipts: acc_receipts ++ receipts}}}
{:ok, {:error, reason}}, {:ok, _acc} ->
{:halt, {:error, reason}}
{:error, reason}, {:ok, _acc} ->
{:halt, {:error, reason}}
end)
end
def put(transactions_params, receipts_params) when is_list(transactions_params) and is_list(receipts_params) do
transaction_hash_to_receipt_params =
Enum.into(receipts_params, %{}, fn %{transaction_hash: transaction_hash} = receipt_params ->
{transaction_hash, receipt_params}
end)
Enum.map(transactions_params, fn %{hash: transaction_hash} = transaction_params ->
Map.merge(transaction_params, Map.fetch!(transaction_hash_to_receipt_params, transaction_hash))
end)
end
end
Loading…
Cancel
Save