From d36dac0dc21f70e7e5ac8f31279c7352a281af86 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Mon, 17 Dec 2018 13:29:30 -0600 Subject: [PATCH] Import stages Stages will be between Import and the Import.Runner. Unlike Runners, which use 1 common Ecto.Multi, a Stage can produce 1 or more independent Ecto.Multi that will run in separate transactions. This allows addresses to be chunked into separate transactions, which releases the locks between transactions unlike the INSERT-level chunking that was all in 1 transaction. --- .credo.exs | 2 +- apps/explorer/lib/explorer/chain.ex | 13 ++- apps/explorer/lib/explorer/chain/import.ex | 101 ++++++++++-------- .../lib/explorer/chain/import/runner.ex | 18 +++- .../{ => runner}/address/coin_balances.ex | 2 +- .../address/current_token_balances.ex | 2 +- .../{ => runner}/address/token_balances.ex | 2 +- .../chain/import/{ => runner}/addresses.ex | 2 +- .../block/second_degree_relations.ex | 2 +- .../import/{ => runner}/block_rewards.ex | 2 +- .../chain/import/{ => runner}/blocks.ex | 25 ++--- .../{ => runner}/internal_transactions.ex | 19 ++-- .../chain/import/{ => runner}/logs.ex | 2 +- .../import/{ => runner}/token_transfers.ex | 2 +- .../chain/import/{ => runner}/tokens.ex | 2 +- .../import/{ => runner}/transaction/forks.ex | 2 +- .../chain/import/{ => runner}/transactions.ex | 2 +- .../lib/explorer/chain/import/stage.ex | 50 +++++++++ .../chain/import/stage/address_referencing.ex | 50 +++++++++ .../explorer/chain/import/stage/addresses.ex | 22 ++++ apps/explorer/lib/explorer/logger.ex | 19 ++++ apps/explorer/lib/explorer/repo.ex | 16 +++ .../address/current_token_balances_test.exs | 4 +- 23 files changed, 273 insertions(+), 88 deletions(-) rename apps/explorer/lib/explorer/chain/import/{ => runner}/address/coin_balances.ex (98%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/address/current_token_balances.ex (98%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/address/token_balances.ex (98%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/addresses.ex (98%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/block/second_degree_relations.ex (97%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/block_rewards.ex (97%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/blocks.ex (93%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/internal_transactions.ex (94%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/logs.ex (98%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/token_transfers.ex (98%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/tokens.ex (98%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/transaction/forks.ex (97%) rename apps/explorer/lib/explorer/chain/import/{ => runner}/transactions.ex (98%) create mode 100644 apps/explorer/lib/explorer/chain/import/stage.ex create mode 100644 apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex create mode 100644 apps/explorer/lib/explorer/chain/import/stage/addresses.ex create mode 100644 apps/explorer/lib/explorer/logger.ex rename apps/explorer/test/explorer/chain/import/{address => runner}/address/current_token_balances_test.exs (95%) diff --git a/.credo.exs b/.credo.exs index f7c4edb32a..4dda48adf4 100644 --- a/.credo.exs +++ b/.credo.exs @@ -75,7 +75,7 @@ # Priority values are: `low, normal, high, higher` # {Credo.Check.Design.AliasUsage, - excluded_namespaces: ~w(Block Blocks Import Socket SpandexDatadog Task), + excluded_namespaces: ~w(Block Blocks Import Runner Socket SpandexDatadog Task), excluded_lastnames: ~w(Address DateTime Exporter Fetcher Full Instrumenter Logger Monitor Name Number Repo Spec Time Unit), priority: :low}, diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 80f7e1a1b4..ec107ff540 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -20,8 +20,6 @@ defmodule Explorer.Chain do alias Ecto.Adapters.SQL alias Ecto.Multi - alias Explorer.Chain - alias Explorer.Chain.{ Address, Address.CoinBalance, @@ -41,6 +39,7 @@ defmodule Explorer.Chain do } alias Explorer.Chain.Block.EmissionReward + alias Explorer.Chain.Import.Runner alias Explorer.{PagingOptions, Repo} alias Explorer.Counters.{ @@ -632,13 +631,13 @@ defmodule Explorer.Chain do """ @spec find_or_insert_address_from_hash(Hash.Address.t()) :: {:ok, Address.t()} def find_or_insert_address_from_hash(%Hash{byte_count: unquote(Hash.Address.byte_count())} = hash) do - case Chain.hash_to_address(hash) do + case hash_to_address(hash) do {:ok, address} -> {:ok, address} {:error, :not_found} -> - Chain.create_address(%{hash: to_string(hash)}) - Chain.hash_to_address(hash) + create_address(%{hash: to_string(hash)}) + hash_to_address(hash) end end @@ -1968,7 +1967,7 @@ defmodule Explorer.Chain do ) :: {:ok, accumulator} when accumulator: term() def stream_cataloged_token_contract_address_hashes(initial, reducer) when is_function(reducer, 2) do - Chain.Token.cataloged_tokens() + Token.cataloged_tokens() |> order_by(asc: :updated_at) |> Repo.stream_reduce(initial, reducer) end @@ -2048,7 +2047,7 @@ defmodule Explorer.Chain do token_changeset = Token.changeset(token, params) address_name_changeset = Address.Name.changeset(%Address.Name{}, Map.put(params, :address_hash, address_hash)) - token_opts = [on_conflict: Import.Tokens.default_on_conflict(), conflict_target: :contract_address_hash] + token_opts = [on_conflict: Runner.Tokens.default_on_conflict(), conflict_target: :contract_address_hash] address_name_opts = [on_conflict: :nothing, conflict_target: [:address_hash, :name]] insert_result = diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index f6b44f46a5..43b5a66f89 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -3,27 +3,18 @@ defmodule Explorer.Chain.Import do Bulk importing of data into `Explorer.Repo` """ - alias Ecto.{Changeset, Multi} + alias Ecto.Changeset alias Explorer.Chain.Import alias Explorer.Repo - # in order so that foreign keys are inserted before being referenced - @runners [ - Import.Addresses, - Import.Address.CoinBalances, - Import.Blocks, - Import.Block.Rewards, - Import.Block.SecondDegreeRelations, - Import.Transactions, - Import.Transaction.Forks, - Import.InternalTransactions, - Import.Logs, - Import.Tokens, - Import.TokenTransfers, - Import.Address.CurrentTokenBalances, - Import.Address.TokenBalances + @stages [ + Import.Stage.Addresses, + Import.Stage.AddressReferencing ] + # in order so that foreign keys are inserted before being referenced + @runners Enum.flat_map(@stages, fn stage -> stage.runners() end) + quoted_runner_option_value = quote do Import.Runner.options() @@ -129,8 +120,8 @@ defmodule Explorer.Chain.Import do def all(options) when is_map(options) do with {:ok, runner_options_pairs} <- validate_options(options), {:ok, valid_runner_option_pairs} <- validate_runner_options_pairs(runner_options_pairs), - {:ok, runner_changes_list_pairs} <- runner_changes_list_pairs(valid_runner_option_pairs), - {:ok, data} <- insert_runner_changes_list_pairs(runner_changes_list_pairs, options) do + {:ok, runner_to_changes_list} <- runner_to_changes_list(valid_runner_option_pairs), + {:ok, data} <- insert_runner_to_changes_list(runner_to_changes_list, options) do broadcast_events(data, Map.get(options, :broadcast, false)) {:ok, data} end @@ -153,25 +144,22 @@ defmodule Explorer.Chain.Import do end) end - defp runner_changes_list_pairs(runner_options_pairs) when is_list(runner_options_pairs) do - {status, reversed} = - runner_options_pairs - |> Stream.map(fn {runner, options} -> runner_changes_list(runner, options) end) - |> Enum.reduce({:ok, []}, fn - {:ok, runner_changes_pair}, {:ok, acc_runner_changes_pairs} -> - {:ok, [runner_changes_pair | acc_runner_changes_pairs]} + defp runner_to_changes_list(runner_options_pairs) when is_list(runner_options_pairs) do + runner_options_pairs + |> Stream.map(fn {runner, options} -> runner_changes_list(runner, options) end) + |> Enum.reduce({:ok, %{}}, fn + {:ok, {runner, changes_list}}, {:ok, acc_runner_to_changes_list} -> + {:ok, Map.put(acc_runner_to_changes_list, runner, changes_list)} - {:ok, _}, {:error, _} = error -> - error - - {:error, _} = error, {:ok, _} -> - error + {:ok, _}, {:error, _} = error -> + error - {:error, runner_changesets}, {:error, acc_changesets} -> - {:error, acc_changesets ++ runner_changesets} - end) + {:error, _} = error, {:ok, _} -> + error - {status, Enum.reverse(reversed)} + {:error, runner_changesets}, {:error, acc_changesets} -> + {:error, acc_changesets ++ runner_changesets} + end) end defp runner_changes_list(runner, %{params: params} = options) do @@ -286,14 +274,22 @@ defmodule Explorer.Chain.Import do end end - defp runner_changes_list_pairs_to_multi(runner_changes_list_pairs, options) - when is_list(runner_changes_list_pairs) and is_map(options) do + defp runner_to_changes_list_to_multis(runner_to_changes_list, options) + when is_map(runner_to_changes_list) and is_map(options) do timestamps = timestamps() full_options = Map.put(options, :timestamps, timestamps) - Enum.reduce(runner_changes_list_pairs, Multi.new(), fn {runner, changes_list}, acc -> - runner.run(acc, changes_list, full_options) - end) + {multis, final_runner_to_changes_list} = + Enum.flat_map_reduce(@stages, runner_to_changes_list, fn stage, remaining_runner_to_changes_list -> + stage.multis(remaining_runner_to_changes_list, full_options) + end) + + unless Enum.empty?(final_runner_to_changes_list) do + raise ArgumentError, + "No stages consumed the following runners: #{final_runner_to_changes_list |> Map.keys() |> inspect()}" + end + + multis end def insert_changes_list(repo, changes_list, options) when is_atom(repo) and is_list(changes_list) do @@ -319,14 +315,29 @@ defmodule Explorer.Chain.Import do Map.merge(changes, timestamps) end - defp import_transaction(multi, options) when is_map(options) do - Repo.transaction(multi, timeout: Map.get(options, :timeout, @transaction_timeout)) + defp insert_runner_to_changes_list(runner_to_changes_list, options) when is_map(runner_to_changes_list) do + runner_to_changes_list + |> runner_to_changes_list_to_multis(options) + |> logged_import(options) + end + + defp logged_import(multis, options) when is_list(multis) and is_map(options) do + import_id = :erlang.unique_integer([:positive]) + + Explorer.Logger.metadata(fn -> import_transactions(multis, options) end, import_id: import_id) end - defp insert_runner_changes_list_pairs(runner_changes_list_pairs, options) do - runner_changes_list_pairs - |> runner_changes_list_pairs_to_multi(options) - |> import_transaction(options) + defp import_transactions(multis, options) when is_list(multis) and is_map(options) do + Enum.reduce_while(multis, {:ok, %{}}, fn multi, {:ok, acc_changes} -> + case import_transaction(multi, options) do + {:ok, changes} -> {:cont, {:ok, Map.merge(acc_changes, changes)}} + {:error, _, _, _} = error -> {:halt, error} + end + end) + end + + defp import_transaction(multi, options) when is_map(options) do + Repo.logged_transaction(multi, timeout: Map.get(options, :timeout, @transaction_timeout)) end @spec timestamps() :: timestamps diff --git a/apps/explorer/lib/explorer/chain/import/runner.ex b/apps/explorer/lib/explorer/chain/import/runner.ex index 16f3eaa2cf..9600646a64 100644 --- a/apps/explorer/lib/explorer/chain/import/runner.ex +++ b/apps/explorer/lib/explorer/chain/import/runner.ex @@ -5,6 +5,22 @@ defmodule Explorer.Chain.Import.Runner do alias Ecto.Multi + @typedoc """ + A callback module that implements this module's behaviour. + """ + @type t :: module + + @typedoc """ + Validated changes extracted from a valid `Ecto.Changeset` produced by the `t:changeset_function_name/0` in + `c:ecto_schemma_module/0`. + """ + @type changes :: %{optional(atom) => term()} + + @typedoc """ + A list of `t:changes/0` to be imported by `c:run/3`. + """ + @type changes_list :: [changes] + @type changeset_function_name :: atom @type on_conflict :: :nothing | :replace_all | Ecto.Query.t() @@ -32,6 +48,6 @@ defmodule Explorer.Chain.Import.Runner do The `Ecto.Schema` module that contains the `:changeset` function for validating `options[options_key][:params]`. """ @callback ecto_schema_module() :: module() - @callback run(Multi.t(), changes_list :: [%{optional(atom()) => term()}], %{optional(atom()) => term()}) :: Multi.t() + @callback run(Multi.t(), changes_list, %{optional(atom()) => term()}) :: Multi.t() @callback timeout() :: timeout() end diff --git a/apps/explorer/lib/explorer/chain/import/address/coin_balances.ex b/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances.ex similarity index 98% rename from apps/explorer/lib/explorer/chain/import/address/coin_balances.ex rename to apps/explorer/lib/explorer/chain/import/runner/address/coin_balances.ex index d938018959..7df1a28c82 100644 --- a/apps/explorer/lib/explorer/chain/import/address/coin_balances.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/address/coin_balances.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Address.CoinBalances do +defmodule Explorer.Chain.Import.Runner.Address.CoinBalances do @moduledoc """ Bulk imports `t:Explorer.Chain.Address.CoinBalance.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/address/current_token_balances.ex b/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex similarity index 98% rename from apps/explorer/lib/explorer/chain/import/address/current_token_balances.ex rename to apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex index b1b101e50e..e07b39abc0 100644 --- a/apps/explorer/lib/explorer/chain/import/address/current_token_balances.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/address/current_token_balances.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Address.CurrentTokenBalances do +defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalances do @moduledoc """ Bulk imports `t:Explorer.Chain.Address.CurrentTokenBalance.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/address/token_balances.ex b/apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex similarity index 98% rename from apps/explorer/lib/explorer/chain/import/address/token_balances.ex rename to apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex index 848a7bb0ac..2a8b98e8ba 100644 --- a/apps/explorer/lib/explorer/chain/import/address/token_balances.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/address/token_balances.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Address.TokenBalances do +defmodule Explorer.Chain.Import.Runner.Address.TokenBalances do @moduledoc """ Bulk imports `t:Explorer.Chain.Address.TokenBalance.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/addresses.ex b/apps/explorer/lib/explorer/chain/import/runner/addresses.ex similarity index 98% rename from apps/explorer/lib/explorer/chain/import/addresses.ex rename to apps/explorer/lib/explorer/chain/import/runner/addresses.ex index 49ba5d1eb1..75681efdaa 100644 --- a/apps/explorer/lib/explorer/chain/import/addresses.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/addresses.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Addresses do +defmodule Explorer.Chain.Import.Runner.Addresses do @moduledoc """ Bulk imports `t:Explorer.Chain.Address.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex b/apps/explorer/lib/explorer/chain/import/runner/block/second_degree_relations.ex similarity index 97% rename from apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex rename to apps/explorer/lib/explorer/chain/import/runner/block/second_degree_relations.ex index a812a4e9f9..7c91b3b7b2 100644 --- a/apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/block/second_degree_relations.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Block.SecondDegreeRelations do +defmodule Explorer.Chain.Import.Runner.Block.SecondDegreeRelations do @moduledoc """ Bulk imports `t:Explorer.Chain.Block.SecondDegreeRelation.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/block_rewards.ex b/apps/explorer/lib/explorer/chain/import/runner/block_rewards.ex similarity index 97% rename from apps/explorer/lib/explorer/chain/import/block_rewards.ex rename to apps/explorer/lib/explorer/chain/import/runner/block_rewards.ex index 40f42c7358..e9ee2abcd6 100644 --- a/apps/explorer/lib/explorer/chain/import/block_rewards.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/block_rewards.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Block.Rewards do +defmodule Explorer.Chain.Import.Runner.Block.Rewards do @moduledoc """ Bulk imports `t:Explorer.Chain.Block.Reward.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/blocks.ex b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex similarity index 93% rename from apps/explorer/lib/explorer/chain/import/blocks.ex rename to apps/explorer/lib/explorer/chain/import/runner/blocks.ex index 2552eea1f5..cb174499d6 100644 --- a/apps/explorer/lib/explorer/chain/import/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/blocks.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Blocks do +defmodule Explorer.Chain.Import.Runner.Blocks do @moduledoc """ Bulk imports `t:Explorer.Chain.Block.t/0`. """ @@ -10,21 +10,22 @@ defmodule Explorer.Chain.Import.Blocks do alias Ecto.Adapters.SQL alias Ecto.{Changeset, Multi, Repo} alias Explorer.Chain.{Block, Import, InternalTransaction, Transaction} + alias Explorer.Chain.Import.Runner - @behaviour Import.Runner + @behaviour Runner # milliseconds @timeout 60_000 @type imported :: [Block.t()] - @impl Import.Runner + @impl Runner def ecto_schema_module, do: Block - @impl Import.Runner + @impl Runner def option_key, do: :blocks - @impl Import.Runner + @impl Runner def imported_table_row do %{ value_type: "[#{ecto_schema_module()}.t()]", @@ -32,7 +33,7 @@ defmodule Explorer.Chain.Import.Blocks do } end - @impl Import.Runner + @impl Runner def run(multi, changes_list, %{timestamps: timestamps} = options) do insert_options = options @@ -47,7 +48,7 @@ defmodule Explorer.Chain.Import.Blocks do |> Multi.run(:derive_transaction_forks, fn repo, _ -> derive_transaction_forks(%{ repo: repo, - timeout: options[Import.Transaction.Forks.option_key()][:timeout] || Import.Transaction.Forks.timeout(), + timeout: options[Runner.Transaction.Forks.option_key()][:timeout] || Runner.Transaction.Forks.timeout(), timestamps: timestamps, where_forked: where_forked }) @@ -56,7 +57,7 @@ defmodule Explorer.Chain.Import.Blocks do |> Multi.run(:fork_transactions, fn repo, _ -> fork_transactions(%{ repo: repo, - timeout: options[Import.Transactions.option_key()][:timeout] || Import.Transactions.timeout(), + timeout: options[Runner.Transactions.option_key()][:timeout] || Runner.Transactions.timeout(), timestamps: timestamps, where_forked: where_forked }) @@ -73,8 +74,8 @@ defmodule Explorer.Chain.Import.Blocks do blocks, %{ timeout: - options[Import.Block.SecondDegreeRelations.option_key()][:timeout] || - Import.Block.SecondDegreeRelations.timeout(), + options[Runner.Block.SecondDegreeRelations.option_key()][:timeout] || + Runner.Block.SecondDegreeRelations.timeout(), timestamps: timestamps } ) @@ -106,7 +107,7 @@ defmodule Explorer.Chain.Import.Blocks do ) end - @impl Import.Runner + @impl Runner def timeout, do: @timeout # sobelow_skip ["SQL.Query"] @@ -182,7 +183,7 @@ defmodule Explorer.Chain.Import.Blocks do end @spec insert(Repo.t(), [map()], %{ - optional(:on_conflict) => Import.Runner.on_conflict(), + optional(:on_conflict) => Runner.on_conflict(), required(:timeout) => timeout, required(:timestamps) => Import.timestamps() }) :: {:ok, [Block.t()]} | {:error, [Changeset.t()]} diff --git a/apps/explorer/lib/explorer/chain/import/internal_transactions.ex b/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex similarity index 94% rename from apps/explorer/lib/explorer/chain/import/internal_transactions.ex rename to apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex index 7e8ea25a92..024800951f 100644 --- a/apps/explorer/lib/explorer/chain/import/internal_transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/internal_transactions.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.InternalTransactions do +defmodule Explorer.Chain.Import.Runner.InternalTransactions do @moduledoc """ Bulk imports `t:Explorer.Chain.InternalTransactions.t/0`. """ @@ -7,10 +7,11 @@ defmodule Explorer.Chain.Import.InternalTransactions do alias Ecto.{Changeset, Multi, Repo} alias Explorer.Chain.{Hash, Import, InternalTransaction, Transaction} + alias Explorer.Chain.Import.Runner import Ecto.Query, only: [from: 2] - @behaviour Import.Runner + @behaviour Runner # milliseconds @timeout 60_000 @@ -19,13 +20,13 @@ defmodule Explorer.Chain.Import.InternalTransactions do %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} ] - @impl Import.Runner + @impl Runner def ecto_schema_module, do: InternalTransaction - @impl Import.Runner + @impl Runner def option_key, do: :internal_transactions - @impl Import.Runner + @impl Runner def imported_table_row do %{ value_type: "[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]", @@ -33,7 +34,7 @@ defmodule Explorer.Chain.Import.InternalTransactions do } end - @impl Import.Runner + @impl Runner def run(multi, changes_list, %{timestamps: timestamps} = options) when is_map(options) do insert_options = options @@ -42,7 +43,7 @@ defmodule Explorer.Chain.Import.InternalTransactions do |> Map.put_new(:timeout, @timeout) |> Map.put(:timestamps, timestamps) - transactions_timeout = options[Import.Transactions.option_key()][:timeout] || Import.Transactions.timeout() + transactions_timeout = options[Runner.Transactions.option_key()][:timeout] || Runner.Transactions.timeout() update_transactions_options = %{timeout: transactions_timeout, timestamps: timestamps} @@ -57,11 +58,11 @@ defmodule Explorer.Chain.Import.InternalTransactions do end) end - @impl Import.Runner + @impl Runner def timeout, do: @timeout @spec insert(Repo.t(), [map], %{ - optional(:on_conflict) => Import.Runner.on_conflict(), + optional(:on_conflict) => Runner.on_conflict(), required(:timeout) => timeout, required(:timestamps) => Import.timestamps() }) :: diff --git a/apps/explorer/lib/explorer/chain/import/logs.ex b/apps/explorer/lib/explorer/chain/import/runner/logs.ex similarity index 98% rename from apps/explorer/lib/explorer/chain/import/logs.ex rename to apps/explorer/lib/explorer/chain/import/runner/logs.ex index 438e00fc8e..377182e22b 100644 --- a/apps/explorer/lib/explorer/chain/import/logs.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/logs.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Logs do +defmodule Explorer.Chain.Import.Runner.Logs do @moduledoc """ Bulk imports `t:Explorer.Chain.Log.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/token_transfers.ex b/apps/explorer/lib/explorer/chain/import/runner/token_transfers.ex similarity index 98% rename from apps/explorer/lib/explorer/chain/import/token_transfers.ex rename to apps/explorer/lib/explorer/chain/import/runner/token_transfers.ex index afecbb7b97..4521dc41a7 100644 --- a/apps/explorer/lib/explorer/chain/import/token_transfers.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/token_transfers.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.TokenTransfers do +defmodule Explorer.Chain.Import.Runner.TokenTransfers do @moduledoc """ Bulk imports `t:Explorer.Chain.TokenTransfer.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/tokens.ex b/apps/explorer/lib/explorer/chain/import/runner/tokens.ex similarity index 98% rename from apps/explorer/lib/explorer/chain/import/tokens.ex rename to apps/explorer/lib/explorer/chain/import/runner/tokens.ex index d377f50784..1c7feece97 100644 --- a/apps/explorer/lib/explorer/chain/import/tokens.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/tokens.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Tokens do +defmodule Explorer.Chain.Import.Runner.Tokens do @moduledoc """ Bulk imports `t:Explorer.Chain.Token.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/transaction/forks.ex b/apps/explorer/lib/explorer/chain/import/runner/transaction/forks.ex similarity index 97% rename from apps/explorer/lib/explorer/chain/import/transaction/forks.ex rename to apps/explorer/lib/explorer/chain/import/runner/transaction/forks.ex index b9571a39ab..d16172098d 100644 --- a/apps/explorer/lib/explorer/chain/import/transaction/forks.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/transaction/forks.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Transaction.Forks do +defmodule Explorer.Chain.Import.Runner.Transaction.Forks do @moduledoc """ Bulk imports `t:Explorer.Chain.Transaction.Fork.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/transactions.ex b/apps/explorer/lib/explorer/chain/import/runner/transactions.ex similarity index 98% rename from apps/explorer/lib/explorer/chain/import/transactions.ex rename to apps/explorer/lib/explorer/chain/import/runner/transactions.ex index 3c8fb83e77..eb7fafc8b8 100644 --- a/apps/explorer/lib/explorer/chain/import/transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/runner/transactions.ex @@ -1,4 +1,4 @@ -defmodule Explorer.Chain.Import.Transactions do +defmodule Explorer.Chain.Import.Runner.Transactions do @moduledoc """ Bulk imports `t:Explorer.Chain.Transaction.t/0`. """ diff --git a/apps/explorer/lib/explorer/chain/import/stage.ex b/apps/explorer/lib/explorer/chain/import/stage.ex new file mode 100644 index 0000000000..6ad73d2568 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/stage.ex @@ -0,0 +1,50 @@ +defmodule Explorer.Chain.Import.Stage do + @moduledoc """ + Behaviour used to chunk `changes_list` into multiple `t:Ecto.Multi.t/0`` that can run in separate transactions to + limit the time that transactions take and how long blocking locks are held in Postgres. + """ + + alias Ecto.Multi + alias Explorer.Chain.Import.Runner + + @typedoc """ + Maps `t:Explorer.Chain.Import.Runner.t/0` callback module to the `t:Explorer.Chain.Import.Runner.changes_list/0` it + can import. + """ + @type runner_to_changes_list :: %{Runner.t() => Runner.changes_list()} + + @doc """ + The runners consumed by this stage in `c:multis/0`. The list should be in the order that the runners are executed. + """ + @callback runners() :: [Runner.t(), ...] + + @doc """ + Chunks `changes_list` into 1 or more `t:Ecto.Multi.t/0` that can be run in separate transactions. + + The runners used by the stage should be removed from the returned `runner_to_changes_list` map. + """ + @callback multis(runner_to_changes_list, %{optional(atom()) => term()}) :: {[Multi.t()], runner_to_changes_list} + + @doc """ + Uses a single `t:Explorer.Chain.Runner.t/0` and chunks the `changes_list` across multiple `t:Ecto.Multi.t/0` + """ + @spec chunk_every(runner_to_changes_list, Runner.t(), chunk_size :: pos_integer(), %{optional(atom()) => term()}) :: + {[Multi.t()], runner_to_changes_list} + def chunk_every(runner_to_changes_list, runner, chunk_size, options) + when is_map(runner_to_changes_list) and is_atom(runner) and is_integer(chunk_size) and is_map(options) do + {changes_list, unstaged_runner_to_changes_list} = Map.pop(runner_to_changes_list, runner) + multis = changes_list_chunk_every(changes_list, chunk_size, runner, options) + + {multis, unstaged_runner_to_changes_list} + end + + defp changes_list_chunk_every(nil, _, _, _), do: [] + + defp changes_list_chunk_every(changes_list, chunk_size, runner, options) do + changes_list + |> Stream.chunk_every(chunk_size) + |> Enum.map(fn changes_chunk -> + runner.run(Multi.new(), changes_chunk, options) + end) + end +end diff --git a/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex b/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex new file mode 100644 index 0000000000..ce79baf764 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/stage/address_referencing.ex @@ -0,0 +1,50 @@ +defmodule Explorer.Chain.Import.Stage.AddressReferencing do + @moduledoc """ + Imports any tables that reference `t:Explorer.Chain.Address.t/0` and that were imported by + `Explorer.Chain.Import.Stage.Addresses`. + """ + + alias Ecto.Multi + alias Explorer.Chain.Import.{Runner, Stage} + + @behaviour Stage + + @impl Stage + def runners, + do: [ + Runner.Address.CoinBalances, + Runner.Blocks, + Runner.Block.Rewards, + Runner.Block.SecondDegreeRelations, + Runner.Transactions, + Runner.Transaction.Forks, + Runner.InternalTransactions, + Runner.Logs, + Runner.Tokens, + Runner.TokenTransfers, + Runner.Address.CurrentTokenBalances, + Runner.Address.TokenBalances + ] + + @impl Stage + def multis(runner_to_changes_list, options) do + {final_multi, final_remaining_runner_to_changes_list} = + runners() + |> Enum.reduce({Multi.new(), runner_to_changes_list}, fn runner, {multi, remaining_runner_to_changes_list} -> + {changes_list, new_remaining_runner_to_changes_list} = Map.pop(remaining_runner_to_changes_list, runner) + + new_multi = + case changes_list do + nil -> + multi + + _ -> + runner.run(multi, changes_list, options) + end + + {new_multi, new_remaining_runner_to_changes_list} + end) + + {[final_multi], final_remaining_runner_to_changes_list} + end +end diff --git a/apps/explorer/lib/explorer/chain/import/stage/addresses.ex b/apps/explorer/lib/explorer/chain/import/stage/addresses.ex new file mode 100644 index 0000000000..03c8a57724 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/stage/addresses.ex @@ -0,0 +1,22 @@ +defmodule Explorer.Chain.Import.Stage.Addresses do + @moduledoc """ + Imports addresses before anything else that references them because an unused address is still valid and recoverable + if the other stage(s) don't commit. + """ + + alias Explorer.Chain.Import.{Runner, Stage} + + @behaviour Stage + + @runner Runner.Addresses + + @impl Stage + def runners, do: [@runner] + + @chunk_size 50 + + @impl Stage + def multis(runner_to_changes_list, options) do + Stage.chunk_every(runner_to_changes_list, @runner, @chunk_size, options) + end +end diff --git a/apps/explorer/lib/explorer/logger.ex b/apps/explorer/lib/explorer/logger.ex new file mode 100644 index 0000000000..56eac1c3f7 --- /dev/null +++ b/apps/explorer/lib/explorer/logger.ex @@ -0,0 +1,19 @@ +defmodule Explorer.Logger do + @moduledoc """ + Helpers for `Logger`. + """ + + @doc """ + Sets `keyword` in `Logger.metadata/1` around `fun`. + """ + def metadata(fun, keyword) when is_function(fun, 0) and is_list(keyword) do + metadata_before = Logger.metadata() + + try do + Logger.metadata(keyword) + fun.() + after + Logger.reset_metadata(metadata_before) + end + end +end diff --git a/apps/explorer/lib/explorer/repo.ex b/apps/explorer/lib/explorer/repo.ex index 17f3ce9119..9584987a4a 100644 --- a/apps/explorer/lib/explorer/repo.ex +++ b/apps/explorer/lib/explorer/repo.ex @@ -13,6 +13,22 @@ defmodule Explorer.Repo do {:ok, Keyword.put(opts, :url, System.get_env("DATABASE_URL"))} end + def logged_transaction(fun_or_multi, opts \\ []) do + transaction_id = :erlang.unique_integer([:positive]) + + Explorer.Logger.metadata( + fn -> + {microseconds, value} = :timer.tc(__MODULE__, :transaction, [fun_or_multi, opts]) + + milliseconds = div(microseconds, 100) / 10.0 + Logger.debug(["transaction_time=", :io_lib_format.fwrite_g(milliseconds), ?m, ?s]) + + value + end, + transaction_id: transaction_id + ) + end + @doc """ Chunks elements into multiple `insert_all`'s to avoid DB driver param limits. diff --git a/apps/explorer/test/explorer/chain/import/address/address/current_token_balances_test.exs b/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs similarity index 95% rename from apps/explorer/test/explorer/chain/import/address/address/current_token_balances_test.exs rename to apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs index b9e89c92ec..ad49a64261 100644 --- a/apps/explorer/test/explorer/chain/import/address/address/current_token_balances_test.exs +++ b/apps/explorer/test/explorer/chain/import/runner/address/current_token_balances_test.exs @@ -1,8 +1,8 @@ -defmodule Explorer.Chain.Import.Address.CurrentTokenBalancesTest do +defmodule Explorer.Chain.Import.Runner.Address.CurrentTokenBalancesTest do use Explorer.DataCase alias Explorer.Chain.Address.CurrentTokenBalance - alias Explorer.Chain.Import.Address.CurrentTokenBalances + alias Explorer.Chain.Import.Runner.Address.CurrentTokenBalances alias Explorer.Repo describe "insert/2" do