Merge pull request #1242 from poanetwork/import-stage

Import stages
pull/1226/head
Luke Imhoff 6 years ago committed by GitHub
commit 7a3dce7c8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .credo.exs
  2. 13
      apps/explorer/lib/explorer/chain.ex
  3. 81
      apps/explorer/lib/explorer/chain/import.ex
  4. 18
      apps/explorer/lib/explorer/chain/import/runner.ex
  5. 2
      apps/explorer/lib/explorer/chain/import/runner/address/coin_balances.ex
  6. 2
      apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex
  7. 2
      apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex
  8. 2
      apps/explorer/lib/explorer/chain/import/runner/addresses.ex
  9. 2
      apps/explorer/lib/explorer/chain/import/runner/block/second_degree_relations.ex
  10. 2
      apps/explorer/lib/explorer/chain/import/runner/block_rewards.ex
  11. 25
      apps/explorer/lib/explorer/chain/import/runner/blocks.ex
  12. 19
      apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex
  13. 2
      apps/explorer/lib/explorer/chain/import/runner/logs.ex
  14. 2
      apps/explorer/lib/explorer/chain/import/runner/token_transfers.ex
  15. 2
      apps/explorer/lib/explorer/chain/import/runner/tokens.ex
  16. 2
      apps/explorer/lib/explorer/chain/import/runner/transaction/forks.ex
  17. 2
      apps/explorer/lib/explorer/chain/import/runner/transactions.ex
  18. 50
      apps/explorer/lib/explorer/chain/import/stage.ex
  19. 50
      apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex
  20. 22
      apps/explorer/lib/explorer/chain/import/stage/addresses.ex
  21. 19
      apps/explorer/lib/explorer/logger.ex
  22. 16
      apps/explorer/lib/explorer/repo.ex
  23. 4
      apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs

@ -75,7 +75,7 @@
# Priority values are: `low, normal, high, higher` # Priority values are: `low, normal, high, higher`
# #
{Credo.Check.Design.AliasUsage, {Credo.Check.Design.AliasUsage,
excluded_namespaces: ~w(Block Blocks Import Socket SpandexDatadog Task), excluded_namespaces: ~w(Block Blocks Import Runner Socket SpandexDatadog Task),
excluded_lastnames: excluded_lastnames:
~w(Address DateTime Exporter Fetcher Full Instrumenter Logger Monitor Name Number Repo Spec Time Unit), ~w(Address DateTime Exporter Fetcher Full Instrumenter Logger Monitor Name Number Repo Spec Time Unit),
priority: :low}, priority: :low},

@ -20,8 +20,6 @@ defmodule Explorer.Chain do
alias Ecto.Adapters.SQL alias Ecto.Adapters.SQL
alias Ecto.Multi alias Ecto.Multi
alias Explorer.Chain
alias Explorer.Chain.{ alias Explorer.Chain.{
Address, Address,
Address.CoinBalance, Address.CoinBalance,
@ -41,6 +39,7 @@ defmodule Explorer.Chain do
} }
alias Explorer.Chain.Block.EmissionReward alias Explorer.Chain.Block.EmissionReward
alias Explorer.Chain.Import.Runner
alias Explorer.{PagingOptions, Repo} alias Explorer.{PagingOptions, Repo}
alias Explorer.Counters.{ alias Explorer.Counters.{
@ -632,13 +631,13 @@ defmodule Explorer.Chain do
""" """
@spec find_or_insert_address_from_hash(Hash.Address.t()) :: {:ok, Address.t()} @spec find_or_insert_address_from_hash(Hash.Address.t()) :: {:ok, Address.t()}
def find_or_insert_address_from_hash(%Hash{byte_count: unquote(Hash.Address.byte_count())} = hash) do def find_or_insert_address_from_hash(%Hash{byte_count: unquote(Hash.Address.byte_count())} = hash) do
case Chain.hash_to_address(hash) do case hash_to_address(hash) do
{:ok, address} -> {:ok, address} ->
{:ok, address} {:ok, address}
{:error, :not_found} -> {:error, :not_found} ->
Chain.create_address(%{hash: to_string(hash)}) create_address(%{hash: to_string(hash)})
Chain.hash_to_address(hash) hash_to_address(hash)
end end
end end
@ -1968,7 +1967,7 @@ defmodule Explorer.Chain do
) :: {:ok, accumulator} ) :: {:ok, accumulator}
when accumulator: term() when accumulator: term()
def stream_cataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do def stream_cataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do
Chain.Token.cataloged_tokens() Token.cataloged_tokens()
|> order_by(asc: :updated_at) |> order_by(asc: :updated_at)
|> Repo.stream_reduce(initial, reducer) |> Repo.stream_reduce(initial, reducer)
end end
@ -2048,7 +2047,7 @@ defmodule Explorer.Chain do
token_changeset = Token.changeset(token, params) token_changeset = Token.changeset(token, params)
address_name_changeset = Address.Name.changeset(%Address.Name{}, Map.put(params, :address_hash, address_hash)) address_name_changeset = Address.Name.changeset(%Address.Name{}, Map.put(params, :address_hash, address_hash))
token_opts = [on_conflict: Import.Tokens.default_on_conflict(), conflict_target: :contract_address_hash] token_opts = [on_conflict: Runner.Tokens.default_on_conflict(), conflict_target: :contract_address_hash]
address_name_opts = [on_conflict: :nothing, conflict_target: [:address_hash, :name]] address_name_opts = [on_conflict: :nothing, conflict_target: [:address_hash, :name]]
insert_result = insert_result =

