|
|
|
@ -43,9 +43,9 @@ defmodule Indexer.InternalTransaction.Fetcher do |
|
|
|
|
""" |
|
|
|
|
@spec async_fetch([%{required(:block_number) => Block.block_number(), required(:hash) => Hash.Full.t()}]) :: :ok |
|
|
|
|
def async_fetch(transactions_fields, timeout \\ 5000) when is_list(transactions_fields) do |
|
|
|
|
params_list = Enum.map(transactions_fields, &transaction_fields_to_params/1) |
|
|
|
|
entries = Enum.map(transactions_fields, &entry/1) |
|
|
|
|
|
|
|
|
|
BufferedTask.buffer(__MODULE__, params_list, timeout) |
|
|
|
|
BufferedTask.buffer(__MODULE__, entries, timeout) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
@doc false |
|
|
|
@ -74,7 +74,7 @@ defmodule Indexer.InternalTransaction.Fetcher do |
|
|
|
|
initial, |
|
|
|
|
fn transaction_fields, acc -> |
|
|
|
|
transaction_fields |
|
|
|
|
|> transaction_fields_to_params() |
|
|
|
|
|> entry() |
|
|
|
|
|> reducer.(acc) |
|
|
|
|
end |
|
|
|
|
) |
|
|
|
@ -82,17 +82,25 @@ defmodule Indexer.InternalTransaction.Fetcher do |
|
|
|
|
final |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp transaction_fields_to_params(%{block_number: block_number, hash: hash}) when is_integer(block_number) do |
|
|
|
|
defp entry(%{block_number: block_number, hash: %Hash{bytes: bytes}}) when is_integer(block_number) do |
|
|
|
|
{block_number, bytes} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp params({block_number, hash_bytes}) when is_integer(block_number) do |
|
|
|
|
{:ok, hash} = Hash.Full.cast(hash_bytes) |
|
|
|
|
%{block_number: block_number, hash_data: to_string(hash)} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
@impl BufferedTask |
|
|
|
|
def run(transactions_params, _retries, json_rpc_named_arguments) do |
|
|
|
|
unique_transactions_params = unique_transactions_params(transactions_params) |
|
|
|
|
def run(entries, _retries, json_rpc_named_arguments) do |
|
|
|
|
unique_entries = unique_entries(entries) |
|
|
|
|
|
|
|
|
|
Logger.debug(fn -> "fetching internal transactions for #{length(unique_transactions_params)} transactions" end) |
|
|
|
|
Logger.debug(fn -> "fetching internal transactions for #{length(unique_entries)} transactions" end) |
|
|
|
|
|
|
|
|
|
case EthereumJSONRPC.fetch_internal_transactions(unique_transactions_params, json_rpc_named_arguments) do |
|
|
|
|
unique_entries |
|
|
|
|
|> Enum.map(¶ms/1) |
|
|
|
|
|> EthereumJSONRPC.fetch_internal_transactions(json_rpc_named_arguments) |
|
|
|
|
|> case do |
|
|
|
|
{:ok, internal_transactions_params} -> |
|
|
|
|
addresses_params = AddressExtraction.extract_addresses(%{internal_transactions: internal_transactions_params}) |
|
|
|
|
|
|
|
|
@ -115,7 +123,7 @@ defmodule Indexer.InternalTransaction.Fetcher do |
|
|
|
|
Logger.error(fn -> |
|
|
|
|
[ |
|
|
|
|
"failed to import internal transactions for ", |
|
|
|
|
to_string(length(transactions_params)), |
|
|
|
|
to_string(length(entries)), |
|
|
|
|
" transactions at ", |
|
|
|
|
to_string(step), |
|
|
|
|
": ", |
|
|
|
@ -123,17 +131,17 @@ defmodule Indexer.InternalTransaction.Fetcher do |
|
|
|
|
] |
|
|
|
|
end) |
|
|
|
|
|
|
|
|
|
# re-queue the de-duped transactions_params |
|
|
|
|
{:retry, unique_transactions_params} |
|
|
|
|
# re-queue the de-duped entries |
|
|
|
|
{:retry, unique_entries} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
{:error, reason} -> |
|
|
|
|
Logger.error(fn -> |
|
|
|
|
"failed to fetch internal transactions for #{length(transactions_params)} transactions: #{inspect(reason)}" |
|
|
|
|
"failed to fetch internal transactions for #{length(entries)} transactions: #{inspect(reason)}" |
|
|
|
|
end) |
|
|
|
|
|
|
|
|
|
# re-queue the de-duped transactions_params |
|
|
|
|
{:retry, unique_transactions_params} |
|
|
|
|
# re-queue the de-duped entries |
|
|
|
|
{:retry, unique_entries} |
|
|
|
|
|
|
|
|
|
:ignore -> |
|
|
|
|
:ok |
|
|
|
@ -141,29 +149,29 @@ defmodule Indexer.InternalTransaction.Fetcher do |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
# Protection and improved reporting for https://github.com/poanetwork/blockscout/issues/289 |
|
|
|
|
defp unique_transactions_params(transactions_params) do |
|
|
|
|
transactions_params_by_hash_data = Enum.group_by(transactions_params, fn %{hash_data: hash_data} -> hash_data end) |
|
|
|
|
defp unique_entries(entries) do |
|
|
|
|
entries_by_hash_bytes = Enum.group_by(entries, &elem(&1, 1)) |
|
|
|
|
|
|
|
|
|
if map_size(transactions_params_by_hash_data) < length(transactions_params) do |
|
|
|
|
{unique_transactions_params, duplicate_transactions_params} = |
|
|
|
|
transactions_params_by_hash_data |
|
|
|
|
if map_size(entries_by_hash_bytes) < length(entries) do |
|
|
|
|
{unique_entries, duplicate_entries} = |
|
|
|
|
entries_by_hash_bytes |
|
|
|
|
|> Map.values() |
|
|
|
|
|> uniques_and_duplicates() |
|
|
|
|
|
|
|
|
|
Logger.error(fn -> |
|
|
|
|
duplicate_transactions_params |
|
|
|
|
duplicate_entries |
|
|
|
|
|> Stream.with_index() |
|
|
|
|
|> Enum.reduce( |
|
|
|
|
["Duplicate transaction params being used to fetch internal transactions:\n"], |
|
|
|
|
fn {transaction_params, index}, acc -> |
|
|
|
|
[acc, " ", to_string(index + 1), ". ", inspect(transaction_params), "\n"] |
|
|
|
|
["Duplicate entries being used to fetch internal transactions:\n"], |
|
|
|
|
fn {entry, index}, acc -> |
|
|
|
|
[acc, " ", to_string(index + 1), ". ", inspect(entry), "\n"] |
|
|
|
|
end |
|
|
|
|
) |
|
|
|
|
end) |
|
|
|
|
|
|
|
|
|
unique_transactions_params |
|
|
|
|
unique_entries |
|
|
|
|
else |
|
|
|
|
transactions_params |
|
|
|
|
entries |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|