diff --git a/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex b/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex index 3f38efbfe8..960af51780 100644 --- a/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex @@ -94,8 +94,12 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do } -> insert(repo, valid_internal_transactions_without_first_traces_of_trivial_transactions, insert_options) end) - |> Multi.run(:update_transactions, fn repo, %{valid_internal_transactions: valid_internal_transactions} -> - update_transactions(repo, valid_internal_transactions, update_transactions_options) + |> Multi.run(:update_transactions, fn repo, + %{ + valid_internal_transactions: valid_internal_transactions, + acquire_transactions: transactions + } -> + update_transactions(repo, valid_internal_transactions, transactions, update_transactions_options) end) |> Multi.run(:remove_consensus_of_invalid_blocks, fn repo, %{invalid_block_numbers: invalid_block_numbers} -> remove_consensus_of_invalid_blocks(repo, invalid_block_numbers) @@ -252,7 +256,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do from( t in Transaction, where: t.block_hash in ^pending_block_hashes, - select: map(t, [:hash, :block_hash, :block_number]), + select: map(t, [:hash, :block_hash, :block_number, :cumulative_gas_used]), # Enforce Transaction ShareLocks order (see docs: sharelocks.md) order_by: t.hash, lock: "FOR UPDATE" @@ -387,7 +391,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do end end - defp update_transactions(repo, valid_internal_transactions, %{ + defp update_transactions(repo, valid_internal_transactions, transactions, %{ timeout: timeout, timestamps: timestamps }) do @@ -403,6 +407,9 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do end) |> Enum.map(fn trace -> %{ + block_hash: Map.get(trace, :block_hash), + block_number: Map.get(trace, :block_number), + gas_used: Map.get(trace, :gas_used), transaction_hash: Map.get(trace, :transaction_hash), created_contract_address_hash: Map.get(trace, :created_contract_address_hash), error: Map.get(trace, :error), @@ -416,36 +423,59 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |> MapSet.new(& &1.transaction_hash) |> MapSet.to_list() + json_rpc_named_arguments = Application.fetch_env!(:indexer, :json_rpc_named_arguments) + result = Enum.reduce_while(params, 0, fn first_trace, transaction_hashes_iterator -> - update_query = - from( - t in Transaction, - where: t.hash == ^first_trace.transaction_hash, - # ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md) - update: [ - set: [ - created_contract_address_hash: ^first_trace.created_contract_address_hash, - error: ^first_trace.error, - status: ^first_trace.status, - updated_at: ^timestamps.updated_at - ] - ] - ) - - transaction_hashes_iterator = transaction_hashes_iterator + 1 - - try do - {_transaction_count, result} = repo.update_all(update_query, [], timeout: timeout) - - if valid_internal_transactions_count == transaction_hashes_iterator do - {:halt, result} - else - {:cont, transaction_hashes_iterator} - end - rescue - postgrex_error in Postgrex.Error -> - {:halt, %{exception: postgrex_error, transaction_hashes: transaction_hashes}} + transaction_hash = Map.get(first_trace, :transaction_hash) + + transaction_from_db = + transactions + |> Enum.find(fn transaction -> + transaction.hash == transaction_hash + end) + + cond do + !transaction_from_db -> + transaction_receipt_from_node = + fetch_transaction_receipt_from_node(transaction_hash, json_rpc_named_arguments) + + update_transactions_inner( + repo, + valid_internal_transactions_count, + transaction_hashes, + transaction_hashes_iterator, + timeout, + timestamps, + first_trace, + transaction_receipt_from_node + ) + + transaction_from_db && Map.get(transaction_from_db, :cumulative_gas_used) -> + update_transactions_inner( + repo, + valid_internal_transactions_count, + transaction_hashes, + transaction_hashes_iterator, + timeout, + timestamps, + first_trace + ) + + true -> + transaction_receipt_from_node = + fetch_transaction_receipt_from_node(transaction_hash, json_rpc_named_arguments) + + update_transactions_inner( + repo, + valid_internal_transactions_count, + transaction_hashes, + transaction_hashes_iterator, + timeout, + timestamps, + first_trace, + transaction_receipt_from_node + ) end end) @@ -459,6 +489,104 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do end end + defp fetch_transaction_receipt_from_node(transaction_hash, json_rpc_named_arguments) do + receipt_response = + EthereumJSONRPC.fetch_transaction_receipts( + [ + %{ + :hash => to_string(transaction_hash), + :gas => 0 + } + ], + json_rpc_named_arguments + ) + + case receipt_response do + {:ok, + %{ + :receipts => [ + receipt + ] + }} -> + receipt + + _ -> + %{:cumulative_gas_used => nil} + end + end + + defp update_transactions_inner( + repo, + valid_internal_transactions_count, + transaction_hashes, + transaction_hashes_iterator, + timeout, + timestamps, + first_trace, + transaction_receipt_from_node \\ nil + ) do + set = generate_transaction_set_to_update(first_trace, transaction_receipt_from_node, timestamps) + + update_query = + from( + t in Transaction, + where: t.hash == ^first_trace.transaction_hash, + # ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md) + update: [ + set: ^set + ] + ) + + transaction_hashes_iterator = transaction_hashes_iterator + 1 + + try do + {_transaction_count, result} = repo.update_all(update_query, [], timeout: timeout) + + if valid_internal_transactions_count == transaction_hashes_iterator do + {:halt, result} + else + {:cont, transaction_hashes_iterator} + end + rescue + postgrex_error in Postgrex.Error -> + {:halt, %{exception: postgrex_error, transaction_hashes: transaction_hashes}} + end + end + + def generate_transaction_set_to_update(first_trace, transaction_receipt_from_node, timestamps) do + default_set = [ + created_contract_address_hash: first_trace.created_contract_address_hash, + error: first_trace.error, + status: first_trace.status, + updated_at: timestamps.updated_at + ] + + set = + default_set + |> Keyword.put_new(:block_hash, first_trace.block_hash) + |> Keyword.put_new(:block_number, first_trace.block_number) + |> Keyword.put_new(:index, transaction_receipt_from_node && transaction_receipt_from_node.transaction_index) + |> Keyword.put_new( + :cumulative_gas_used, + transaction_receipt_from_node && transaction_receipt_from_node.cumulative_gas_used + ) + + set_with_gas_used = + if first_trace.gas_used do + Keyword.put_new(set, :gas_used, first_trace.gas_used) + else + if transaction_receipt_from_node && transaction_receipt_from_node.gas_used do + Keyword.put_new(set, :gas_used, transaction_receipt_from_node.gas_used) + else + set + end + end + + filtered_set = Enum.reject(set_with_gas_used, fn {_key, value} -> is_nil(value) end) + + filtered_set + end + defp remove_consensus_of_invalid_blocks(repo, invalid_block_numbers) do if Enum.count(invalid_block_numbers) > 0 do update_query =