From 14aa12e9bde56d9237add1144055d7e06f6c9a2e Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 3 Oct 2018 14:57:37 -0500 Subject: [PATCH] Extract Explorer.Chain.Import.TokenTransfers --- apps/explorer/lib/explorer/chain/import.ex | 57 ++--------------- .../explorer/chain/import/token_transfers.ex | 62 +++++++++++++++++++ apps/indexer/lib/indexer/block/fetcher.ex | 2 +- 3 files changed, 68 insertions(+), 53 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/import/token_transfers.ex diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index 70a7fa6091..9658dbd0ae 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -26,10 +26,6 @@ defmodule Explorer.Chain.Import do @type changeset_function_name :: atom @type on_conflict :: :nothing | :replace_all @type params :: [map()] - @type token_transfers_options :: %{ - required(:params) => params, - optional(:timeout) => timeout - } @type tokens_options :: %{ required(:params) => params, optional(:on_conflict) => :nothing | :replace_all, @@ -58,7 +54,7 @@ defmodule Explorer.Chain.Import do optional(:internal_transactions) => Import.InternalTransactions.options(), optional(:logs) => Import.Logs.options(), optional(:timeout) => timeout, - optional(:token_transfers) => token_transfers_options, + optional(:token_transfers) => Import.TokenTransfers.options(), optional(:tokens) => tokens_options, optional(:token_balances) => token_balances_options, optional(:transactions) => transactions_options, @@ -73,7 +69,7 @@ defmodule Explorer.Chain.Import do optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.imported(), optional(:internal_transactions) => Import.InternalTransactions.imported(), optional(:logs) => Import.Logs.imported(), - optional(:token_transfers) => [TokenTransfer.t()], + optional(:token_transfers) => Import.TokenTransfers.imported(), optional(:tokens) => [Token.t()], optional(:token_balances) => [TokenBalance.t()], optional(:transactions) => [Hash.Full.t()], @@ -91,9 +87,6 @@ defmodule Explorer.Chain.Import do @transaction_timeout 120_000 - @insert_internal_transactions_timeout 60_000 - @insert_logs_timeout 60_000 - @insert_token_transfers_timeout 60_000 @insert_token_balances_timeout 60_000 @insert_tokens_timeout 60_000 @insert_transactions_timeout 60_000 @@ -164,7 +157,8 @@ defmodule Explorer.Chain.Import do milliseconds. * `:token_transfers` * `:params` - `list` of params for `Explorer.Chain.TokenTransfer.changeset/2` - * `:timeout` - the timeout for inserting all token transfers. Defaults to `#{@insert_token_transfers_timeout}` milliseconds. + * `:timeout` - the timeout for inserting all token transfers. Defaults to `#{Import.TokenTransfers.timeout()}` + milliseconds. * `:tokens` * `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing token with the same contract address hash. @@ -293,7 +287,7 @@ defmodule Explorer.Chain.Import do |> Import.InternalTransactions.run(ecto_schema_module_to_changes_list_map, full_options) |> Import.Logs.run(ecto_schema_module_to_changes_list_map, full_options) |> run_tokens(ecto_schema_module_to_changes_list_map, full_options) - |> run_token_transfers(ecto_schema_module_to_changes_list_map, full_options) + |> Import.TokenTransfers.run(ecto_schema_module_to_changes_list_map, full_options) |> run_token_balances(ecto_schema_module_to_changes_list_map, full_options) end @@ -363,27 +357,6 @@ defmodule Explorer.Chain.Import do end end - defp run_token_transfers(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_map(options) do - case ecto_schema_module_to_changes_list do - %{TokenTransfer => token_transfers_changes} -> - timestamps = Map.fetch!(options, :timestamps) - - Multi.run(multi, :token_transfers, fn _ -> - insert_token_transfers( - token_transfers_changes, - %{ - timeout: options[:token_transfers][:timeout] || @insert_token_transfers_timeout, - timestamps: timestamps - } - ) - end) - - _ -> - multi - end - end - defp run_token_balances(multi, ecto_schema_module_to_changes_list, options) when is_map(ecto_schema_module_to_changes_list) and is_map(options) do case ecto_schema_module_to_changes_list do @@ -429,26 +402,6 @@ defmodule Explorer.Chain.Import do ) end - @spec insert_token_transfers([map()], %{required(:timeout) => timeout(), required(:timestamps) => timestamps()}) :: - {:ok, [TokenTransfer.t()]} - | {:error, [Changeset.t()]} - def insert_token_transfers(changes_list, %{timeout: timeout, timestamps: timestamps}) - when is_list(changes_list) do - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.log_index}) - - {:ok, _} = - insert_changes_list( - ordered_changes_list, - conflict_target: [:transaction_hash, :log_index], - on_conflict: :replace_all, - for: TokenTransfer, - returning: true, - timeout: timeout, - timestamps: timestamps - ) - end - @spec insert_token_balances([map()], %{ required(:timeout) => timeout(), required(:timestamps) => timestamps() diff --git a/apps/explorer/lib/explorer/chain/import/token_transfers.ex b/apps/explorer/lib/explorer/chain/import/token_transfers.ex new file mode 100644 index 0000000000..d9da70898d --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/token_transfers.ex @@ -0,0 +1,62 @@ +defmodule Explorer.Chain.Import.TokenTransfers do + @moduledoc """ + Bulk imports `t:Explorer.Chain.TokenTransfer.t/0`. + """ + + require Ecto.Query + + alias Ecto.{Changeset, Multi} + alias Explorer.Chain.{Import, TokenTransfer} + + # milliseconds + @timeout 60_000 + + @type options :: %{ + required(:params) => Import.params(), + optional(:timeout) => timeout + } + @type imported :: [TokenTransfer.t()] + + def run(multi, ecto_schema_module_to_changes_list, options) + when is_map(ecto_schema_module_to_changes_list) and is_map(options) do + case ecto_schema_module_to_changes_list do + %{TokenTransfer => token_transfers_changes} -> + timestamps = Map.fetch!(options, :timestamps) + + Multi.run(multi, :token_transfers, fn _ -> + insert( + token_transfers_changes, + %{ + timeout: options[:token_transfers][:timeout] || @timeout, + timestamps: timestamps + } + ) + end) + + _ -> + multi + end + end + + def timeout, do: @timeout + + @spec insert([map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) :: + {:ok, [TokenTransfer.t()]} + | {:error, [Changeset.t()]} + def insert(changes_list, %{timeout: timeout, timestamps: timestamps}) + when is_list(changes_list) do + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.log_index}) + + {:ok, _} = + Import.insert_changes_list( + ordered_changes_list, + conflict_target: [:transaction_hash, :log_index], + on_conflict: :replace_all, + for: TokenTransfer, + returning: true, + timeout: timeout, + timestamps: timestamps + ) + end +end diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 2206181200..05cb1cd411 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -30,7 +30,7 @@ defmodule Indexer.Block.Fetcher do broadcast: boolean, logs: Import.Logs.options(), token_balances: Import.token_balances_options(), - token_transfers: Import.token_transfers_options(), + token_transfers: Import.TokenTransfers.options(), tokens: Import.tokens_options(), transactions: Import.transactions_options() }