diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index 4ea5d7b0c2..2c604755b1 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -26,10 +26,6 @@ defmodule Explorer.Chain.Import do @type changeset_function_name :: atom @type on_conflict :: :nothing | :replace_all @type params :: [map()] - @type internal_transactions_options :: %{ - required(:params) => params, - optional(:timeout) => timeout - } @type logs_options :: %{ required(:params) => params, optional(:timeout) => timeout @@ -67,7 +63,7 @@ defmodule Explorer.Chain.Import do optional(:blocks) => Import.Blocks.options(), optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.options(), optional(:broadcast) => boolean, - optional(:internal_transactions) => internal_transactions_options, + optional(:internal_transactions) => Import.InternalTransactions.options(), optional(:logs) => logs_options, optional(:receipts) => receipts_options, optional(:timeout) => timeout, @@ -84,9 +80,7 @@ defmodule Explorer.Chain.Import do optional(:address_coin_balances) => Import.Address.CoinBalances.imported(), optional(:blocks) => Import.Blocks.imported(), optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.imported(), - optional(:internal_transactions) => [ - %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} - ], + optional(:internal_transactions) => Import.InternalTransactions.imported(), optional(:logs) => [Log.t()], optional(:receipts) => [Hash.Full.t()], optional(:token_transfers) => [TokenTransfer.t()], @@ -107,7 +101,6 @@ defmodule Explorer.Chain.Import do @transaction_timeout 120_000 - @insert_internal_transactions_timeout 60_000 @insert_logs_timeout 60_000 @insert_token_transfers_timeout 60_000 @insert_token_balances_timeout 60_000 @@ -172,7 +165,7 @@ defmodule Explorer.Chain.Import do * `:internal_transactions` * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. * `:timeout` - the timeout for inserting all internal transactions. Defaults to - `#{@insert_internal_transactions_timeout}` milliseconds. + `#{Import.InternalTransactions.timeout()}` milliseconds. * `:logs` * `:params` - `list` of params for `Explorer.Chain.Log.changeset/2`. * `:timeout` - the timeout for inserting all logs. Defaults to `#{@insert_logs_timeout}` milliseconds. @@ -306,7 +299,7 @@ defmodule Explorer.Chain.Import do |> Import.Block.SecondDegreeRelations.run(ecto_schema_module_to_changes_list_map, full_options) |> run_transactions(ecto_schema_module_to_changes_list_map, full_options) |> run_transaction_forks(ecto_schema_module_to_changes_list_map, full_options) - |> run_internal_transactions(ecto_schema_module_to_changes_list_map, full_options) + |> Import.InternalTransactions.run(ecto_schema_module_to_changes_list_map, full_options) |> run_logs(ecto_schema_module_to_changes_list_map, full_options) |> run_tokens(ecto_schema_module_to_changes_list_map, full_options) |> run_token_transfers(ecto_schema_module_to_changes_list_map, full_options) @@ -357,38 +350,6 @@ defmodule Explorer.Chain.Import do end end - defp run_internal_transactions(multi, ecto_schema_module_to_changes_list_map, options) - when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do - case ecto_schema_module_to_changes_list_map do - %{InternalTransaction => internal_transactions_changes} -> - timestamps = Map.fetch!(options, :timestamps) - - multi - |> Multi.run(:internal_transactions, fn _ -> - insert_internal_transactions( - internal_transactions_changes, - %{ - timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout, - timestamps: timestamps - } - ) - end) - |> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions} - when is_list(internal_transactions) -> - update_transactions( - internal_transactions, - %{ - timeout: options[:transactions][:timeout] || @insert_transactions_timeout, - timestamps: timestamps - } - ) - end) - - _ -> - multi - end - end - defp run_logs(multi, ecto_schema_module_to_changes_list_map, options) when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do case ecto_schema_module_to_changes_list_map do @@ -474,32 +435,6 @@ defmodule Explorer.Chain.Import do end end - @spec insert_internal_transactions([map], %{required(:timeout) => timeout, required(:timestamps) => timestamps}) :: - {:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} - | {:error, [Changeset.t()]} - defp insert_internal_transactions(changes_list, %{timeout: timeout, timestamps: timestamps}) - when is_list(changes_list) do - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) - - {:ok, internal_transactions} = - insert_changes_list( - ordered_changes_list, - conflict_target: [:transaction_hash, :index], - for: InternalTransaction, - on_conflict: :replace_all, - returning: [:id, :index, :transaction_hash], - timeout: timeout, - timestamps: timestamps - ) - - {:ok, - for( - internal_transaction <- internal_transactions, - do: Map.take(internal_transaction, [:id, :index, :transaction_hash]) - )} - end - @spec insert_logs([map()], %{required(:timeout) => timeout, required(:timestamps) => timestamps}) :: {:ok, [Log.t()]} | {:error, [Changeset.t()]} @@ -689,58 +624,6 @@ defmodule Explorer.Chain.Import do {:ok, inserted} end - defp update_transactions(internal_transactions, %{ - timeout: timeout, - timestamps: timestamps - }) - when is_list(internal_transactions) do - ordered_transaction_hashes = - internal_transactions - |> MapSet.new(& &1.transaction_hash) - |> Enum.sort() - - query = - from( - t in Transaction, - where: t.hash in ^ordered_transaction_hashes, - update: [ - set: [ - internal_transactions_indexed_at: ^timestamps.updated_at, - created_contract_address_hash: - fragment( - "(SELECT it.created_contract_address_hash FROM internal_transactions AS it WHERE it.transaction_hash = ? and it.type = 'create' and ? IS NULL)", - t.hash, - t.to_address_hash - ), - error: - fragment( - "(SELECT it.error FROM internal_transactions AS it WHERE it.transaction_hash = ? ORDER BY it.index ASC LIMIT 1)", - t.hash - ), - status: - fragment( - "COALESCE(?, CASE WHEN (SELECT it.error FROM internal_transactions AS it WHERE it.transaction_hash = ? ORDER BY it.index ASC LIMIT 1) IS NULL THEN ? ELSE ? END)", - t.status, - t.hash, - type(^:ok, t.status), - type(^:error, t.status) - ) - ] - ] - ) - - transaction_count = Enum.count(ordered_transaction_hashes) - - try do - {^transaction_count, result} = Repo.update_all(query, [], timeout: timeout) - - {:ok, result} - rescue - postgrex_error in Postgrex.Error -> - {:error, %{exception: postgrex_error, transaction_hashes: ordered_transaction_hashes}} - end - end - defp timestamp_changes_list(changes_list, timestamps) when is_list(changes_list) do Enum.map(changes_list, ×tamp_params(&1, timestamps)) end diff --git a/apps/explorer/lib/explorer/chain/import/internal_transactions.ex b/apps/explorer/lib/explorer/chain/import/internal_transactions.ex new file mode 100644 index 0000000000..2d9feafb40 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/internal_transactions.ex @@ -0,0 +1,136 @@ +defmodule Explorer.Chain.Import.InternalTransactions do + @moduledoc """ + Bulk imports `t:Explorer.Chain.InternalTransactions.t/0`. + """ + + require Ecto.Query + + alias Ecto.{Changeset, Multi} + alias Explorer.Chain.{Hash, Import, InternalTransaction, Transaction} + alias Explorer.Repo + + import Ecto.Query, only: [from: 2] + + # milliseconds + @timeout 60_000 + + @type options :: %{ + required(:params) => Import.params(), + optional(:timeout) => timeout + } + @type imported :: [ + %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} + ] + + def run(multi, ecto_schema_module_to_changes_list_map, options) + when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do + case ecto_schema_module_to_changes_list_map do + %{InternalTransaction => internal_transactions_changes} -> + timestamps = Map.fetch!(options, :timestamps) + + multi + |> Multi.run(:internal_transactions, fn _ -> + insert( + internal_transactions_changes, + %{ + timeout: options[:internal_transactions][:timeout] || @timeout, + timestamps: timestamps + } + ) + end) + |> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions} + when is_list(internal_transactions) -> + update_transactions( + internal_transactions, + %{ + timeout: options[:transactions][:timeout] || Import.transactions_timeout(), + timestamps: timestamps + } + ) + end) + + _ -> + multi + end + end + + def timeout, do: @timeout + + @spec insert([map], %{required(:timeout) => timeout, required(:timestamps) => Import.timestamps()}) :: + {:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} + | {:error, [Changeset.t()]} + defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) + when is_list(changes_list) do + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) + + {:ok, internal_transactions} = + Import.insert_changes_list( + ordered_changes_list, + conflict_target: [:transaction_hash, :index], + for: InternalTransaction, + on_conflict: :replace_all, + returning: [:id, :index, :transaction_hash], + timeout: timeout, + timestamps: timestamps + ) + + {:ok, + for( + internal_transaction <- internal_transactions, + do: Map.take(internal_transaction, [:id, :index, :transaction_hash]) + )} + end + + defp update_transactions(internal_transactions, %{ + timeout: timeout, + timestamps: timestamps + }) + when is_list(internal_transactions) do + ordered_transaction_hashes = + internal_transactions + |> MapSet.new(& &1.transaction_hash) + |> Enum.sort() + + query = + from( + t in Transaction, + where: t.hash in ^ordered_transaction_hashes, + update: [ + set: [ + internal_transactions_indexed_at: ^timestamps.updated_at, + created_contract_address_hash: + fragment( + "(SELECT it.created_contract_address_hash FROM internal_transactions AS it WHERE it.transaction_hash = ? and it.type = 'create' and ? IS NULL)", + t.hash, + t.to_address_hash + ), + error: + fragment( + "(SELECT it.error FROM internal_transactions AS it WHERE it.transaction_hash = ? ORDER BY it.index ASC LIMIT 1)", + t.hash + ), + status: + fragment( + "COALESCE(?, CASE WHEN (SELECT it.error FROM internal_transactions AS it WHERE it.transaction_hash = ? ORDER BY it.index ASC LIMIT 1) IS NULL THEN ? ELSE ? END)", + t.status, + t.hash, + type(^:ok, t.status), + type(^:error, t.status) + ) + ] + ] + ) + + transaction_count = Enum.count(ordered_transaction_hashes) + + try do + {^transaction_count, result} = Repo.update_all(query, [], timeout: timeout) + + {:ok, result} + rescue + postgrex_error in Postgrex.Error -> + {:error, %{exception: postgrex_error, transaction_hashes: ordered_transaction_hashes}} + end + end +end