|
|
|
@ -6,6 +6,7 @@ defmodule Explorer.Chain.Import do |
|
|
|
|
import Ecto.Query, only: [from: 2, update: 2] |
|
|
|
|
|
|
|
|
|
alias Ecto.{Changeset, Multi} |
|
|
|
|
alias Ecto.Adapters.SQL |
|
|
|
|
|
|
|
|
|
alias Explorer.Chain.{ |
|
|
|
|
Address, |
|
|
|
@ -378,12 +379,22 @@ defmodule Explorer.Chain.Import do |
|
|
|
|
%{Block => blocks_changes} -> |
|
|
|
|
timestamps = Map.fetch!(options, :timestamps) |
|
|
|
|
blocks_timeout = options[:blocks][:timeout] || @insert_blocks_timeout |
|
|
|
|
where_forked = where_forked(blocks_changes) |
|
|
|
|
|
|
|
|
|
multi |
|
|
|
|
|> Multi.run(:derive_transaction_forks, fn _ -> |
|
|
|
|
derive_transaction_forks(%{ |
|
|
|
|
timeout: options[:transaction_forks][:timeout] || @insert_transaction_forks_timeout, |
|
|
|
|
timestamps: timestamps, |
|
|
|
|
where_forked: where_forked |
|
|
|
|
}) |
|
|
|
|
end) |
|
|
|
|
# MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table |
|
|
|
|
|> Multi.run(:fork_transactions, fn _ -> |
|
|
|
|
fork_transactions(blocks_changes, %{ |
|
|
|
|
fork_transactions(%{ |
|
|
|
|
timeout: options[:transactions][:timeout] || @insert_transactions_timeout, |
|
|
|
|
timestamps: timestamps |
|
|
|
|
timestamps: timestamps, |
|
|
|
|
where_forked: where_forked |
|
|
|
|
}) |
|
|
|
|
end) |
|
|
|
|
|> Multi.run(:lose_consenus, fn _ -> |
|
|
|
@ -980,14 +991,9 @@ defmodule Explorer.Chain.Import do |
|
|
|
|
{:ok, inserted} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp fork_transactions(blocks_changes, %{ |
|
|
|
|
timeout: timeout, |
|
|
|
|
timestamps: %{updated_at: updated_at} |
|
|
|
|
}) |
|
|
|
|
when is_list(blocks_changes) do |
|
|
|
|
defp fork_transactions(%{timeout: timeout, timestamps: %{updated_at: updated_at}, where_forked: where_forked}) do |
|
|
|
|
query = |
|
|
|
|
Transaction |
|
|
|
|
|> where_forked(blocks_changes) |
|
|
|
|
where_forked |
|
|
|
|
|> update( |
|
|
|
|
set: [ |
|
|
|
|
block_hash: nil, |
|
|
|
@ -1007,12 +1013,12 @@ defmodule Explorer.Chain.Import do |
|
|
|
|
{:ok, result} |
|
|
|
|
rescue |
|
|
|
|
postgrex_error in Postgrex.Error -> |
|
|
|
|
{:error, %{exception: postgrex_error, blocks_changes: blocks_changes}} |
|
|
|
|
{:error, %{exception: postgrex_error}} |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp where_forked(query, blocks_changes) when is_list(blocks_changes) do |
|
|
|
|
Enum.reduce(blocks_changes, query, fn %{consensus: consensus, hash: hash, number: number}, acc -> |
|
|
|
|
defp where_forked(blocks_changes) when is_list(blocks_changes) do |
|
|
|
|
Enum.reduce(blocks_changes, Transaction, fn %{consensus: consensus, hash: hash, number: number}, acc -> |
|
|
|
|
case consensus do |
|
|
|
|
false -> |
|
|
|
|
from(transaction in acc, or_where: transaction.block_hash == ^hash and transaction.block_number == ^number) |
|
|
|
@ -1023,6 +1029,42 @@ defmodule Explorer.Chain.Import do |
|
|
|
|
end) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
# sobelow_skip ["SQL.Query"] |
|
|
|
|
defp derive_transaction_forks(%{ |
|
|
|
|
timeout: timeout, |
|
|
|
|
timestamps: %{inserted_at: inserted_at, updated_at: updated_at}, |
|
|
|
|
where_forked: where_forked |
|
|
|
|
}) do |
|
|
|
|
query = |
|
|
|
|
from(transaction in where_forked, |
|
|
|
|
select: [ |
|
|
|
|
transaction.block_hash, |
|
|
|
|
transaction.index, |
|
|
|
|
transaction.hash, |
|
|
|
|
type(^inserted_at, transaction.inserted_at), |
|
|
|
|
type(^updated_at, transaction.updated_at) |
|
|
|
|
] |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
{sql, parameters} = SQL.to_sql(:all, Repo, query) |
|
|
|
|
|
|
|
|
|
{:ok, %Postgrex.Result{columns: ["uncle_hash", "hash"], command: :insert, rows: rows}} = |
|
|
|
|
SQL.query( |
|
|
|
|
Repo, |
|
|
|
|
""" |
|
|
|
|
INSERT INTO transaction_forks (uncle_hash, index, hash, inserted_at, updated_at) |
|
|
|
|
#{sql} |
|
|
|
|
RETURNING uncle_hash, hash |
|
|
|
|
""", |
|
|
|
|
parameters, |
|
|
|
|
timeout: timeout |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
derived_transaction_forks = Enum.map(rows, fn [uncle_hash, hash] -> %{uncle_hash: uncle_hash, hash: hash} end) |
|
|
|
|
|
|
|
|
|
{:ok, derived_transaction_forks} |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
defp lose_consensus(blocks_changes, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) |
|
|
|
|
when is_list(blocks_changes) do |
|
|
|
|
ordered_block_number = |
|
|
|
|