Extract Explorer.Chain.Import.Transactions

pull/859/head
Luke Imhoff 6 years ago
parent 22cb89058a
commit 991a2fe672
  1. 63
      apps/explorer/lib/explorer/chain/import.ex
  2. 2
      apps/explorer/lib/explorer/chain/import/blocks.ex
  3. 2
      apps/explorer/lib/explorer/chain/import/internal_transactions.ex
  4. 70
      apps/explorer/lib/explorer/chain/import/transactions.ex
  5. 2
      apps/indexer/lib/indexer/block/fetcher.ex

@ -26,12 +26,6 @@ defmodule Explorer.Chain.Import do
@type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all
@type params :: [map()]
@type transactions_options :: %{
required(:params) => params,
optional(:with) => changeset_function_name,
optional(:on_conflict) => :nothing | :replace_all,
optional(:timeout) => timeout
}
@type transaction_forks_options :: %{
required(:params) => params,
optional(:timeout) => timeout
@ -52,7 +46,7 @@ defmodule Explorer.Chain.Import do
optional(:token_transfers) => Import.TokenTransfers.options(),
optional(:tokens) => Import.Tokens.options(),
optional(:token_balances) => token_balances_options,
optional(:transactions) => transactions_options,
optional(:transactions) => Import.Transactions.options(),
optional(:transaction_forks) => transaction_forks_options
}
@type all_result ::
@ -67,7 +61,7 @@ defmodule Explorer.Chain.Import do
optional(:token_transfers) => Import.TokenTransfers.imported(),
optional(:tokens) => Import.Tokens.imported(),
optional(:token_balances) => [TokenBalance.t()],
optional(:transactions) => [Hash.Full.t()],
optional(:transactions) => Import.Transactions.imported(),
optional(:transaction_forks) => [
%{required(:uncle_hash) => Hash.Full.t(), required(:hash) => Hash.Full.t()}
]
@ -83,10 +77,8 @@ defmodule Explorer.Chain.Import do
@transaction_timeout 120_000
@insert_token_balances_timeout 60_000
@insert_transactions_timeout 60_000
@insert_transaction_forks_timeout 60_000
def transactions_timeout, do: @insert_transactions_timeout
def transaction_forks_timeout, do: @insert_transaction_forks_timeout
@doc """
@ -168,7 +160,7 @@ defmodule Explorer.Chain.Import do
win.
* `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`.
* `:timeout` - the timeout for inserting all transactions found in the params lists across all
types. Defaults to `#{@insert_transactions_timeout}` milliseconds.
types. Defaults to `#{Import.Transactions.timeout()}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`.
* `:transaction_forks`
* `:params` - `list` of params for `Explorer.Chain.Transaction.Fork.changeset/2`.
@ -276,7 +268,7 @@ defmodule Explorer.Chain.Import do
|> Import.Address.CoinBalances.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Blocks.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Block.SecondDegreeRelations.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_transactions(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Transactions.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_transaction_forks(ecto_schema_module_to_changes_list_map, full_options)
|> Import.InternalTransactions.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Logs.run(ecto_schema_module_to_changes_list_map, full_options)
@ -285,29 +277,6 @@ defmodule Explorer.Chain.Import do
|> run_token_balances(ecto_schema_module_to_changes_list_map, full_options)
end
defp run_transactions(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{Transaction => transactions_changes} ->
# check required options as early as possible
%{timestamps: timestamps, transactions: %{on_conflict: on_conflict} = transactions_options} = options
Multi.run(multi, :transactions, fn _ ->
insert_transactions(
transactions_changes,
%{
on_conflict: on_conflict,
timeout: transactions_options[:timeout] || @insert_transactions_timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
defp run_transaction_forks(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
@ -408,30 +377,6 @@ defmodule Explorer.Chain.Import do
)
end
@spec insert_transactions([map()], %{
required(:on_conflict) => on_conflict,
required(:timeout) => timeout,
required(:timestamps) => timestamps
}) :: {:ok, [Hash.t()]} | {:error, [Changeset.t()]}
defp insert_transactions(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, & &1.hash)
{:ok, transactions} =
insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict: on_conflict,
for: Transaction,
returning: [:hash],
timeout: timeout,
timestamps: timestamps
)
{:ok, for(transaction <- transactions, do: transaction.hash)}
end
@spec insert_transaction_forks([map()], %{
required(:timeout) => timeout,
required(:timestamps) => timestamps

@ -40,7 +40,7 @@ defmodule Explorer.Chain.Import.Blocks do
# MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table
|> Multi.run(:fork_transactions, fn _ ->
fork_transactions(%{
timeout: options[:transactions][:timeout] || Import.transactions_timeout(),
timeout: options[:transactions][:timeout] || Import.Transactions.timeout(),
timestamps: timestamps,
where_forked: where_forked
})

@ -43,7 +43,7 @@ defmodule Explorer.Chain.Import.InternalTransactions do
update_transactions(
internal_transactions,
%{
timeout: options[:transactions][:timeout] || Import.transactions_timeout(),
timeout: options[:transactions][:timeout] || Import.Transactions.timeout(),
timestamps: timestamps
}
)

@ -0,0 +1,70 @@
defmodule Explorer.Chain.Import.Transactions do
@moduledoc """
Bulk imports `t:Explorer.Chain.Transaction.t/0`.
"""
require Ecto.Query
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{Hash, Import, Transaction}
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:with) => Import.changeset_function_name(),
optional(:on_conflict) => :nothing | :replace_all,
optional(:timeout) => timeout
}
@type imported :: [Hash.Full.t()]
def run(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do
case ecto_schema_module_to_changes_list_map do
%{Transaction => transactions_changes} ->
# check required options as early as possible
%{timestamps: timestamps, transactions: %{on_conflict: on_conflict} = transactions_options} = options
Multi.run(multi, :transactions, fn _ ->
insert(
transactions_changes,
%{
on_conflict: on_conflict,
timeout: transactions_options[:timeout] || @timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
def timeout, do: @timeout
@spec insert([map()], %{
required(:on_conflict) => Import.on_conflict(),
required(:timeout) => timeout,
required(:timestamps) => Import.timestamps()
}) :: {:ok, [Hash.t()]} | {:error, [Changeset.t()]}
defp insert(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, & &1.hash)
{:ok, transactions} =
Import.insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict: on_conflict,
for: Transaction,
returning: [:hash],
timeout: timeout,
timestamps: timestamps
)
{:ok, for(transaction <- transactions, do: transaction.hash)}
end
end

@ -32,7 +32,7 @@ defmodule Indexer.Block.Fetcher do
token_balances: Import.token_balances_options(),
token_transfers: Import.TokenTransfers.options(),
tokens: Import.Tokens.options(),
transactions: Import.transactions_options()
transactions: Import.Transactions.options()
}
) :: Import.all_result()

Loading…
Cancel
Save