Merge pull request #4678 from blockscout/vb-internal-transactions-indexer-fix

Internal transactions indexer: fix issue of some pending transactions never become confirmed
pull/4693/head
Victor Baranov 3 years ago committed by GitHub
commit bee3da8df5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 192
      apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex

@ -94,8 +94,12 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
} ->
insert(repo, valid_internal_transactions_without_first_traces_of_trivial_transactions, insert_options)
end)
|> Multi.run(:update_transactions, fn repo, %{valid_internal_transactions: valid_internal_transactions} ->
update_transactions(repo, valid_internal_transactions, update_transactions_options)
|> Multi.run(:update_transactions, fn repo,
%{
valid_internal_transactions: valid_internal_transactions,
acquire_transactions: transactions
} ->
update_transactions(repo, valid_internal_transactions, transactions, update_transactions_options)
end)
|> Multi.run(:remove_consensus_of_invalid_blocks, fn repo, %{invalid_block_numbers: invalid_block_numbers} ->
remove_consensus_of_invalid_blocks(repo, invalid_block_numbers)
@ -252,7 +256,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
from(
t in Transaction,
where: t.block_hash in ^pending_block_hashes,
select: map(t, [:hash, :block_hash, :block_number]),
select: map(t, [:hash, :block_hash, :block_number, :cumulative_gas_used]),
# Enforce Transaction ShareLocks order (see docs: sharelocks.md)
order_by: t.hash,
lock: "FOR UPDATE"
@ -387,7 +391,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
end
end
defp update_transactions(repo, valid_internal_transactions, %{
defp update_transactions(repo, valid_internal_transactions, transactions, %{
timeout: timeout,
timestamps: timestamps
}) do
@ -403,6 +407,9 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
end)
|> Enum.map(fn trace ->
%{
block_hash: Map.get(trace, :block_hash),
block_number: Map.get(trace, :block_number),
gas_used: Map.get(trace, :gas_used),
transaction_hash: Map.get(trace, :transaction_hash),
created_contract_address_hash: Map.get(trace, :created_contract_address_hash),
error: Map.get(trace, :error),
@ -416,36 +423,59 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
|> MapSet.new(& &1.transaction_hash)
|> MapSet.to_list()
json_rpc_named_arguments = Application.fetch_env!(:indexer, :json_rpc_named_arguments)
result =
Enum.reduce_while(params, 0, fn first_trace, transaction_hashes_iterator ->
update_query =
from(
t in Transaction,
where: t.hash == ^first_trace.transaction_hash,
# ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md)
update: [
set: [
created_contract_address_hash: ^first_trace.created_contract_address_hash,
error: ^first_trace.error,
status: ^first_trace.status,
updated_at: ^timestamps.updated_at
]
]
)
transaction_hashes_iterator = transaction_hashes_iterator + 1
try do
{_transaction_count, result} = repo.update_all(update_query, [], timeout: timeout)
if valid_internal_transactions_count == transaction_hashes_iterator do
{:halt, result}
else
{:cont, transaction_hashes_iterator}
end
rescue
postgrex_error in Postgrex.Error ->
{:halt, %{exception: postgrex_error, transaction_hashes: transaction_hashes}}
transaction_hash = Map.get(first_trace, :transaction_hash)
transaction_from_db =
transactions
|> Enum.find(fn transaction ->
transaction.hash == transaction_hash
end)
cond do
!transaction_from_db ->
transaction_receipt_from_node =
fetch_transaction_receipt_from_node(transaction_hash, json_rpc_named_arguments)
update_transactions_inner(
repo,
valid_internal_transactions_count,
transaction_hashes,
transaction_hashes_iterator,
timeout,
timestamps,
first_trace,
transaction_receipt_from_node
)
transaction_from_db && Map.get(transaction_from_db, :cumulative_gas_used) ->
update_transactions_inner(
repo,
valid_internal_transactions_count,
transaction_hashes,
transaction_hashes_iterator,
timeout,
timestamps,
first_trace
)
true ->
transaction_receipt_from_node =
fetch_transaction_receipt_from_node(transaction_hash, json_rpc_named_arguments)
update_transactions_inner(
repo,
valid_internal_transactions_count,
transaction_hashes,
transaction_hashes_iterator,
timeout,
timestamps,
first_trace,
transaction_receipt_from_node
)
end
end)
@ -459,6 +489,104 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do
end
end
defp fetch_transaction_receipt_from_node(transaction_hash, json_rpc_named_arguments) do
receipt_response =
EthereumJSONRPC.fetch_transaction_receipts(
[
%{
:hash => to_string(transaction_hash),
:gas => 0
}
],
json_rpc_named_arguments
)
case receipt_response do
{:ok,
%{
:receipts => [
receipt
]
}} ->
receipt
_ ->
%{:cumulative_gas_used => nil}
end
end
defp update_transactions_inner(
repo,
valid_internal_transactions_count,
transaction_hashes,
transaction_hashes_iterator,
timeout,
timestamps,
first_trace,
transaction_receipt_from_node \\ nil
) do
set = generate_transaction_set_to_update(first_trace, transaction_receipt_from_node, timestamps)
update_query =
from(
t in Transaction,
where: t.hash == ^first_trace.transaction_hash,
# ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md)
update: [
set: ^set
]
)
transaction_hashes_iterator = transaction_hashes_iterator + 1
try do
{_transaction_count, result} = repo.update_all(update_query, [], timeout: timeout)
if valid_internal_transactions_count == transaction_hashes_iterator do
{:halt, result}
else
{:cont, transaction_hashes_iterator}
end
rescue
postgrex_error in Postgrex.Error ->
{:halt, %{exception: postgrex_error, transaction_hashes: transaction_hashes}}
end
end
def generate_transaction_set_to_update(first_trace, transaction_receipt_from_node, timestamps) do
default_set = [
created_contract_address_hash: first_trace.created_contract_address_hash,
error: first_trace.error,
status: first_trace.status,
updated_at: timestamps.updated_at
]
set =
default_set
|> Keyword.put_new(:block_hash, first_trace.block_hash)
|> Keyword.put_new(:block_number, first_trace.block_number)
|> Keyword.put_new(:index, transaction_receipt_from_node && transaction_receipt_from_node.transaction_index)
|> Keyword.put_new(
:cumulative_gas_used,
transaction_receipt_from_node && transaction_receipt_from_node.cumulative_gas_used
)
set_with_gas_used =
if first_trace.gas_used do
Keyword.put_new(set, :gas_used, first_trace.gas_used)
else
if transaction_receipt_from_node && transaction_receipt_from_node.gas_used do
Keyword.put_new(set, :gas_used, transaction_receipt_from_node.gas_used)
else
set
end
end
filtered_set = Enum.reject(set_with_gas_used, fn {_key, value} -> is_nil(value) end)
filtered_set
end
defp remove_consensus_of_invalid_blocks(repo, invalid_block_numbers) do
if Enum.count(invalid_block_numbers) > 0 do
update_query =

Loading…
Cancel
Save