@ -3,27 +3,18 @@ defmodule Explorer.Chain.Import do
Bulk importing of data into `Explorer.Repo` Bulk importing of data into `Explorer.Repo`
""" """
alias Ecto.{Changeset, Multi} alias Ecto.Changeset
alias Explorer.Chain.Import alias Explorer.Chain.Import
alias Explorer.Repo alias Explorer.Repo
# in order so that foreign keys are inserted before being referenced @stages [
@runners [ Import.Stage.Addresses,
Import.Addresses, Import.Stage.AddressReferencing
Import.Address.CoinBalances,
Import.Blocks,
Import.Block.Rewards,
Import.Block.SecondDegreeRelations,
Import.Transactions,
Import.Transaction.Forks,
Import.InternalTransactions,
Import.Logs,
Import.Tokens,
Import.TokenTransfers,
Import.Address.CurrentTokenBalances,
Import.Address.TokenBalances
] ]
# in order so that foreign keys are inserted before being referenced
@runners Enum.flat_map(@stages, fn stage -> stage.runners() end)
quoted_runner_option_value = quoted_runner_option_value =
quote do quote do
Import.Runner.options() Import.Runner.options()
@ -129,8 +120,8 @@ defmodule Explorer.Chain.Import do
def all(options) when is_map(options) do def all(options) when is_map(options) do
with {:ok, runner_options_pairs} <- validate_options(options), with {:ok, runner_options_pairs} <- validate_options(options),
{:ok, valid_runner_option_pairs} <- validate_runner_options_pairs(runner_options_pairs), {:ok, valid_runner_option_pairs} <- validate_runner_options_pairs(runner_options_pairs),
{:ok, runner_changes_list_pairs} <- runner_changes_list_pairs(valid_runner_option_pairs), {:ok, runner_to_changes_list} <- runner_to_changes_list(valid_runner_option_pairs),
{:ok, data} <- insert_runner_changes_list_pairs(runner_changes_list_pairs, options) do {:ok, data} <- insert_runner_to_changes_list(runner_to_changes_list, options) do
broadcast_events(data, Map.get(options, :broadcast, false)) broadcast_events(data, Map.get(options, :broadcast, false))
{:ok, data} {:ok, data}
end end
@ -153,13 +144,12 @@ defmodule Explorer.Chain.Import do
end) end)
end end
defp runner_changes_list_pairs(runner_options_pairs) when is_list(runner_options_pairs) do defp runner_to_changes_list(runner_options_pairs) when is_list(runner_options_pairs) do
{status, reversed} =
runner_options_pairs runner_options_pairs
|> Stream.map(fn {runner, options} -> runner_changes_list(runner, options) end) |> Stream.map(fn {runner, options} -> runner_changes_list(runner, options) end)
|> Enum.reduce({:ok, []}, fn |> Enum.reduce({:ok, %{}}, fn
{:ok, runner_changes_pair}, {:ok, acc_runner_changes_pairs} -> {:ok, {runner, changes_list}}, {:ok, acc_runner_to_changes_list} ->
{:ok, [runner_changes_pair | acc_runner_changes_pairs]} {:ok, Map.put(acc_runner_to_changes_list, runner, changes_list)}
{:ok, _}, {:error, _} = error -> {:ok, _}, {:error, _} = error ->
error error
@ -170,8 +160,6 @@ defmodule Explorer.Chain.Import do
{:error, runner_changesets}, {:error, acc_changesets} -> {:error, runner_changesets}, {:error, acc_changesets} ->
{:error, acc_changesets ++ runner_changesets} {:error, acc_changesets ++ runner_changesets}
end) end)
{status, Enum.reverse(reversed)}
end end
defp runner_changes_list(runner, %{params: params} = options) do defp runner_changes_list(runner, %{params: params} = options) do
@ -286,14 +274,22 @@ defmodule Explorer.Chain.Import do
end end
end end
defp runner_changes_list_pairs_to_multi(runner_changes_list_pairs, options) defp runner_to_changes_list_to_multis(runner_to_changes_list, options)
when is_list(runner_changes_list_pairs) and is_map(options) do when is_map(runner_to_changes_list) and is_map(options) do
timestamps = timestamps() timestamps = timestamps()
full_options = Map.put(options, :timestamps, timestamps) full_options = Map.put(options, :timestamps, timestamps)
Enum.reduce(runner_changes_list_pairs, Multi.new(), fn {runner, changes_list}, acc -> {multis, final_runner_to_changes_list} =
runner.run(acc, changes_list, full_options) Enum.flat_map_reduce(@stages, runner_to_changes_list, fn stage, remaining_runner_to_changes_list ->
stage.multis(remaining_runner_to_changes_list, full_options)
end) end)
unless Enum.empty?(final_runner_to_changes_list) do
raise ArgumentError,
"No stages consumed the following runners: #{final_runner_to_changes_list |> Map.keys() |> inspect()}"
end
multis
end end
def insert_changes_list(repo, changes_list, options) when is_atom(repo) and is_list(changes_list) do def insert_changes_list(repo, changes_list, options) when is_atom(repo) and is_list(changes_list) do
@ -319,14 +315,29 @@ defmodule Explorer.Chain.Import do
Map.merge(changes, timestamps) Map.merge(changes, timestamps)
end end
defp import_transaction(multi, options) when is_map(options) do defp insert_runner_to_changes_list(runner_to_changes_list, options) when is_map(runner_to_changes_list) do
Repo.transaction(multi, timeout: Map.get(options, :timeout, @transaction_timeout)) runner_to_changes_list
|> runner_to_changes_list_to_multis(options)
|> logged_import(options)
end end
defp insert_runner_changes_list_pairs(runner_changes_list_pairs, options) do defp logged_import(multis, options) when is_list(multis) and is_map(options) do
runner_changes_list_pairs import_id = :erlang.unique_integer([:positive])
|> runner_changes_list_pairs_to_multi(options)
|> import_transaction(options) Explorer.Logger.metadata(fn -> import_transactions(multis, options) end, import_id: import_id)
end
defp import_transactions(multis, options) when is_list(multis) and is_map(options) do
Enum.reduce_while(multis, {:ok, %{}}, fn multi, {:ok, acc_changes} ->
case import_transaction(multi, options) do
{:ok, changes} -> {:cont, {:ok, Map.merge(acc_changes, changes)}}
{:error, _, _, _} = error -> {:halt, error}
end
end)
end
defp import_transaction(multi, options) when is_map(options) do
Repo.logged_transaction(multi, timeout: Map.get(options, :timeout, @transaction_timeout))
end end
@spec timestamps() :: timestamps @spec timestamps() :: timestamps

