|
|
@ -4,9 +4,10 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
""" |
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
require Ecto.Query |
|
|
|
require Ecto.Query |
|
|
|
|
|
|
|
require Logger |
|
|
|
|
|
|
|
|
|
|
|
alias Ecto.{Changeset, Multi, Repo} |
|
|
|
alias Ecto.{Changeset, Multi, Repo} |
|
|
|
alias Explorer.Chain.{Hash, Import, InternalTransaction, Transaction} |
|
|
|
alias Explorer.Chain.{Block, Hash, Import, InternalTransaction, Transaction} |
|
|
|
alias Explorer.Chain.Import.Runner |
|
|
|
alias Explorer.Chain.Import.Runner |
|
|
|
|
|
|
|
|
|
|
|
import Ecto.Query, only: [from: 2] |
|
|
|
import Ecto.Query, only: [from: 2] |
|
|
@ -52,32 +53,43 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
|> Multi.run(:acquire_transactions, fn repo, _ -> |
|
|
|
|> Multi.run(:acquire_transactions, fn repo, _ -> |
|
|
|
acquire_transactions(repo, changes_list) |
|
|
|
acquire_transactions(repo, changes_list) |
|
|
|
end) |
|
|
|
end) |
|
|
|
|> Multi.run(:internal_transactions, fn repo, _ -> |
|
|
|
|> Multi.run(:internal_transactions, fn repo, %{acquire_transactions: transactions} -> |
|
|
|
insert(repo, changes_list, insert_options) |
|
|
|
insert(repo, changes_list, transactions, insert_options) |
|
|
|
end) |
|
|
|
end) |
|
|
|
|> Multi.run(:internal_transactions_indexed_at_transactions, fn repo, %{acquire_transactions: transaction_hashes} -> |
|
|
|
|> Multi.run(:internal_transactions_indexed_at_transactions, fn repo, %{acquire_transactions: transactions} -> |
|
|
|
update_transactions(repo, transaction_hashes, update_transactions_options) |
|
|
|
update_transactions(repo, transactions, update_transactions_options) |
|
|
|
end) |
|
|
|
end) |
|
|
|
|
|
|
|
|> Multi.run( |
|
|
|
|
|
|
|
:remove_consensus_of_missing_transactions_blocks, |
|
|
|
|
|
|
|
fn repo, %{internal_transactions: inserted} = results_map -> |
|
|
|
|
|
|
|
# NOTE: for this to work it has to follow the runner `internal_transactions_indexed_at_blocks` |
|
|
|
|
|
|
|
block_hashes = Map.get(results_map, :internal_transactions_indexed_at_blocks, []) |
|
|
|
|
|
|
|
remove_consensus_of_missing_transactions_blocks(repo, block_hashes, changes_list, inserted) |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
@impl Runner |
|
|
|
@impl Runner |
|
|
|
def timeout, do: @timeout |
|
|
|
def timeout, do: @timeout |
|
|
|
|
|
|
|
|
|
|
|
@spec insert(Repo.t(), [map], %{ |
|
|
|
@spec insert(Repo.t(), [map], [Transaction.t()], %{ |
|
|
|
optional(:on_conflict) => Runner.on_conflict(), |
|
|
|
optional(:on_conflict) => Runner.on_conflict(), |
|
|
|
required(:timeout) => timeout, |
|
|
|
required(:timeout) => timeout, |
|
|
|
required(:timestamps) => Import.timestamps() |
|
|
|
required(:timestamps) => Import.timestamps() |
|
|
|
}) :: |
|
|
|
}) :: |
|
|
|
{:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} |
|
|
|
{:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} |
|
|
|
| {:error, [Changeset.t()]} |
|
|
|
| {:error, [Changeset.t()]} |
|
|
|
defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) |
|
|
|
defp insert(repo, changes_list, transactions, %{timeout: timeout, timestamps: timestamps} = options) |
|
|
|
when is_list(changes_list) do |
|
|
|
when is_list(changes_list) do |
|
|
|
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) |
|
|
|
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) |
|
|
|
|
|
|
|
|
|
|
|
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md) |
|
|
|
transactions_map = Map.new(transactions, &{&1.hash, &1}) |
|
|
|
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
final_changes_list = reject_pending_transactions(ordered_changes_list, repo) |
|
|
|
final_changes_list = |
|
|
|
|
|
|
|
changes_list |
|
|
|
|
|
|
|
# Enforce InternalTransaction ShareLocks order (see docs: sharelocks.md) |
|
|
|
|
|
|
|
|> Enum.sort_by(&{&1.transaction_hash, &1.index}) |
|
|
|
|
|
|
|
|> reject_missing_transactions(transactions_map) |
|
|
|
|
|
|
|
|
|
|
|
{:ok, internal_transactions} = |
|
|
|
{:ok, internal_transactions} = |
|
|
|
Import.insert_changes_list( |
|
|
|
Import.insert_changes_list( |
|
|
@ -86,16 +98,12 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
conflict_target: [:transaction_hash, :index], |
|
|
|
conflict_target: [:transaction_hash, :index], |
|
|
|
for: InternalTransaction, |
|
|
|
for: InternalTransaction, |
|
|
|
on_conflict: on_conflict, |
|
|
|
on_conflict: on_conflict, |
|
|
|
returning: [:transaction_hash, :index], |
|
|
|
returning: true, |
|
|
|
timeout: timeout, |
|
|
|
timeout: timeout, |
|
|
|
timestamps: timestamps |
|
|
|
timestamps: timestamps |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
{:ok, |
|
|
|
{:ok, internal_transactions} |
|
|
|
for( |
|
|
|
|
|
|
|
internal_transaction <- internal_transactions, |
|
|
|
|
|
|
|
do: Map.take(internal_transaction, [:id, :index, :transaction_hash]) |
|
|
|
|
|
|
|
)} |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp default_on_conflict do |
|
|
|
defp default_on_conflict do |
|
|
@ -158,26 +166,28 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
from( |
|
|
|
from( |
|
|
|
t in Transaction, |
|
|
|
t in Transaction, |
|
|
|
where: t.hash in ^transaction_hashes, |
|
|
|
where: t.hash in ^transaction_hashes, |
|
|
|
|
|
|
|
# do not consider pending transactions |
|
|
|
where: not is_nil(t.block_hash), |
|
|
|
where: not is_nil(t.block_hash), |
|
|
|
select: t.hash, |
|
|
|
select: map(t, [:hash, :block_hash, :block_number]), |
|
|
|
# Enforce Transaction ShareLocks order (see docs: sharelocks.md) |
|
|
|
# Enforce Transaction ShareLocks order (see docs: sharelocks.md) |
|
|
|
order_by: t.hash, |
|
|
|
order_by: t.hash, |
|
|
|
lock: "FOR UPDATE" |
|
|
|
lock: "FOR UPDATE" |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
hashes = repo.all(query) |
|
|
|
{:ok, repo.all(query)} |
|
|
|
|
|
|
|
|
|
|
|
{:ok, hashes} |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp update_transactions(repo, transaction_hashes, %{ |
|
|
|
defp update_transactions(repo, transactions, %{ |
|
|
|
timeout: timeout, |
|
|
|
timeout: timeout, |
|
|
|
timestamps: timestamps |
|
|
|
timestamps: timestamps |
|
|
|
}) |
|
|
|
}) |
|
|
|
when is_list(transaction_hashes) do |
|
|
|
when is_list(transactions) do |
|
|
|
|
|
|
|
transaction_hashes = Enum.map(transactions, & &1.hash) |
|
|
|
|
|
|
|
|
|
|
|
update_query = |
|
|
|
update_query = |
|
|
|
from( |
|
|
|
from( |
|
|
|
t in Transaction, |
|
|
|
t in Transaction, |
|
|
|
|
|
|
|
# pending transactions are already excluded by `acquire_transactions` |
|
|
|
where: t.hash in ^transaction_hashes, |
|
|
|
where: t.hash in ^transaction_hashes, |
|
|
|
# ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md) |
|
|
|
# ShareLocks order already enforced by `acquire_transactions` (see docs: sharelocks.md) |
|
|
|
update: [ |
|
|
|
update: [ |
|
|
@ -214,22 +224,51 @@ defmodule Explorer.Chain.Import.Runner.InternalTransactions do |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
defp reject_pending_transactions(ordered_changes_list, repo) do |
|
|
|
# If not using Parity this is not relevant |
|
|
|
transaction_hashes = |
|
|
|
defp remove_consensus_of_missing_transactions_blocks(_, [], _, _), do: {:ok, []} |
|
|
|
ordered_changes_list |
|
|
|
|
|
|
|
|> Enum.map(& &1.transaction_hash) |
|
|
|
|
|
|
|
|> Enum.dedup() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
query = |
|
|
|
defp remove_consensus_of_missing_transactions_blocks(repo, block_hashes, changes_list, inserted) do |
|
|
|
from(t in Transaction, |
|
|
|
inserted_block_numbers = MapSet.new(inserted, & &1.block_number) |
|
|
|
where: t.hash in ^transaction_hashes, |
|
|
|
|
|
|
|
where: is_nil(t.block_hash), |
|
|
|
missing_transactions_block_numbers = |
|
|
|
select: t.hash |
|
|
|
changes_list |
|
|
|
|
|
|
|
|> MapSet.new(& &1.block_number) |
|
|
|
|
|
|
|
|> MapSet.difference(inserted_block_numbers) |
|
|
|
|
|
|
|
|> MapSet.to_list() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
update_query = |
|
|
|
|
|
|
|
from( |
|
|
|
|
|
|
|
b in Block, |
|
|
|
|
|
|
|
where: b.number in ^missing_transactions_block_numbers, |
|
|
|
|
|
|
|
where: b.hash in ^block_hashes, |
|
|
|
|
|
|
|
# ShareLocks order already enforced by `internal_transactions_indexed_at_blocks` (see docs: sharelocks.md) |
|
|
|
|
|
|
|
update: [set: [consensus: false, internal_transactions_indexed_at: nil]] |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
pending_transactions = repo.all(query) |
|
|
|
try do |
|
|
|
|
|
|
|
{_num, result} = repo.update_all(update_query, []) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Logger.debug(fn -> |
|
|
|
|
|
|
|
[ |
|
|
|
|
|
|
|
"consensus removed from blocks with numbers: ", |
|
|
|
|
|
|
|
inspect(missing_transactions_block_numbers), |
|
|
|
|
|
|
|
" because of missing transactions" |
|
|
|
|
|
|
|
] |
|
|
|
|
|
|
|
end) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
{:ok, result} |
|
|
|
|
|
|
|
rescue |
|
|
|
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
|
|
|
{:error, %{exception: postgrex_error, missing_transactions_block_numbers: missing_transactions_block_numbers}} |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
ordered_changes_list |
|
|
|
defp reject_missing_transactions(ordered_changes_list, transactions_map) do |
|
|
|
|> Enum.reject(fn %{transaction_hash: hash} -> Enum.member?(pending_transactions, hash) end) |
|
|
|
Enum.reject(ordered_changes_list, fn %{transaction_hash: hash} -> |
|
|
|
|
|
|
|
transactions_map |
|
|
|
|
|
|
|
|> Map.get(hash, %{}) |
|
|
|
|
|
|
|
|> Map.get(:block_hash) |
|
|
|
|
|
|
|
|> is_nil() |
|
|
|
|
|
|
|
end) |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|