Extract Explorer.Chain.Import.InternalTransactions

pull/859/head
Luke Imhoff 6 years ago
parent 4842462667
commit bca8f8a645
  1. 125
      apps/explorer/lib/explorer/chain/import.ex
  2. 136
      apps/explorer/lib/explorer/chain/import/internal_transactions.ex

@ -26,10 +26,6 @@ defmodule Explorer.Chain.Import do
@type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all
@type params :: [map()]
@type internal_transactions_options :: %{
required(:params) => params,
optional(:timeout) => timeout
}
@type logs_options :: %{
required(:params) => params,
optional(:timeout) => timeout
@ -67,7 +63,7 @@ defmodule Explorer.Chain.Import do
optional(:blocks) => Import.Blocks.options(),
optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.options(),
optional(:broadcast) => boolean,
optional(:internal_transactions) => internal_transactions_options,
optional(:internal_transactions) => Import.InternalTransactions.options(),
optional(:logs) => logs_options,
optional(:receipts) => receipts_options,
optional(:timeout) => timeout,
@ -84,9 +80,7 @@ defmodule Explorer.Chain.Import do
optional(:address_coin_balances) => Import.Address.CoinBalances.imported(),
optional(:blocks) => Import.Blocks.imported(),
optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.imported(),
optional(:internal_transactions) => [
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()}
],
optional(:internal_transactions) => Import.InternalTransactions.imported(),
optional(:logs) => [Log.t()],
optional(:receipts) => [Hash.Full.t()],
optional(:token_transfers) => [TokenTransfer.t()],
@ -107,7 +101,6 @@ defmodule Explorer.Chain.Import do
@transaction_timeout 120_000
@insert_internal_transactions_timeout 60_000
@insert_logs_timeout 60_000
@insert_token_transfers_timeout 60_000
@insert_token_balances_timeout 60_000
@ -172,7 +165,7 @@ defmodule Explorer.Chain.Import do
* `:internal_transactions`
* `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`.
* `:timeout` - the timeout for inserting all internal transactions. Defaults to
`#{@insert_internal_transactions_timeout}` milliseconds.
`#{Import.InternalTransactions.timeout()}` milliseconds.
* `:logs`
* `:params` - `list` of params for `Explorer.Chain.Log.changeset/2`.
* `:timeout` - the timeout for inserting all logs. Defaults to `#{@insert_logs_timeout}` milliseconds.
@ -306,7 +299,7 @@ defmodule Explorer.Chain.Import do
|> Import.Block.SecondDegreeRelations.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_transactions(ecto_schema_module_to_changes_list_map, full_options)
|> run_transaction_forks(ecto_schema_module_to_changes_list_map, full_options)
|> run_internal_transactions(ecto_schema_module_to_changes_list_map, full_options)
|> Import.InternalTransactions.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_logs(ecto_schema_module_to_changes_list_map, full_options)
|> run_tokens(ecto_schema_module_to_changes_list_map, full_options)
|> run_token_transfers(ecto_schema_module_to_changes_list_map, full_options)
@ -357,38 +350,6 @@ defmodule Explorer.Chain.Import do
end
end
defp run_internal_transactions(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{InternalTransaction => internal_transactions_changes} ->
timestamps = Map.fetch!(options, :timestamps)
multi
|> Multi.run(:internal_transactions, fn _ ->
insert_internal_transactions(
internal_transactions_changes,
%{
timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout,
timestamps: timestamps
}
)
end)
|> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions}
when is_list(internal_transactions) ->
update_transactions(
internal_transactions,
%{
timeout: options[:transactions][:timeout] || @insert_transactions_timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
defp run_logs(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
@ -474,32 +435,6 @@ defmodule Explorer.Chain.Import do
end
end
@spec insert_internal_transactions([map], %{required(:timeout) => timeout, required(:timestamps) => timestamps}) ::
{:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]}
| {:error, [Changeset.t()]}
defp insert_internal_transactions(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index})
{:ok, internal_transactions} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :index],
for: InternalTransaction,
on_conflict: :replace_all,
returning: [:id, :index, :transaction_hash],
timeout: timeout,
timestamps: timestamps
)
{:ok,
for(
internal_transaction <- internal_transactions,
do: Map.take(internal_transaction, [:id, :index, :transaction_hash])
)}
end
@spec insert_logs([map()], %{required(:timeout) => timeout, required(:timestamps) => timestamps}) ::
{:ok, [Log.t()]}
| {:error, [Changeset.t()]}
@ -689,58 +624,6 @@ defmodule Explorer.Chain.Import do
{:ok, inserted}
end
defp update_transactions(internal_transactions, %{
timeout: timeout,
timestamps: timestamps
})
when is_list(internal_transactions) do
ordered_transaction_hashes =
internal_transactions
|> MapSet.new(& &1.transaction_hash)
|> Enum.sort()
query =
from(
t in Transaction,
where: t.hash in ^ordered_transaction_hashes,
update: [
set: [
internal_transactions_indexed_at: ^timestamps.updated_at,
created_contract_address_hash:
fragment(
"(SELECT it.created_contract_address_hash FROM internal_transactions AS it WHERE it.transaction_hash = ? and it.type = 'create' and ? IS NULL)",
t.hash,
t.to_address_hash
),
error:
fragment(
"(SELECT it.error FROM internal_transactions AS it WHERE it.transaction_hash = ? ORDER BY it.index ASC LIMIT 1)",
t.hash
),
status:
fragment(
"COALESCE(?, CASE WHEN (SELECT it.error FROM internal_transactions AS it WHERE it.transaction_hash = ? ORDER BY it.index ASC LIMIT 1) IS NULL THEN ? ELSE ? END)",
t.status,
t.hash,
type(^:ok, t.status),
type(^:error, t.status)
)
]
]
)
transaction_count = Enum.count(ordered_transaction_hashes)
try do
{^transaction_count, result} = Repo.update_all(query, [], timeout: timeout)
{:ok, result}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, transaction_hashes: ordered_transaction_hashes}}
end
end
defp timestamp_changes_list(changes_list, timestamps) when is_list(changes_list) do
Enum.map(changes_list, &timestamp_params(&1, timestamps))
end

