Extract Explorer.Chain.Import.TokenTransfers

pull/859/head
Luke Imhoff 6 years ago
parent 753fbd8bd5
commit 14aa12e9bd
  1. 57
      apps/explorer/lib/explorer/chain/import.ex
  2. 62
      apps/explorer/lib/explorer/chain/import/token_transfers.ex
  3. 2
      apps/indexer/lib/indexer/block/fetcher.ex

@ -26,10 +26,6 @@ defmodule Explorer.Chain.Import do
@type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all
@type params :: [map()]
@type token_transfers_options :: %{
required(:params) => params,
optional(:timeout) => timeout
}
@type tokens_options :: %{
required(:params) => params,
optional(:on_conflict) => :nothing | :replace_all,
@ -58,7 +54,7 @@ defmodule Explorer.Chain.Import do
optional(:internal_transactions) => Import.InternalTransactions.options(),
optional(:logs) => Import.Logs.options(),
optional(:timeout) => timeout,
optional(:token_transfers) => token_transfers_options,
optional(:token_transfers) => Import.TokenTransfers.options(),
optional(:tokens) => tokens_options,
optional(:token_balances) => token_balances_options,
optional(:transactions) => transactions_options,
@ -73,7 +69,7 @@ defmodule Explorer.Chain.Import do
optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.imported(),
optional(:internal_transactions) => Import.InternalTransactions.imported(),
optional(:logs) => Import.Logs.imported(),
optional(:token_transfers) => [TokenTransfer.t()],
optional(:token_transfers) => Import.TokenTransfers.imported(),
optional(:tokens) => [Token.t()],
optional(:token_balances) => [TokenBalance.t()],
optional(:transactions) => [Hash.Full.t()],
@ -91,9 +87,6 @@ defmodule Explorer.Chain.Import do
@transaction_timeout 120_000
@insert_internal_transactions_timeout 60_000
@insert_logs_timeout 60_000
@insert_token_transfers_timeout 60_000
@insert_token_balances_timeout 60_000
@insert_tokens_timeout 60_000
@insert_transactions_timeout 60_000
@ -164,7 +157,8 @@ defmodule Explorer.Chain.Import do
milliseconds.
* `:token_transfers`
* `:params` - `list` of params for `Explorer.Chain.TokenTransfer.changeset/2`
* `:timeout` - the timeout for inserting all token transfers. Defaults to `#{@insert_token_transfers_timeout}` milliseconds.
* `:timeout` - the timeout for inserting all token transfers. Defaults to `#{Import.TokenTransfers.timeout()}`
milliseconds.
* `:tokens`
* `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing token
with the same contract address hash.
@ -293,7 +287,7 @@ defmodule Explorer.Chain.Import do
|> Import.InternalTransactions.run(ecto_schema_module_to_changes_list_map, full_options)
|> Import.Logs.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_tokens(ecto_schema_module_to_changes_list_map, full_options)
|> run_token_transfers(ecto_schema_module_to_changes_list_map, full_options)
|> Import.TokenTransfers.run(ecto_schema_module_to_changes_list_map, full_options)
|> run_token_balances(ecto_schema_module_to_changes_list_map, full_options)
end
@ -363,27 +357,6 @@ defmodule Explorer.Chain.Import do
end
end
defp run_token_transfers(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{TokenTransfer => token_transfers_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :token_transfers, fn _ ->
insert_token_transfers(
token_transfers_changes,
%{
timeout: options[:token_transfers][:timeout] || @insert_token_transfers_timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
defp run_token_balances(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
@ -429,26 +402,6 @@ defmodule Explorer.Chain.Import do
)
end
@spec insert_token_transfers([map()], %{required(:timeout) => timeout(), required(:timestamps) => timestamps()}) ::
{:ok, [TokenTransfer.t()]}
| {:error, [Changeset.t()]}
def insert_token_transfers(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.log_index})
{:ok, _} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :log_index],
on_conflict: :replace_all,
for: TokenTransfer,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
@spec insert_token_balances([map()], %{
required(:timeout) => timeout(),
required(:timestamps) => timestamps()

@ -0,0 +1,62 @@
defmodule Explorer.Chain.Import.TokenTransfers do
@moduledoc """
Bulk imports `t:Explorer.Chain.TokenTransfer.t/0`.
"""
require Ecto.Query
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{Import, TokenTransfer}
# milliseconds
@timeout 60_000
@type options :: %{
required(:params) => Import.params(),
optional(:timeout) => timeout
}
@type imported :: [TokenTransfer.t()]
def run(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_map(options) do
case ecto_schema_module_to_changes_list do
%{TokenTransfer => token_transfers_changes} ->
timestamps = Map.fetch!(options, :timestamps)
Multi.run(multi, :token_transfers, fn _ ->
insert(
token_transfers_changes,
%{
timeout: options[:token_transfers][:timeout] || @timeout,
timestamps: timestamps
}
)
end)
_ ->
multi
end
end
def timeout, do: @timeout
@spec insert([map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) ::
{:ok, [TokenTransfer.t()]}
| {:error, [Changeset.t()]}
def insert(changes_list, %{timeout: timeout, timestamps: timestamps})
when is_list(changes_list) do
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.log_index})
{:ok, _} =
Import.insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :log_index],
on_conflict: :replace_all,
for: TokenTransfer,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
end

@ -30,7 +30,7 @@ defmodule Indexer.Block.Fetcher do
broadcast: boolean,
logs: Import.Logs.options(),
token_balances: Import.token_balances_options(),
token_transfers: Import.token_transfers_options(),
token_transfers: Import.TokenTransfers.options(),
tokens: Import.tokens_options(),
transactions: Import.transactions_options()
}

Loading…
Cancel
Save