@ -5,6 +5,22 @@ defmodule Explorer.Chain.Import.Runner do
alias Ecto.Multi alias Ecto.Multi
@typedoc """
A callback module that implements this module's behaviour.
"""
@type t :: module
@typedoc """
Validated changes extracted from a valid `Ecto.Changeset` produced by the `t:changeset_function_name/0` in
`c:ecto_schemma_module/0`.
"""
@type changes :: %{optional(atom) => term()}
@typedoc """
A list of `t:changes/0` to be imported by `c:run/3`.
"""
@type changes_list :: [changes]
@type changeset_function_name :: atom @type changeset_function_name :: atom
@type on_conflict :: :nothing | :replace_all | Ecto.Query.t() @type on_conflict :: :nothing | :replace_all | Ecto.Query.t()
@ -32,6 +48,6 @@ defmodule Explorer.Chain.Import.Runner do
The `Ecto.Schema` module that contains the `:changeset` function for validating `options[options_key][:params]`. The `Ecto.Schema` module that contains the `:changeset` function for validating `options[options_key][:params]`.
""" """
@callback ecto_schema_module() :: module() @callback ecto_schema_module() :: module()
@callback run(Multi.t(), changes_list :: [%{optional(atom()) => term()}], %{optional(atom()) => term()}) :: Multi.t() @callback run(Multi.t(), changes_list, %{optional(atom()) => term()}) :: Multi.t()
@callback timeout() :: timeout() @callback timeout() :: timeout()
end end

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Address.CoinBalances do defmodule Explorer.Chain.Import.Runner.Address.CoinBalances do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Address.CoinBalance.t/0`. Bulk imports `t:Explorer.Chain.Address.CoinBalance.t/0`.
""" """

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Address.CurrentTokenBalances do defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Address.CurrentTokenBalance.t/0`. Bulk imports `t:Explorer.Chain.Address.CurrentTokenBalance.t/0`.
""" """

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Address.TokenBalances do defmodule Explorer.Chain.Import.Runner.Address.TokenBalances do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Address.TokenBalance.t/0`. Bulk imports `t:Explorer.Chain.Address.TokenBalance.t/0`.
""" """

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Addresses do defmodule Explorer.Chain.Import.Runner.Addresses do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Address.t/0`. Bulk imports `t:Explorer.Chain.Address.t/0`.
""" """

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Block.SecondDegreeRelations do defmodule Explorer.Chain.Import.Runner.Block.SecondDegreeRelations do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Block.SecondDegreeRelation.t/0`. Bulk imports `t:Explorer.Chain.Block.SecondDegreeRelation.t/0`.
""" """

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Block.Rewards do defmodule Explorer.Chain.Import.Runner.Block.Rewards do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Block.Reward.t/0`. Bulk imports `t:Explorer.Chain.Block.Reward.t/0`.
""" """

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Blocks do defmodule Explorer.Chain.Import.Runner.Blocks do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Block.t/0`. Bulk imports `t:Explorer.Chain.Block.t/0`.
""" """
@ -10,21 +10,22 @@ defmodule Explorer.Chain.Import.Blocks do
alias Ecto.Adapters.SQL alias Ecto.Adapters.SQL
alias Ecto.{Changeset, Multi, Repo} alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.{Block, Import, InternalTransaction, Transaction} alias Explorer.Chain.{Block, Import, InternalTransaction, Transaction}
alias Explorer.Chain.Import.Runner
@behaviour Import.Runner @behaviour Runner
# milliseconds # milliseconds
@timeout 60_000 @timeout 60_000
@type imported :: [Block.t()] @type imported :: [Block.t()]
@impl Import.Runner @impl Runner
def ecto_schema_module, do: Block def ecto_schema_module, do: Block
@impl Import.Runner @impl Runner
def option_key, do: :blocks def option_key, do: :blocks
@impl Import.Runner @impl Runner
def imported_table_row do def imported_table_row do
%{ %{
value_type: "[#{ecto_schema_module()}.t()]", value_type: "[#{ecto_schema_module()}.t()]",
@ -32,7 +33,7 @@ defmodule Explorer.Chain.Import.Blocks do
} }
end end
@impl Import.Runner @impl Runner
def run(multi, changes_list, %{timestamps: timestamps} = options) do def run(multi, changes_list, %{timestamps: timestamps} = options) do
insert_options = insert_options =
options options
@ -47,7 +48,7 @@ defmodule Explorer.Chain.Import.Blocks do
|> Multi.run(:derive_transaction_forks, fn repo, _ -> |> Multi.run(:derive_transaction_forks, fn repo, _ ->
derive_transaction_forks(%{ derive_transaction_forks(%{
repo: repo, repo: repo,
timeout: options[Import.Transaction.Forks.option_key()][:timeout] || Import.Transaction.Forks.timeout(), timeout: options[Runner.Transaction.Forks.option_key()][:timeout] || Runner.Transaction.Forks.timeout(),
timestamps: timestamps, timestamps: timestamps,
where_forked: where_forked where_forked: where_forked
}) })
@ -56,7 +57,7 @@ defmodule Explorer.Chain.Import.Blocks do
|> Multi.run(:fork_transactions, fn repo, _ -> |> Multi.run(:fork_transactions, fn repo, _ ->
fork_transactions(%{ fork_transactions(%{
repo: repo, repo: repo,
timeout: options[Import.Transactions.option_key()][:timeout] || Import.Transactions.timeout(), timeout: options[Runner.Transactions.option_key()][:timeout] || Runner.Transactions.timeout(),
timestamps: timestamps, timestamps: timestamps,
where_forked: where_forked where_forked: where_forked
}) })
@ -73,8 +74,8 @@ defmodule Explorer.Chain.Import.Blocks do
blocks, blocks,
%{ %{
timeout: timeout:
options[Import.Block.SecondDegreeRelations.option_key()][:timeout] || options[Runner.Block.SecondDegreeRelations.option_key()][:timeout] ||
Import.Block.SecondDegreeRelations.timeout(), Runner.Block.SecondDegreeRelations.timeout(),
timestamps: timestamps timestamps: timestamps
} }
) )
@ -106,7 +107,7 @@ defmodule Explorer.Chain.Import.Blocks do
) )
end end
@impl Import.Runner @impl Runner
def timeout, do: @timeout def timeout, do: @timeout
# sobelow_skip ["SQL.Query"] # sobelow_skip ["SQL.Query"]
@ -182,7 +183,7 @@ defmodule Explorer.Chain.Import.Blocks do
end end
@spec insert(Repo.t(), [map()], %{ @spec insert(Repo.t(), [map()], %{
optional(:on_conflict) => Import.Runner.on_conflict(), optional(:on_conflict) => Runner.on_conflict(),
required(:timeout) => timeout, required(:timeout) => timeout,
required(:timestamps) => Import.timestamps() required(:timestamps) => Import.timestamps()
}) :: {:ok, [Block.t()]} | {:error, [Changeset.t()]} }) :: {:ok, [Block.t()]} | {:error, [Changeset.t()]}

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.InternalTransactions do defmodule Explorer.Chain.Import.Runner.InternalTransactions do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.InternalTransactions.t/0`. Bulk imports `t:Explorer.Chain.InternalTransactions.t/0`.
""" """
@ -7,10 +7,11 @@ defmodule Explorer.Chain.Import.InternalTransactions do
alias Ecto.{Changeset, Multi, Repo} alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.{Hash, Import, InternalTransaction, Transaction} alias Explorer.Chain.{Hash, Import, InternalTransaction, Transaction}
alias Explorer.Chain.Import.Runner
import Ecto.Query, only: [from: 2] import Ecto.Query, only: [from: 2]
@behaviour Import.Runner @behaviour Runner
# milliseconds # milliseconds
@timeout 60_000 @timeout 60_000
@ -19,13 +20,13 @@ defmodule Explorer.Chain.Import.InternalTransactions do
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()}
] ]
@impl Import.Runner @impl Runner
def ecto_schema_module, do: InternalTransaction def ecto_schema_module, do: InternalTransaction
@impl Import.Runner @impl Runner
def option_key, do: :internal_transactions def option_key, do: :internal_transactions
@impl Import.Runner @impl Runner
def imported_table_row do def imported_table_row do
%{ %{
value_type: "[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]", value_type: "[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]",
@ -33,7 +34,7 @@ defmodule Explorer.Chain.Import.InternalTransactions do
} }
end end
@impl Import.Runner @impl Runner
def run(multi, changes_list, %{timestamps: timestamps} = options) when is_map(options) do def run(multi, changes_list, %{timestamps: timestamps} = options) when is_map(options) do
insert_options = insert_options =
options options
@ -42,7 +43,7 @@ defmodule Explorer.Chain.Import.InternalTransactions do
|> Map.put_new(:timeout, @timeout) |> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps) |> Map.put(:timestamps, timestamps)
transactions_timeout = options[Import.Transactions.option_key()][:timeout] || Import.Transactions.timeout() transactions_timeout = options[Runner.Transactions.option_key()][:timeout] || Runner.Transactions.timeout()
update_transactions_options = %{timeout: transactions_timeout, timestamps: timestamps} update_transactions_options = %{timeout: transactions_timeout, timestamps: timestamps}
@ -57,11 +58,11 @@ defmodule Explorer.Chain.Import.InternalTransactions do
end) end)
end end
@impl Import.Runner @impl Runner
def timeout, do: @timeout def timeout, do: @timeout
@spec insert(Repo.t(), [map], %{ @spec insert(Repo.t(), [map], %{
optional(:on_conflict) => Import.Runner.on_conflict(), optional(:on_conflict) => Runner.on_conflict(),
required(:timeout) => timeout, required(:timeout) => timeout,
required(:timestamps) => Import.timestamps() required(:timestamps) => Import.timestamps()
}) :: }) ::

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Logs do defmodule Explorer.Chain.Import.Runner.Logs do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Log.t/0`. Bulk imports `t:Explorer.Chain.Log.t/0`.
""" """

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.TokenTransfers do defmodule Explorer.Chain.Import.Runner.TokenTransfers do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.TokenTransfer.t/0`. Bulk imports `t:Explorer.Chain.TokenTransfer.t/0`.
""" """

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Tokens do defmodule Explorer.Chain.Import.Runner.Tokens do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Token.t/0`. Bulk imports `t:Explorer.Chain.Token.t/0`.
""" """

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Transaction.Forks do defmodule Explorer.Chain.Import.Runner.Transaction.Forks do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Transaction.Fork.t/0`. Bulk imports `t:Explorer.Chain.Transaction.Fork.t/0`.
""" """

