|
|
@ -50,8 +50,8 @@ defmodule Explorer.Chain.Import.Runner.Transactions do |
|
|
|
|> Multi.run(:transactions, fn repo, _ -> |
|
|
|
|> Multi.run(:transactions, fn repo, _ -> |
|
|
|
insert(repo, changes_list, insert_options) |
|
|
|
insert(repo, changes_list, insert_options) |
|
|
|
end) |
|
|
|
end) |
|
|
|
|> Multi.run(:replaced_transactions, fn repo, %{transactions: transactions} -> |
|
|
|
|> Multi.run(:replaced_transactions, fn repo, _ -> |
|
|
|
update_replaced_transactions(repo, transactions, update_transactions_options) |
|
|
|
update_replaced_transactions(repo, changes_list, update_transactions_options) |
|
|
|
end) |
|
|
|
end) |
|
|
|
end |
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
@ -188,38 +188,37 @@ defmodule Explorer.Chain.Import.Runner.Transactions do |
|
|
|
defp put_internal_transactions_indexed_at?(_, _), do: false |
|
|
|
defp put_internal_transactions_indexed_at?(_, _), do: false |
|
|
|
|
|
|
|
|
|
|
|
defp update_replaced_transactions(repo, transactions, %{timeout: timeout}) do |
|
|
|
defp update_replaced_transactions(repo, transactions, %{timeout: timeout}) do |
|
|
|
result = |
|
|
|
filters = |
|
|
|
transactions |
|
|
|
transactions |
|
|
|
|> Enum.filter(& &1.block_hash) |
|
|
|
|> Enum.filter(fn transaction -> |
|
|
|
|> Enum.map(fn transaction -> {transaction.nonce, transaction.from_address_hash} end) |
|
|
|
transaction.block_hash && transaction.block_number && transaction.nonce && transaction.from_address_hash |
|
|
|
|
|
|
|
end) |
|
|
|
|
|
|
|
|> Enum.map(fn transaction -> |
|
|
|
|
|
|
|
{transaction.nonce, transaction.from_address_hash} |
|
|
|
|
|
|
|
end) |
|
|
|
|> Enum.uniq() |
|
|
|
|> Enum.uniq() |
|
|
|
|> Enum.map(fn {nonce, from_address_hash} -> |
|
|
|
|
|
|
|
from(t in Transaction, |
|
|
|
if Enum.empty?(filters) do |
|
|
|
where: t.nonce == ^nonce and t.from_address_hash == ^from_address_hash and is_nil(t.block_hash), |
|
|
|
{:ok, []} |
|
|
|
update: [ |
|
|
|
else |
|
|
|
set: [status: ^:error, error: "dropped/replaced"] |
|
|
|
query = |
|
|
|
] |
|
|
|
filters |
|
|
|
|
|
|
|
|> Enum.reduce(from(t in Transaction, where: is_nil(t.block_hash)), fn {nonce, from_address}, query -> |
|
|
|
|
|
|
|
from(t in query, |
|
|
|
|
|
|
|
or_where: t.nonce == ^nonce and t.from_address_hash == ^from_address and is_nil(t.block_hash) |
|
|
|
) |
|
|
|
) |
|
|
|
end) |
|
|
|
end) |
|
|
|
|> Enum.map(fn query -> |
|
|
|
|
|
|
|
|
|
|
|
update_query = from(t in query, update: [set: [status: ^:error, error: "dropped/replaced"]]) |
|
|
|
|
|
|
|
|
|
|
|
try do |
|
|
|
try do |
|
|
|
{_, result} = repo.update_all(query, [], timeout: timeout) |
|
|
|
{_, result} = repo.update_all(update_query, [], timeout: timeout) |
|
|
|
|
|
|
|
|
|
|
|
{:ok, result} |
|
|
|
{:ok, result} |
|
|
|
rescue |
|
|
|
rescue |
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
{:error, %{exception: postgrex_error, query: query}} |
|
|
|
{:error, %{exception: postgrex_error, query: query}} |
|
|
|
end |
|
|
|
end |
|
|
|
end) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
first_error = |
|
|
|
|
|
|
|
Enum.find(result, fn result -> |
|
|
|
|
|
|
|
case result do |
|
|
|
|
|
|
|
{:ok, _} -> false |
|
|
|
|
|
|
|
{:error, _} -> true |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
end) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if first_error, do: first_error, else: {:ok, []} |
|
|
|
|
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|
end |
|
|
|