|
|
|
@ -18,10 +18,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
|
# milliseconds |
|
|
|
|
@timeout 60_000 |
|
|
|
|
|
|
|
|
|
@type imported :: [ |
|
|
|
|
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} |
|
|
|
|
| %{required(:empty_block_number) => non_neg_integer()} |
|
|
|
|
] |
|
|
|
|
@type imported :: [InternalTransaction.t()] |
|
|
|
|
|
|
|
|
|
@impl Runner |
|
|
|
|
def ecto_schema_module, do: InternalTransaction |
|
|
|
@ -50,7 +47,8 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
|
|
|
|
|
|
update_transactions_options = %{timeout: transactions_timeout, timestamps: timestamps} |
|
|
|
|
|
|
|
|
|
internal_transactions_changes = Enum.filter(changes_list, &Map.has_key?(&1, :type)) |
|
|
|
|
# filter out params with just `block_number` (indicating blocks without internal transactions) |
|
|
|
|
internal_transactions_params = Enum.filter(changes_list, &Map.has_key?(&1, :type)) |
|
|
|
|
|
|
|
|
|
# Enforce ShareLocks tables order (see docs: sharelocks.md) |
|
|
|
|
multi |
|
|
|
@ -64,14 +62,14 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
|
acquire_transactions(repo, pending_block_hashes) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:invalid_block_numbers, fn _, %{acquire_transactions: transactions} -> |
|
|
|
|
invalid_block_numbers(transactions, internal_transactions_changes) |
|
|
|
|
invalid_block_numbers(transactions, internal_transactions_params) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:valid_internal_transactions, fn _, |
|
|
|
|
%{ |
|
|
|
|
acquire_transactions: transactions, |
|
|
|
|
invalid_block_numbers: invalid_block_numbers |
|
|
|
|
} -> |
|
|
|
|
valid_internal_transactions(transactions, internal_transactions_changes, invalid_block_numbers) |
|
|
|
|
valid_internal_transactions(transactions, internal_transactions_params, invalid_block_numbers) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:internal_transactions, fn repo, %{valid_internal_transactions: valid_internal_transactions} -> |
|
|
|
|
insert(repo, valid_internal_transactions, insert_options) |
|
|
|
@ -201,6 +199,7 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
|
from( |
|
|
|
|
pending_ops in PendingBlockOperation, |
|
|
|
|
where: pending_ops.block_hash in ^block_hashes, |
|
|
|
|
where: pending_ops.fetch_internal_transactions, |
|
|
|
|
select: pending_ops.block_hash, |
|
|
|
|
# Enforce PendingBlockOperation ShareLocks order (see docs: sharelocks.md) |
|
|
|
|
order_by: [asc: pending_ops.block_hash], |
|
|
|
@ -224,10 +223,17 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
|
{:ok, repo.all(query)} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp invalid_block_numbers(transactions, internal_transactions_changes) do |
|
|
|
|
defp invalid_block_numbers(transactions, internal_transactions_params) do |
|
|
|
|
# Finds all mistmatches between transactions and internal transactions |
|
|
|
|
# for a block number: |
|
|
|
|
# - there are no internal txs for some transactions |
|
|
|
|
# - there are no transactions for some internal transactions |
|
|
|
|
# - there are internal txs with a different block number than their transactions |
|
|
|
|
# Returns block numbers where any of these issues is found |
|
|
|
|
|
|
|
|
|
required_tuples = MapSet.new(transactions, &{&1.hash, &1.block_number}) |
|
|
|
|
|
|
|
|
|
candidate_tuples = MapSet.new(internal_transactions_changes, &{&1.transaction_hash, &1.block_number}) |
|
|
|
|
candidate_tuples = MapSet.new(internal_transactions_params, &{&1.transaction_hash, &1.block_number}) |
|
|
|
|
|
|
|
|
|
all_tuples = MapSet.union(required_tuples, candidate_tuples) |
|
|
|
|
|
|
|
|
@ -242,11 +248,11 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
|
{:ok, invalid_numbers} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp valid_internal_transactions(transactions, internal_transactions_changes, invalid_block_numbers) do |
|
|
|
|
defp valid_internal_transactions(transactions, internal_transactions_params, invalid_block_numbers) do |
|
|
|
|
blocks_map = Map.new(transactions, &{&1.block_number, &1.block_hash}) |
|
|
|
|
|
|
|
|
|
valid_internal_txs = |
|
|
|
|
internal_transactions_changes |
|
|
|
|
internal_transactions_params |
|
|
|
|
|> Enum.group_by(& &1.block_number) |
|
|
|
|
|> Map.drop(invalid_block_numbers) |
|
|
|
|
|> Enum.flat_map(fn {block_number, entries} -> |
|
|
|
@ -266,13 +272,17 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def defer_internal_transactions_primary_key(repo) do |
|
|
|
|
# allows internal_transactions primary key to skips being checked during the |
|
|
|
|
# DB transactions and instead be checked at the end of it. |
|
|
|
|
# This allows us to use a more efficient upserting logic |
|
|
|
|
# Allows internal_transactions primary key to not be checked during the |
|
|
|
|
# DB transactions and instead be checked only at the end of it. |
|
|
|
|
# This allows us to use a more efficient upserting logic, while keeping the |
|
|
|
|
# uniqueness valid. |
|
|
|
|
SQL.query(repo, "SET CONSTRAINTS internal_transactions_pkey DEFERRED") |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
def remove_left_over_internal_transactions(repo, valid_internal_transactions) do |
|
|
|
|
# Removes internal transactions that were part of a block before a refetch |
|
|
|
|
# and have not been upserted with new ones (if any exist). |
|
|
|
|
|
|
|
|
|
case valid_internal_transactions do |
|
|
|
|
[] -> |
|
|
|
|
{:ok, []} |
|
|
|
|