From 991a2fe6721b1e768ef3f4648d9f12177c48a338 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 3 Oct 2018 15:47:54 -0500 Subject: [PATCH] Extract Explorer.Chain.Import.Transactions --- apps/explorer/lib/explorer/chain/import.ex | 63 ++--------------- .../lib/explorer/chain/import/blocks.ex | 2 +- .../chain/import/internal_transactions.ex | 2 +- .../lib/explorer/chain/import/transactions.ex | 70 +++++++++++++++++++ apps/indexer/lib/indexer/block/fetcher.ex | 2 +- 5 files changed, 77 insertions(+), 62 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/import/transactions.ex diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index a1fe2ac805..6442a7305c 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -26,12 +26,6 @@ defmodule Explorer.Chain.Import do @type changeset_function_name :: atom @type on_conflict :: :nothing | :replace_all @type params :: [map()] - @type transactions_options :: %{ - required(:params) => params, - optional(:with) => changeset_function_name, - optional(:on_conflict) => :nothing | :replace_all, - optional(:timeout) => timeout - } @type transaction_forks_options :: %{ required(:params) => params, optional(:timeout) => timeout @@ -52,7 +46,7 @@ defmodule Explorer.Chain.Import do optional(:token_transfers) => Import.TokenTransfers.options(), optional(:tokens) => Import.Tokens.options(), optional(:token_balances) => token_balances_options, - optional(:transactions) => transactions_options, + optional(:transactions) => Import.Transactions.options(), optional(:transaction_forks) => transaction_forks_options } @type all_result :: @@ -67,7 +61,7 @@ defmodule Explorer.Chain.Import do optional(:token_transfers) => Import.TokenTransfers.imported(), optional(:tokens) => Import.Tokens.imported(), optional(:token_balances) => [TokenBalance.t()], - optional(:transactions) => [Hash.Full.t()], + optional(:transactions) => Import.Transactions.imported(), optional(:transaction_forks) => [ %{required(:uncle_hash) => Hash.Full.t(), required(:hash) => Hash.Full.t()} ] @@ -83,10 +77,8 @@ defmodule Explorer.Chain.Import do @transaction_timeout 120_000 @insert_token_balances_timeout 60_000 - @insert_transactions_timeout 60_000 @insert_transaction_forks_timeout 60_000 - def transactions_timeout, do: @insert_transactions_timeout def transaction_forks_timeout, do: @insert_transaction_forks_timeout @doc """ @@ -168,7 +160,7 @@ defmodule Explorer.Chain.Import do win. * `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`. * `:timeout` - the timeout for inserting all transactions found in the params lists across all - types. Defaults to `#{@insert_transactions_timeout}` milliseconds. + types. Defaults to `#{Import.Transactions.timeout()}` milliseconds. * `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`. * `:transaction_forks` * `:params` - `list` of params for `Explorer.Chain.Transaction.Fork.changeset/2`. @@ -276,7 +268,7 @@ defmodule Explorer.Chain.Import do |> Import.Address.CoinBalances.run(ecto_schema_module_to_changes_list_map, full_options) |> Import.Blocks.run(ecto_schema_module_to_changes_list_map, full_options) |> Import.Block.SecondDegreeRelations.run(ecto_schema_module_to_changes_list_map, full_options) - |> run_transactions(ecto_schema_module_to_changes_list_map, full_options) + |> Import.Transactions.run(ecto_schema_module_to_changes_list_map, full_options) |> run_transaction_forks(ecto_schema_module_to_changes_list_map, full_options) |> Import.InternalTransactions.run(ecto_schema_module_to_changes_list_map, full_options) |> Import.Logs.run(ecto_schema_module_to_changes_list_map, full_options) @@ -285,29 +277,6 @@ defmodule Explorer.Chain.Import do |> run_token_balances(ecto_schema_module_to_changes_list_map, full_options) end - defp run_transactions(multi, ecto_schema_module_to_changes_list_map, options) - when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do - case ecto_schema_module_to_changes_list_map do - %{Transaction => transactions_changes} -> - # check required options as early as possible - %{timestamps: timestamps, transactions: %{on_conflict: on_conflict} = transactions_options} = options - - Multi.run(multi, :transactions, fn _ -> - insert_transactions( - transactions_changes, - %{ - on_conflict: on_conflict, - timeout: transactions_options[:timeout] || @insert_transactions_timeout, - timestamps: timestamps - } - ) - end) - - _ -> - multi - end - end - defp run_transaction_forks(multi, ecto_schema_module_to_changes_list_map, options) when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do case ecto_schema_module_to_changes_list_map do @@ -408,30 +377,6 @@ defmodule Explorer.Chain.Import do ) end - @spec insert_transactions([map()], %{ - required(:on_conflict) => on_conflict, - required(:timeout) => timeout, - required(:timestamps) => timestamps - }) :: {:ok, [Hash.t()]} | {:error, [Changeset.t()]} - defp insert_transactions(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps}) - when is_list(changes_list) do - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = Enum.sort_by(changes_list, & &1.hash) - - {:ok, transactions} = - insert_changes_list( - ordered_changes_list, - conflict_target: :hash, - on_conflict: on_conflict, - for: Transaction, - returning: [:hash], - timeout: timeout, - timestamps: timestamps - ) - - {:ok, for(transaction <- transactions, do: transaction.hash)} - end - @spec insert_transaction_forks([map()], %{ required(:timeout) => timeout, required(:timestamps) => timestamps diff --git a/apps/explorer/lib/explorer/chain/import/blocks.ex b/apps/explorer/lib/explorer/chain/import/blocks.ex index dbd71c0de8..127f048acc 100644 --- a/apps/explorer/lib/explorer/chain/import/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/blocks.ex @@ -40,7 +40,7 @@ defmodule Explorer.Chain.Import.Blocks do # MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table |> Multi.run(:fork_transactions, fn _ -> fork_transactions(%{ - timeout: options[:transactions][:timeout] || Import.transactions_timeout(), + timeout: options[:transactions][:timeout] || Import.Transactions.timeout(), timestamps: timestamps, where_forked: where_forked }) diff --git a/apps/explorer/lib/explorer/chain/import/internal_transactions.ex b/apps/explorer/lib/explorer/chain/import/internal_transactions.ex index 2d9feafb40..74a2c4182f 100644 --- a/apps/explorer/lib/explorer/chain/import/internal_transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/internal_transactions.ex @@ -43,7 +43,7 @@ defmodule Explorer.Chain.Import.InternalTransactions do update_transactions( internal_transactions, %{ - timeout: options[:transactions][:timeout] || Import.transactions_timeout(), + timeout: options[:transactions][:timeout] || Import.Transactions.timeout(), timestamps: timestamps } ) diff --git a/apps/explorer/lib/explorer/chain/import/transactions.ex b/apps/explorer/lib/explorer/chain/import/transactions.ex new file mode 100644 index 0000000000..a001220cb9 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/transactions.ex @@ -0,0 +1,70 @@ +defmodule Explorer.Chain.Import.Transactions do + @moduledoc """ + Bulk imports `t:Explorer.Chain.Transaction.t/0`. + """ + + require Ecto.Query + + alias Ecto.{Changeset, Multi} + alias Explorer.Chain.{Hash, Import, Transaction} + + # milliseconds + @timeout 60_000 + + @type options :: %{ + required(:params) => Import.params(), + optional(:with) => Import.changeset_function_name(), + optional(:on_conflict) => :nothing | :replace_all, + optional(:timeout) => timeout + } + @type imported :: [Hash.Full.t()] + + def run(multi, ecto_schema_module_to_changes_list_map, options) + when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do + case ecto_schema_module_to_changes_list_map do + %{Transaction => transactions_changes} -> + # check required options as early as possible + %{timestamps: timestamps, transactions: %{on_conflict: on_conflict} = transactions_options} = options + + Multi.run(multi, :transactions, fn _ -> + insert( + transactions_changes, + %{ + on_conflict: on_conflict, + timeout: transactions_options[:timeout] || @timeout, + timestamps: timestamps + } + ) + end) + + _ -> + multi + end + end + + def timeout, do: @timeout + + @spec insert([map()], %{ + required(:on_conflict) => Import.on_conflict(), + required(:timeout) => timeout, + required(:timestamps) => Import.timestamps() + }) :: {:ok, [Hash.t()]} | {:error, [Changeset.t()]} + defp insert(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps}) + when is_list(changes_list) do + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, & &1.hash) + + {:ok, transactions} = + Import.insert_changes_list( + ordered_changes_list, + conflict_target: :hash, + on_conflict: on_conflict, + for: Transaction, + returning: [:hash], + timeout: timeout, + timestamps: timestamps + ) + + {:ok, for(transaction <- transactions, do: transaction.hash)} + end +end diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index d4e693130c..6c0dcc1cd2 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -32,7 +32,7 @@ defmodule Indexer.Block.Fetcher do token_balances: Import.token_balances_options(), token_transfers: Import.TokenTransfers.options(), tokens: Import.Tokens.options(), - transactions: Import.transactions_options() + transactions: Import.Transactions.options() } ) :: Import.all_result()