@ -1,4 +1,4 @@
defmodule Explorer.Chain.Import.Transactions do defmodule Explorer.Chain.Import.Runner.Transactions do
@moduledoc """ @moduledoc """
Bulk imports `t:Explorer.Chain.Transaction.t/0`. Bulk imports `t:Explorer.Chain.Transaction.t/0`.
""" """

@ -0,0 +1,50 @@
defmodule Explorer.Chain.Import.Stage do
@moduledoc """
Behaviour used to chunk `changes_list` into multiple `t:Ecto.Multi.t/0`` that can run in separate transactions to
limit the time that transactions take and how long blocking locks are held in Postgres.
"""
alias Ecto.Multi
alias Explorer.Chain.Import.Runner
@typedoc """
Maps `t:Explorer.Chain.Import.Runner.t/0` callback module to the `t:Explorer.Chain.Import.Runner.changes_list/0` it
can import.
"""
@type runner_to_changes_list :: %{Runner.t() => Runner.changes_list()}
@doc """
The runners consumed by this stage in `c:multis/0`. The list should be in the order that the runners are executed.
"""
@callback runners() :: [Runner.t(), ...]
@doc """
Chunks `changes_list` into 1 or more `t:Ecto.Multi.t/0` that can be run in separate transactions.
The runners used by the stage should be removed from the returned `runner_to_changes_list` map.
"""
@callback multis(runner_to_changes_list, %{optional(atom()) => term()}) :: {[Multi.t()], runner_to_changes_list}
@doc """
Uses a single `t:Explorer.Chain.Runner.t/0` and chunks the `changes_list` across multiple `t:Ecto.Multi.t/0`
"""
@spec chunk_every(runner_to_changes_list, Runner.t(), chunk_size :: pos_integer(), %{optional(atom()) => term()}) ::
{[Multi.t()], runner_to_changes_list}
def chunk_every(runner_to_changes_list, runner, chunk_size, options)
when is_map(runner_to_changes_list) and is_atom(runner) and is_integer(chunk_size) and is_map(options) do
{changes_list, unstaged_runner_to_changes_list} = Map.pop(runner_to_changes_list, runner)
multis = changes_list_chunk_every(changes_list, chunk_size, runner, options)
{multis, unstaged_runner_to_changes_list}
end
defp changes_list_chunk_every(nil, _, _, _), do: []
defp changes_list_chunk_every(changes_list, chunk_size, runner, options) do
changes_list
|> Stream.chunk_every(chunk_size)
|> Enum.map(fn changes_chunk ->
runner.run(Multi.new(), changes_chunk, options)
end)
end
end