@ -0,0 +1,136 @@
defmodule Explorer.Chain.Import.InternalTransactions do
@moduledoc """
Bulk imports `t:Explorer.Chain.InternalTransactions.t/0`.
"""
require Ecto.Query
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{Hash, Import, InternalTransaction, Transaction}
alias Explorer.Repo
import Ecto.Query, only: [from: 2]
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()}
]
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{InternalTransaction => internal_transactions_changes} ->
timestamps = Map.fetch!(options, :timestamps)
multi
|> Multi.run(:internal_transactions, fn _ ->
insert(
internal_transactions_changes,
%{
timeout: options[:internal_transactions][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
|> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions}
when is_list(internal_transactions) ->
update_transactions(
internal_transactions,
%{
timeout: options[:transactions][:timeout] || Import.transactions_timeout(),
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
def timeout, do: @timeout
@spec insert([map], %{required(:timeout) => timeout, required(:timestamps) => Import.timestamps()}) ::
{:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]}
| {:error, [Changeset.t()]}
defp insert(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index})
{:ok, internal_transactions} =
Import.insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :index],
for: InternalTransaction,
on_conflict: :replace_all,
returning: [:id, :index, :transaction_hash],
timeout: timeout,
timestamps: timestamps
)
{:ok,
for(
internal_transaction <- internal_transactions,
do: Map.take(internal_transaction, [:id, :index, :transaction_hash])
)}
end
defp update_transactions(internal_transactions, %{
timeout: timeout,
timestamps: timestamps
})
when is_list(internal_transactions) do
ordered_transaction_hashes =
internal_transactions
|> MapSet.new(& &1.transaction_hash)
|> Enum.sort()
query =
from(
t in Transaction,
where: t.hash in ^ordered_transaction_hashes,
update: [
set: [
internal_transactions_indexed_at: ^timestamps.updated_at,
created_contract_address_hash:
fragment(
"(SELECT it.created_contract_address_hash FROM internal_transactions AS it WHERE it.transaction_hash = ? and it.type = 'create' and ? IS NULL)",
t.hash,
t.to_address_hash
),
error:
fragment(
"(SELECT it.error FROM internal_transactions AS it WHERE it.transaction_hash = ? ORDER BY it.index ASC LIMIT 1)",
t.hash
),
status:
fragment(
"COALESCE(?, CASE WHEN (SELECT it.error FROM internal_transactions AS it WHERE it.transaction_hash = ? ORDER BY it.index ASC LIMIT 1) IS NULL THEN ? ELSE ? END)",
t.status,
t.hash,
type(^:ok, t.status),
type(^:error, t.status)
)
]
]
)
transaction_count = Enum.count(ordered_transaction_hashes)
try do
{^transaction_count, result} = Repo.update_all(query, [], timeout: timeout)
{:ok, result}
rescue
postgrex_error in Postgrex.Error ->
{:error, %{exception: postgrex_error, transaction_hashes: ordered_transaction_hashes}}
end
end
end
Loading…
Cancel
Save