@ -0,0 +1,50 @@
defmodule Explorer.Chain.Import.Stage.AddressReferencing do
@moduledoc """
Imports any tables that reference `t:Explorer.Chain.Address.t/0` and that were imported by
`Explorer.Chain.Import.Stage.Addresses`.
"""
alias Ecto.Multi
alias Explorer.Chain.Import.{Runner, Stage}
@behaviour Stage
@impl Stage
def runners,
do: [
Runner.Address.CoinBalances,
Runner.Blocks,
Runner.Block.Rewards,
Runner.Block.SecondDegreeRelations,
Runner.Transactions,
Runner.Transaction.Forks,
Runner.InternalTransactions,
Runner.Logs,
Runner.Tokens,
Runner.TokenTransfers,
Runner.Address.CurrentTokenBalances,
Runner.Address.TokenBalances
]
@impl Stage
def multis(runner_to_changes_list, options) do
{final_multi, final_remaining_runner_to_changes_list} =
runners()
|> Enum.reduce({Multi.new(), runner_to_changes_list}, fn runner, {multi, remaining_runner_to_changes_list} ->
{changes_list, new_remaining_runner_to_changes_list} = Map.pop(remaining_runner_to_changes_list, runner)
new_multi =
case changes_list do
nil ->
multi
_ ->
runner.run(multi, changes_list, options)
end
{new_multi, new_remaining_runner_to_changes_list}
end)
{[final_multi], final_remaining_runner_to_changes_list}
end
end

@ -0,0 +1,22 @@
defmodule Explorer.Chain.Import.Stage.Addresses do
@moduledoc """
Imports addresses before anything else that references them because an unused address is still valid and recoverable
if the other stage(s) don't commit.
"""
alias Explorer.Chain.Import.{Runner, Stage}
@behaviour Stage
@runner Runner.Addresses
@impl Stage
def runners, do: [@runner]
@chunk_size 50
@impl Stage
def multis(runner_to_changes_list, options) do
Stage.chunk_every(runner_to_changes_list, @runner, @chunk_size, options)
end
end

@ -0,0 +1,19 @@
defmodule Explorer.Logger do
@moduledoc """
Helpers for `Logger`.
"""
@doc """
Sets `keyword` in `Logger.metadata/1` around `fun`.
"""
def metadata(fun, keyword) when is_function(fun, 0) and is_list(keyword) do
metadata_before = Logger.metadata()
try do
Logger.metadata(keyword)
fun.()
after
Logger.reset_metadata(metadata_before)
end
end
end

@ -13,6 +13,22 @@ defmodule Explorer.Repo do
{:ok, Keyword.put(opts, :url, System.get_env("DATABASE_URL"))} {:ok, Keyword.put(opts, :url, System.get_env("DATABASE_URL"))}
end end
def logged_transaction(fun_or_multi, opts \\ []) do
transaction_id = :erlang.unique_integer([:positive])
Explorer.Logger.metadata(
fn ->
{microseconds, value} = :timer.tc(__MODULE__, :transaction, [fun_or_multi, opts])
milliseconds = div(microseconds, 100) / 10.0
Logger.debug(["transaction_time=", :io_lib_format.fwrite_g(milliseconds), ?m, ?s])
value
end,
transaction_id: transaction_id
)
end
@doc """ @doc """
Chunks elements into multiple `insert_all`'s to avoid DB driver param limits. Chunks elements into multiple `insert_all`'s to avoid DB driver param limits.

@ -1,8 +1,8 @@
defmodule Explorer.Chain.Import.Address.CurrentTokenBalancesTest do defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalancesTest do
use Explorer.DataCase use Explorer.DataCase
alias Explorer.Chain.Address.CurrentTokenBalance alias Explorer.Chain.Address.CurrentTokenBalance
alias Explorer.Chain.Import.Address.CurrentTokenBalances alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances
alias Explorer.Repo alias Explorer.Repo
describe "insert/2" do describe "insert/2" do
Loading…
Cancel
Save