From 1918040eeb06f9af4270dd4c4ecbdbc5070e65a7 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Fri, 5 Oct 2018 13:23:24 -0500 Subject: [PATCH] Explorer.Chain.Import.Runner behaviour --- apps/explorer/lib/explorer/chain/import.ex | 413 +++++++++--------- .../chain/import/address/coin_balances.ex | 123 +++--- .../chain/import/address/token_balances.ex | 128 +++--- .../lib/explorer/chain/import/addresses.ex | 137 +++--- .../import/block/second_degree_relations.ex | 71 +-- .../lib/explorer/chain/import/blocks.ex | 130 +++--- .../chain/import/internal_transactions.ex | 71 ++- .../lib/explorer/chain/import/logs.ex | 55 +-- .../lib/explorer/chain/import/runner.ex | 37 ++ .../explorer/chain/import/token_transfers.ex | 55 +-- .../lib/explorer/chain/import/tokens.ex | 85 ++-- .../chain/import/transaction/forks.ex | 74 ++-- .../lib/explorer/chain/import/transactions.ex | 54 ++- .../lib/explorer/chain/transaction.ex | 2 +- apps/indexer/lib/indexer/block/fetcher.ex | 19 +- .../lib/indexer/block/uncle/fetcher.ex | 3 + 16 files changed, 770 insertions(+), 687 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/import/runner.ex diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index a8c00cb2cb..6c37a6e841 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -4,55 +4,62 @@ defmodule Explorer.Chain.Import do """ alias Ecto.{Changeset, Multi} + alias Explorer.Chain.Import + alias Explorer.Repo - alias Explorer.Chain.{ - Address, - Address.CoinBalance, - Address.TokenBalance, - Block, - Import, - InternalTransaction, - Log, - Token, - TokenTransfer, - Transaction - } + # in order so that foreign keys are inserted before being referenced + @runners [ + Import.Addresses, + Import.Address.CoinBalances, + Import.Blocks, + Import.Block.SecondDegreeRelations, + Import.Transactions, + Import.Transaction.Forks, + Import.InternalTransactions, + Import.Logs, + Import.Tokens, + Import.TokenTransfers, + Import.Address.TokenBalances + ] + + quoted_runner_option_value = + quote do + Import.Runner.options() + end - alias Explorer.Repo + quoted_runner_options = + for runner <- @runners do + quoted_key = + quote do + optional(unquote(runner.option_key())) + end + + {quoted_key, quoted_runner_option_value} + end - @type changeset_function_name :: atom - @type on_conflict :: :nothing | :replace_all - @type params :: [map()] @type all_options :: %{ - optional(:addresses) => Import.Addresses.options(), - optional(:address_coin_balances) => Import.Address.CoinBalances.options(), - optional(:address_token_balances) => Import.Address.TokenBalances.options(), - optional(:blocks) => Import.Blocks.options(), - optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.options(), optional(:broadcast) => boolean, - optional(:internal_transactions) => Import.InternalTransactions.options(), - optional(:logs) => Import.Logs.options(), optional(:timeout) => timeout, - optional(:token_transfers) => Import.TokenTransfers.options(), - optional(:tokens) => Import.Tokens.options(), - optional(:transactions) => Import.Transactions.options(), - optional(:transaction_forks) => Import.Transaction.Forks.options() + unquote_splicing(quoted_runner_options) } + + quoted_runner_imported = + for runner <- @runners do + quoted_key = + quote do + optional(unquote(runner.option_key())) + end + + quoted_value = + quote do + unquote(runner).imported() + end + + {quoted_key, quoted_value} + end + @type all_result :: - {:ok, - %{ - optional(:addresses) => Import.Addresses.imported(), - optional(:address_coin_balances) => Import.Address.CoinBalances.imported(), - optional(:address_token_balances) => Import.Address.TokenBalances.imported(), - optional(:blocks) => Import.Blocks.imported(), - optional(:block_second_degree_relations) => Import.Block.SecondDegreeRelations.imported(), - optional(:internal_transactions) => Import.InternalTransactions.imported(), - optional(:logs) => Import.Logs.imported(), - optional(:token_transfers) => Import.TokenTransfers.imported(), - optional(:tokens) => Import.Tokens.imported(), - optional(:transactions) => Import.Transactions.imported(), - optional(:transaction_forks) => Import.Transaction.Forks.imported() - }} + {:ok, %{unquote_splicing(quoted_runner_imported)}} | {:error, [Changeset.t()]} | {:error, step :: Ecto.Multi.name(), failed_value :: any(), changes_so_far :: %{optional(Ecto.Multi.name()) => any()}} @@ -62,23 +69,37 @@ defmodule Explorer.Chain.Import do # milliseconds @transaction_timeout 120_000 + @imported_table_rows @runners + |> Stream.map(&Map.put(&1.imported_table_row(), :key, &1.option_key())) + |> Enum.map_join("\n", fn %{ + key: key, + value_type: value_type, + value_description: value_description + } -> + "| `#{inspect(key)}` | `#{value_type}` | #{value_description} |" + end) + @runner_options_doc Enum.map_join(@runners, fn runner -> + ecto_schema_module = runner.ecto_schema_module() + + """ + * `#{runner.option_key() |> inspect()}` + * `:on_conflict` - what to do if a conflict occurs with a pre-existing row: `:nothing`, `:replace_all`, or an + `t:Ecto.Query.t/0` to update specific columns. + * `:params` - `list` of params for changeset function in `#{ecto_schema_module}`. + * `:with` - changeset function to use in `#{ecto_schema_module}`. Default to `:changeset`. + * `:timeout` - the timeout for inserting each batch of changes from `:params`. + Defaults to `#{runner.timeout()}` milliseconds. + """ + end) + @doc """ Bulk insert all data stored in the `Explorer`. The import returns the unique key(s) for each type of record inserted. - | Key | Value Type | Value Description | - |----------------------------------|-------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------| - | `:addresses` | `[Explorer.Chain.Address.t()]` | List of `t:Explorer.Chain.Address.t/0`s | - | `:address_coin_balances` | `[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]` | List of maps of the `t:Explorer.Chain.Address.CoinBalance.t/0` `address_hash` and `block_number` | - | `:blocks` | `[Explorer.Chain.Block.t()]` | List of `t:Explorer.Chain.Block.t/0`s | - | `:internal_transactions` | `[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash` | - | `:logs` | `[Explorer.Chain.Log.t()]` | List of `t:Explorer.Chain.Log.t/0`s | - | `:token_transfers` | `[Explorer.Chain.TokenTransfer.t()]` | List of `t:Explorer.Chain.TokenTransfer.t/0`s | - | `:tokens` | `[Explorer.Chain.Token.t()]` | List of `t:Explorer.Chain.token.t/0`s | - | `:transactions` | `[Explorer.Chain.Hash.t()]` | List of `t:Explorer.Chain.Transaction.t/0` `hash` | - | `:transaction_forks` | `[%{uncle_hash: Explorer.Chain.Hash.t(), hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.Transaction.Fork.t/0` `uncle_hash` and `hash` | - | `:block_second_degree_relations` | `[%{uncle_hash: Explorer.Chain.Hash.t(), nephew_hash: Explorer.Chain.Hash.t()]` | List of maps of the `t:Explorer.Chain.Block.SecondDegreeRelation.t/0` `uncle_hash` and `nephew_hash` | + | Key | Value Type | Value Description | + |-----|------------|-------------------| + #{@imported_table_rows} The params for each key are validated using the corresponding `Ecto.Schema` module's `changeset/2` function. If there are errors, they are returned in `Ecto.Changeset.t`s, so that the original, invalid value can be reconstructed for any @@ -97,68 +118,17 @@ defmodule Explorer.Chain.Import do ## Options - * `:addresses` - * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. - * `:timeout` - the timeout for inserting all addresses. Defaults to `#{Import.Addresses.timeout()}` milliseconds. - * `:with` - the changeset function on `Explorer.Chain.Address` to use validate `:params`. - * `:address_coin_balances` - * `:params` - `list` of params for `Explorer.Chain.Address.CoinBalance.changeset/2`. - * `:timeout` - the timeout for inserting all balances. Defaults to `#{Import.Address.CoinBalances.timeout()}` - milliseconds. - * `:address_token_balances` - * `:params` - `list` of params for `Explorer.Chain.TokenBalance.changeset/2` - * `:blocks` - * `:params` - `list` of params for `Explorer.Chain.Block.changeset/2`. - * `:timeout` - the timeout for inserting all blocks. Defaults to `#{Import.Blocks.timeout()}` milliseconds. - * `:block_second_degree_relations` - * `:params` - `list` of params `for `Explorer.Chain.Block.SecondDegreeRelation.changeset/2`. - * `:timeout` - the timeout for inserting all uncles found in the params list. Defaults to - `#{Import.Block.SecondDegreeRelations.timeout()}` milliseconds. * `:broadcast` - Boolean flag indicating whether or not to broadcast the event. - * `:internal_transactions` - * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. - * `:timeout` - the timeout for inserting all internal transactions. Defaults to - `#{Import.InternalTransactions.timeout()}` milliseconds. - * `:logs` - * `:params` - `list` of params for `Explorer.Chain.Log.changeset/2`. - * `:timeout` - the timeout for inserting all logs. Defaults to `#{Import.Logs.timeout()}` milliseconds. * `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}` milliseconds. - * `:token_transfers` - * `:params` - `list` of params for `Explorer.Chain.TokenTransfer.changeset/2` - * `:timeout` - the timeout for inserting all token transfers. Defaults to `#{Import.TokenTransfers.timeout()}` - milliseconds. - * `:tokens` - * `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing token - with the same contract address hash. - * `:params` - `list` of params for `Explorer.Chain.Token.changeset/2` - * `:timeout` - the timeout for inserting all tokens. Defaults to `#{Import.Tokens.timeout()}` milliseconds. - * `:transactions` - * `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing transaction - with the same hash. - - *NOTE*: Because the repository transaction for a pending `Explorer.Chain.Transaction`s could `COMMIT` after the - repository transaction for that same transaction being collated into a block, writers, it is recommended to use - `:nothing` for pending transactions and `:replace_all` for collated transactions, so that collated transactions - win. - * `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`. - * `:timeout` - the timeout for inserting all transactions found in the params lists across all - types. Defaults to `#{Import.Transactions.timeout()}` milliseconds. - * `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`. - * `:transaction_forks` - * `:params` - `list` of params for `Explorer.Chain.Transaction.Fork.changeset/2`. - * `:timeout` - the timeout for inserting all transaction forks. Defaults to - `#{Import.Transaction.Forks.timeout()}` milliseconds. - * `:timeout` - the timeout for `Repo.transaction`. Defaults to `#{@transaction_timeout}` milliseconds. - + #{@runner_options_doc} """ @spec all(all_options()) :: all_result() def all(options) when is_map(options) do - changes_list_arguments_list = import_options_to_changes_list_arguments_list(options) - - with {:ok, ecto_schema_module_to_changes_list_map} <- - changes_list_arguments_list_to_ecto_schema_module_to_changes_list_map(changes_list_arguments_list), - {:ok, data} <- insert_ecto_schema_module_to_changes_list_map(ecto_schema_module_to_changes_list_map, options) do + with {:ok, runner_options_pairs} <- validate_options(options), + {:ok, valid_runner_option_pairs} <- validate_runner_options_pairs(runner_options_pairs), + {:ok, runner_changes_list_pairs} <- runner_changes_list_pairs(valid_runner_option_pairs), + {:ok, data} <- insert_runner_changes_list_pairs(runner_changes_list_pairs, options) do if Map.get(options, :broadcast, false), do: broadcast_events(data) {:ok, data} end @@ -179,84 +149,147 @@ defmodule Explorer.Chain.Import do end) end - defp changes_list_arguments_list_to_ecto_schema_module_to_changes_list_map(changes_list_arguments_list) do - changes_list_arguments_list - |> Stream.map(fn [params_list, options] -> - ecto_schema_module = Keyword.fetch!(options, :for) - {ecto_schema_module, changes_list(params_list, options)} - end) - |> Enum.reduce({:ok, %{}}, fn - {ecto_schema_module, {:ok, changes_list}}, {:ok, ecto_schema_module_to_changes_list_map} -> - {:ok, Map.put(ecto_schema_module_to_changes_list_map, ecto_schema_module, changes_list)} + defp runner_changes_list_pairs(runner_options_pairs) when is_list(runner_options_pairs) do + {status, reversed} = + runner_options_pairs + |> Stream.map(fn {runner, options} -> runner_changes_list(runner, options) end) + |> Enum.reduce({:ok, []}, fn + {:ok, runner_changes_pair}, {:ok, acc_runner_changes_pairs} -> + {:ok, [runner_changes_pair | acc_runner_changes_pairs]} - {_, {:ok, _}}, {:error, _} = error -> - error + {:ok, _}, {:error, _} = error -> + error - {_, {:error, _} = error}, {:ok, _} -> - error + {:error, _} = error, {:ok, _} -> + error - {_, {:error, changesets}}, {:error, acc_changesets} -> - {:error, acc_changesets ++ changesets} - end) + {:error, runner_changesets}, {:error, acc_changesets} -> + {:error, acc_changesets ++ runner_changesets} + end) + + {status, Enum.reverse(reversed)} end - @spec changes_list(params :: [map], [{:for, module} | {:with, atom}]) :: {:ok, [map]} | {:error, [Changeset.t()]} - defp changes_list(params, options) when is_list(options) do - ecto_schema_module = Keyword.fetch!(options, :for) - changeset_function_name = Keyword.get(options, :with, :changeset) + defp runner_changes_list(runner, %{params: params} = options) do + ecto_schema_module = runner.ecto_schema_module() + changeset_function_name = Map.get(options, :with, :changeset) struct = ecto_schema_module.__struct__() - {status, acc} = - params - |> Stream.map(&apply(ecto_schema_module, changeset_function_name, [struct, &1])) - |> Enum.reduce({:ok, []}, fn - changeset = %Changeset{valid?: false}, {:ok, _} -> - {:error, [changeset]} + params + |> Stream.map(&apply(ecto_schema_module, changeset_function_name, [struct, &1])) + |> Enum.reduce({:ok, []}, fn + changeset = %Changeset{valid?: false}, {:ok, _} -> + {:error, [changeset]} + + changeset = %Changeset{valid?: false}, {:error, acc_changesets} -> + {:error, [changeset | acc_changesets]} + + %Changeset{changes: changes, valid?: true}, {:ok, acc_changes} -> + {:ok, [changes | acc_changes]} + + %Changeset{valid?: true}, {:error, _} = error -> + error + end) + |> case do + {:ok, changes} -> {:ok, {runner, changes}} + {:error, _} = error -> error + end + end + + @global_options ~w(broadcast timeout)a + + defp validate_options(options) when is_map(options) do + local_options = Map.drop(options, @global_options) + + {reverse_runner_options_pairs, unknown_options} = + Enum.reduce(@runners, {[], local_options}, fn runner, {acc_runner_options_pairs, unknown_options} = acc -> + option_key = runner.option_key() - changeset = %Changeset{valid?: false}, {:error, acc_changesets} -> - {:error, [changeset | acc_changesets]} + case local_options do + %{^option_key => option_value} -> + {[{runner, option_value} | acc_runner_options_pairs], Map.delete(unknown_options, option_key)} - %Changeset{changes: changes, valid?: true}, {:ok, acc_changes} -> - {:ok, [changes | acc_changes]} + _ -> + acc + end + end) - %Changeset{valid?: true}, {:error, _} = error -> + case Enum.empty?(unknown_options) do + true -> {:ok, Enum.reverse(reverse_runner_options_pairs)} + false -> {:error, {:unknown_options, unknown_options}} + end + end + + defp validate_runner_options_pairs(runner_options_pairs) when is_list(runner_options_pairs) do + {status, reversed} = + runner_options_pairs + |> Stream.map(fn {runner, options} -> validate_runner_options(runner, options) end) + |> Enum.reduce({:ok, []}, fn + :ignore, acc -> + acc + + {:ok, valid_runner_option_pair}, {:ok, valid_runner_options_pairs} -> + {:ok, [valid_runner_option_pair | valid_runner_options_pairs]} + + {:ok, _}, {:error, _} = error -> error + + {:error, reason}, {:ok, _} -> + {:error, [reason]} + + {:error, reason}, {:error, reasons} -> + {:error, [reason | reasons]} end) - {status, Enum.reverse(acc)} + {status, Enum.reverse(reversed)} + end + + defp validate_runner_options(runner, options) when is_map(options) do + option_key = runner.option_key() + + case {validate_runner_option_params_required(option_key, options), + validate_runner_options_known(option_key, options)} do + {:ignore, :ok} -> :ignore + {:ignore, {:error, _} = error} -> error + {:ok, :ok} -> {:ok, {runner, options}} + {:ok, {:error, _} = error} -> error + {{:error, reason}, :ok} -> {:error, [reason]} + {{:error, reason}, {:error, reasons}} -> {:error, [reason | reasons]} + end + end + + defp validate_runner_option_params_required(_, %{params: params}) do + case Enum.empty?(params) do + false -> :ok + true -> :ignore + end + end + + defp validate_runner_option_params_required(runner_option_key, _), + do: {:error, {:required, [runner_option_key, :params]}} + + @local_options ~w(on_conflict params with timeout)a + + defp validate_runner_options_known(runner_option_key, options) do + unknown_option_keys = Map.keys(options) -- @local_options + + if Enum.empty?(unknown_option_keys) do + :ok + else + reasons = Enum.map(unknown_option_keys, &{:unknown, [runner_option_key, &1]}) + + {:error, reasons} + end end - @import_option_key_to_ecto_schema_module %{ - addresses: Address, - address_coin_balances: CoinBalance, - address_token_balances: TokenBalance, - blocks: Block, - block_second_degree_relations: Block.SecondDegreeRelation, - internal_transactions: InternalTransaction, - logs: Log, - token_transfers: TokenTransfer, - tokens: Token, - transactions: Transaction, - transaction_forks: Transaction.Fork - } - - defp ecto_schema_module_to_changes_list_map_to_multi(ecto_schema_module_to_changes_list_map, options) - when is_map(options) do + defp runner_changes_list_pairs_to_multi(runner_changes_list_pairs, options) + when is_list(runner_changes_list_pairs) and is_map(options) do timestamps = timestamps() full_options = Map.put(options, :timestamps, timestamps) - Multi.new() - |> Import.Addresses.run(ecto_schema_module_to_changes_list_map, full_options) - |> Import.Address.CoinBalances.run(ecto_schema_module_to_changes_list_map, full_options) - |> Import.Blocks.run(ecto_schema_module_to_changes_list_map, full_options) - |> Import.Block.SecondDegreeRelations.run(ecto_schema_module_to_changes_list_map, full_options) - |> Import.Transactions.run(ecto_schema_module_to_changes_list_map, full_options) - |> Import.Transaction.Forks.run(ecto_schema_module_to_changes_list_map, full_options) - |> Import.InternalTransactions.run(ecto_schema_module_to_changes_list_map, full_options) - |> Import.Logs.run(ecto_schema_module_to_changes_list_map, full_options) - |> Import.Tokens.run(ecto_schema_module_to_changes_list_map, full_options) - |> Import.TokenTransfers.run(ecto_schema_module_to_changes_list_map, full_options) - |> Import.Address.TokenBalances.run(ecto_schema_module_to_changes_list_map, full_options) + Enum.reduce(runner_changes_list_pairs, Multi.new(), fn {runner, changes_list}, acc -> + runner.run(acc, changes_list, full_options) + end) end def insert_changes_list(changes_list, options) when is_list(changes_list) do @@ -282,49 +315,13 @@ defmodule Explorer.Chain.Import do Map.merge(changes, timestamps) end - defp import_options_to_changes_list_arguments_list(options) do - Enum.flat_map( - @import_option_key_to_ecto_schema_module, - &import_options_to_changes_list_arguments_list_flat_mapper(options, &1) - ) - end - - defp import_options_to_changes_list_arguments_list_flat_mapper(options, {option_key, ecto_schema_module}) do - case Map.fetch(options, option_key) do - {:ok, option_value} -> - import_option_to_changes_list_arguments_list_flat_mapper(option_value, ecto_schema_module) - - :error -> - [] - end - end - - defp import_option_to_changes_list_arguments_list_flat_mapper(%{params: params} = option_value, ecto_schema_module) do - # Use `Enum.empty?` instead of `[_ | _]` as params are allowed to be any collection of maps - case Enum.empty?(params) do - false -> - [ - [ - params, - [for: ecto_schema_module, with: Map.get(option_value, :with, :changeset)] - ] - ] - - # filter out empty params as early as possible, so that later stages don't need to deal with empty params - # leading to selecting all rows because they produce no where conditions as happened in - # https://github.com/poanetwork/blockscout/issues/850 - true -> - [] - end - end - defp import_transaction(multi, options) when is_map(options) do Repo.transaction(multi, timeout: Map.get(options, :timeout, @transaction_timeout)) end - defp insert_ecto_schema_module_to_changes_list_map(ecto_schema_module_to_changes_list_map, options) do - ecto_schema_module_to_changes_list_map - |> ecto_schema_module_to_changes_list_map_to_multi(options) + defp insert_runner_changes_list_pairs(runner_changes_list_pairs, options) do + runner_changes_list_pairs + |> runner_changes_list_pairs_to_multi(options) |> import_transaction(options) end diff --git a/apps/explorer/lib/explorer/chain/import/address/coin_balances.ex b/apps/explorer/lib/explorer/chain/import/address/coin_balances.ex index 51afc5a4b3..8a2b98f679 100644 --- a/apps/explorer/lib/explorer/chain/import/address/coin_balances.ex +++ b/apps/explorer/lib/explorer/chain/import/address/coin_balances.ex @@ -11,38 +11,40 @@ defmodule Explorer.Chain.Import.Address.CoinBalances do alias Explorer.Chain.Address.CoinBalance alias Explorer.Chain.{Block, Hash, Import, Wei} + @behaviour Import.Runner + # milliseconds @timeout 60_000 - @type options :: %{ - required(:params) => Import.params(), - optional(:timeout) => timeout - } @type imported :: [ %{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()} ] - def run(multi, ecto_schema_module_to_changes_list_map, options) - when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do - case ecto_schema_module_to_changes_list_map do - %{CoinBalance => balances_changes} -> - timestamps = Map.fetch!(options, :timestamps) + @impl Import.Runner + def ecto_schema_module, do: CoinBalance - Multi.run(multi, :address_coin_balances, fn _ -> - insert( - balances_changes, - %{ - timeout: options[:address_coin_balances][:timeout] || @timeout, - timestamps: timestamps - } - ) - end) + @impl Import.Runner + def option_key, do: :address_coin_balances + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]", + value_description: "List of maps of the `t:#{ecto_schema_module()}.t/0` `address_hash` and `block_number`" + } + end - _ -> - multi - end + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + timestamps = Map.fetch!(options, :timestamps) + timeout = options[option_key()][:timeout] || @timeout + + Multi.run(multi, :address_coin_balances, fn _ -> + insert(changes_list, %{timeout: timeout, timestamps: timestamps}) + end) end + @impl Import.Runner def timeout, do: @timeout @spec insert( @@ -60,7 +62,9 @@ defmodule Explorer.Chain.Import.Address.CoinBalances do ) :: {:ok, [%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}]} | {:error, [Changeset.t()]} - defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do + defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number}) @@ -68,42 +72,7 @@ defmodule Explorer.Chain.Import.Address.CoinBalances do Import.insert_changes_list( ordered_changes_list, conflict_target: [:address_hash, :block_number], - on_conflict: - from( - balance in CoinBalance, - update: [ - set: [ - inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at), - updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at), - value: - fragment( - """ - CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN - EXCLUDED.value - ELSE - ? - END - """, - balance.value_fetched_at, - balance.value_fetched_at, - balance.value - ), - value_fetched_at: - fragment( - """ - CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN - EXCLUDED.value_fetched_at - ELSE - ? - END - """, - balance.value_fetched_at, - balance.value_fetched_at, - balance.value_fetched_at - ) - ] - ] - ), + on_conflict: on_conflict, for: CoinBalance, timeout: timeout, timestamps: timestamps @@ -111,4 +80,42 @@ defmodule Explorer.Chain.Import.Address.CoinBalances do {:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash block_number)a))} end + + def default_on_conflict do + from( + balance in CoinBalance, + update: [ + set: [ + inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at), + updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at), + value: + fragment( + """ + CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN + EXCLUDED.value + ELSE + ? + END + """, + balance.value_fetched_at, + balance.value_fetched_at, + balance.value + ), + value_fetched_at: + fragment( + """ + CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN + EXCLUDED.value_fetched_at + ELSE + ? + END + """, + balance.value_fetched_at, + balance.value_fetched_at, + balance.value_fetched_at + ) + ] + ] + ) + end end diff --git a/apps/explorer/lib/explorer/chain/import/address/token_balances.ex b/apps/explorer/lib/explorer/chain/import/address/token_balances.ex index b1ecc280a9..e53d1061cb 100644 --- a/apps/explorer/lib/explorer/chain/import/address/token_balances.ex +++ b/apps/explorer/lib/explorer/chain/import/address/token_balances.ex @@ -11,36 +11,38 @@ defmodule Explorer.Chain.Import.Address.TokenBalances do alias Explorer.Chain.Address.TokenBalance alias Explorer.Chain.Import + @behaviour Import.Runner + # milliseconds @timeout 60_000 - @type options :: %{ - required(:params) => Import.params(), - optional(:timeout) => timeout - } @type imported :: [TokenBalance.t()] - def run(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_map(options) do - case ecto_schema_module_to_changes_list do - %{TokenBalance => token_balances_changes} -> - timestamps = Map.fetch!(options, :timestamps) - - Multi.run(multi, :address_token_balances, fn _ -> - insert( - token_balances_changes, - %{ - timeout: options[:address_token_balances][:timeout] || @timeout, - timestamps: timestamps - } - ) - end) - - _ -> - multi - end + @impl Import.Runner + def ecto_schema_module, do: TokenBalance + + @impl Import.Runner + def option_key, do: :address_token_balances + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } end + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + timestamps = Map.fetch!(options, :timestamps) + timeout = options[option_key()][:timeout] || @timeout + + Multi.run(multi, :address_token_balances, fn _ -> + insert(changes_list, %{timeout: timeout, timestamps: timestamps}) + end) + end + + @impl Import.Runner def timeout, do: @timeout @spec insert([map()], %{ @@ -49,8 +51,9 @@ defmodule Explorer.Chain.Import.Address.TokenBalances do }) :: {:ok, [TokenBalance.t()]} | {:error, [Changeset.t()]} - def insert(changes_list, %{timeout: timeout, timestamps: timestamps}) - when is_list(changes_list) do + def insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number}) @@ -58,46 +61,49 @@ defmodule Explorer.Chain.Import.Address.TokenBalances do Import.insert_changes_list( ordered_changes_list, conflict_target: ~w(address_hash token_contract_address_hash block_number)a, - on_conflict: - from( - token_balance in TokenBalance, - update: [ - set: [ - inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at), - updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at), - value: - fragment( - """ - CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN - EXCLUDED.value - ELSE - ? - END - """, - token_balance.value_fetched_at, - token_balance.value_fetched_at, - token_balance.value - ), - value_fetched_at: - fragment( - """ - CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN - EXCLUDED.value_fetched_at - ELSE - ? - END - """, - token_balance.value_fetched_at, - token_balance.value_fetched_at, - token_balance.value_fetched_at - ) - ] - ] - ), + on_conflict: on_conflict, for: TokenBalance, returning: true, timeout: timeout, timestamps: timestamps ) end + + defp default_on_conflict do + from( + token_balance in TokenBalance, + update: [ + set: [ + inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", token_balance.inserted_at), + updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", token_balance.updated_at), + value: + fragment( + """ + CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN + EXCLUDED.value + ELSE + ? + END + """, + token_balance.value_fetched_at, + token_balance.value_fetched_at, + token_balance.value + ), + value_fetched_at: + fragment( + """ + CASE WHEN EXCLUDED.value IS NOT NULL AND (? IS NULL OR EXCLUDED.value_fetched_at > ?) THEN + EXCLUDED.value_fetched_at + ELSE + ? + END + """, + token_balance.value_fetched_at, + token_balance.value_fetched_at, + token_balance.value_fetched_at + ) + ] + ] + ) + end end diff --git a/apps/explorer/lib/explorer/chain/import/addresses.ex b/apps/explorer/lib/explorer/chain/import/addresses.ex index 4373a8f708..3616a47ed3 100644 --- a/apps/explorer/lib/explorer/chain/import/addresses.ex +++ b/apps/explorer/lib/explorer/chain/import/addresses.ex @@ -10,37 +10,38 @@ defmodule Explorer.Chain.Import.Addresses do import Ecto.Query, only: [from: 2] + @behaviour Import.Runner + # milliseconds @timeout 60_000 @type imported :: [Address.t()] - @type options :: %{ - required(:params) => Import.params(), - optional(:timeout) => timeout, - optional(:with) => Import.changeset_function_name() - } - - def run(multi, ecto_schema_module_to_changes_list_map, options) - when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do - case ecto_schema_module_to_changes_list_map do - %{Address => addresses_changes} -> - timestamps = Map.fetch!(options, :timestamps) - - Multi.run(multi, :addresses, fn _ -> - insert( - addresses_changes, - %{ - timeout: options[:addresses][:timeout] || @timeout, - timestamps: timestamps - } - ) - end) - - _ -> - multi - end + + @impl Import.Runner + def ecto_schema_module, do: Address + + @impl Import.Runner + def option_key, do: :addresses + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } end + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + timestamps = Map.fetch!(options, :timestamps) + timeout = options[:addresses][:timeout] || @timeout + + Multi.run(multi, :addresses, fn _ -> + insert(changes_list, %{timeout: timeout, timestamps: timestamps}) + end) + end + + @impl Import.Runner def timeout, do: @timeout ## Private Functions @@ -48,53 +49,17 @@ defmodule Explorer.Chain.Import.Addresses do @spec insert([%{hash: Hash.Address.t()}], %{ required(:timeout) => timeout, required(:timestamps) => Import.timestamps() - }) :: {:ok, [Hash.Address.t()]} - defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do + }) :: {:ok, [Address.t()]} + defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = sort_changes_list(changes_list) Import.insert_changes_list( ordered_changes_list, conflict_target: :hash, - on_conflict: - from( - address in Address, - update: [ - set: [ - contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code), - # ARGMAX on two columns - fetched_coin_balance: - fragment( - """ - CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND - (? IS NULL OR - EXCLUDED.fetched_coin_balance_block_number >= ?) THEN - EXCLUDED.fetched_coin_balance - ELSE ? - END - """, - address.fetched_coin_balance_block_number, - address.fetched_coin_balance_block_number, - address.fetched_coin_balance - ), - # MAX on two columns - fetched_coin_balance_block_number: - fragment( - """ - CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND - (? IS NULL OR - EXCLUDED.fetched_coin_balance_block_number >= ?) THEN - EXCLUDED.fetched_coin_balance_block_number - ELSE ? - END - """, - address.fetched_coin_balance_block_number, - address.fetched_coin_balance_block_number, - address.fetched_coin_balance_block_number - ) - ] - ] - ), + on_conflict: on_conflict, for: Address, returning: true, timeout: timeout, @@ -102,6 +67,46 @@ defmodule Explorer.Chain.Import.Addresses do ) end + defp default_on_conflict do + from(address in Address, + update: [ + set: [ + contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code), + # ARGMAX on two columns + fetched_coin_balance: + fragment( + """ + CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND + (? IS NULL OR + EXCLUDED.fetched_coin_balance_block_number >= ?) THEN + EXCLUDED.fetched_coin_balance + ELSE ? + END + """, + address.fetched_coin_balance_block_number, + address.fetched_coin_balance_block_number, + address.fetched_coin_balance + ), + # MAX on two columns + fetched_coin_balance_block_number: + fragment( + """ + CASE WHEN EXCLUDED.fetched_coin_balance_block_number IS NOT NULL AND + (? IS NULL OR + EXCLUDED.fetched_coin_balance_block_number >= ?) THEN + EXCLUDED.fetched_coin_balance_block_number + ELSE ? + END + """, + address.fetched_coin_balance_block_number, + address.fetched_coin_balance_block_number, + address.fetched_coin_balance_block_number + ) + ] + ] + ) + end + defp sort_changes_list(changes_list) do Enum.sort_by(changes_list, & &1.hash) end diff --git a/apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex b/apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex index 3490cfa7e4..8572772535 100644 --- a/apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex +++ b/apps/explorer/lib/explorer/chain/import/block/second_degree_relations.ex @@ -10,54 +10,51 @@ defmodule Explorer.Chain.Import.Block.SecondDegreeRelations do alias Ecto.{Changeset, Multi} alias Explorer.Chain.{Block, Hash, Import} + @behaviour Import.Runner + @timeout 60_000 - @type options :: %{ - required(:params) => Import.params(), - optional(:timeout) => timeout - } @type imported :: [ %{required(:nephew_hash) => Hash.Full.t(), required(:uncle_hash) => Hash.Full.t()} ] - def run(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_map(options) do - case ecto_schema_module_to_changes_list do - %{Block.SecondDegreeRelation => block_second_degree_relations_changes} -> - Multi.run(multi, :block_second_degree_relations, fn _ -> - insert( - block_second_degree_relations_changes, - %{ - timeout: options[:block_second_degree_relations][:timeout] || @timeout - } - ) - end) - - _ -> - multi - end + @impl Import.Runner + def ecto_schema_module, do: Block.SecondDegreeRelation + + @impl Import.Runner + def option_key, do: :block_second_degree_relations + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[%{uncle_hash: Explorer.Chain.Hash.t(), nephew_hash: Explorer.Chain.Hash.t()]", + value_description: "List of maps of the `t:#{ecto_schema_module()}.t/0` `uncle_hash` and `nephew_hash`" + } + end + + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + timeout = options[:block_second_degree_relations][:timeout] || @timeout + + Multi.run(multi, :block_second_degree_relations, fn _ -> + insert(changes_list, %{timeout: timeout}) + end) end + @impl Import.Runner def timeout, do: @timeout @spec insert([map()], %{required(:timeout) => timeout}) :: {:ok, %{nephew_hash: Hash.Full.t(), uncle_hash: Hash.Full.t()}} | {:error, [Changeset.t()]} - defp insert(changes_list, %{timeout: timeout}) when is_list(changes_list) do + defp insert(changes_list, %{timeout: timeout} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = Enum.sort_by(changes_list, &{&1.nephew_hash, &1.uncle_hash}) Import.insert_changes_list(ordered_changes_list, conflict_target: [:nephew_hash, :uncle_hash], - on_conflict: - from( - block_second_degree_relation in Block.SecondDegreeRelation, - update: [ - set: [ - uncle_fetched_at: - fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at) - ] - ] - ), + on_conflict: on_conflict, for: Block.SecondDegreeRelation, returning: [:nephew_hash, :uncle_hash], timeout: timeout, @@ -65,4 +62,16 @@ defmodule Explorer.Chain.Import.Block.SecondDegreeRelations do timestamps: %{} ) end + + defp default_on_conflict do + from( + block_second_degree_relation in Block.SecondDegreeRelation, + update: [ + set: [ + uncle_fetched_at: + fragment("LEAST(?, EXCLUDED.uncle_fetched_at)", block_second_degree_relation.uncle_fetched_at) + ] + ] + ) + end end diff --git a/apps/explorer/lib/explorer/chain/import/blocks.ex b/apps/explorer/lib/explorer/chain/import/blocks.ex index e8f91f762e..e87190f708 100644 --- a/apps/explorer/lib/explorer/chain/import/blocks.ex +++ b/apps/explorer/lib/explorer/chain/import/blocks.ex @@ -12,61 +12,69 @@ defmodule Explorer.Chain.Import.Blocks do alias Explorer.Chain.{Block, Import, Transaction} alias Explorer.Repo + @behaviour Import.Runner + # milliseconds @timeout 60_000 - @type options :: %{ - required(:params) => Import.params(), - optional(:timeout) => timeout - } @type imported :: [Block.t()] - def run(multi, ecto_schema_module_to_changes_list_map, options) - when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do - case ecto_schema_module_to_changes_list_map do - %{Block => blocks_changes} -> - timestamps = Map.fetch!(options, :timestamps) - blocks_timeout = options[:blocks][:timeout] || @timeout - where_forked = where_forked(blocks_changes) - - multi - |> Multi.run(:derive_transaction_forks, fn _ -> - derive_transaction_forks(%{ - timeout: options[:transaction_forks][:timeout] || Import.Transaction.Forks.timeout(), - timestamps: timestamps, - where_forked: where_forked - }) - end) - # MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table - |> Multi.run(:fork_transactions, fn _ -> - fork_transactions(%{ - timeout: options[:transactions][:timeout] || Import.Transactions.timeout(), - timestamps: timestamps, - where_forked: where_forked - }) - end) - |> Multi.run(:lose_consenus, fn _ -> - lose_consensus(blocks_changes, %{timeout: blocks_timeout, timestamps: timestamps}) - end) - |> Multi.run(:blocks, fn _ -> - insert(blocks_changes, %{timeout: blocks_timeout, timestamps: timestamps}) - end) - |> Multi.run(:uncle_fetched_block_second_degree_relations, fn %{blocks: blocks} when is_list(blocks) -> - update_block_second_degree_relations( - blocks, - %{ - timeout: - options[:block_second_degree_relations][:timeout] || Import.Block.SecondDegreeRelations.timeout(), - timestamps: timestamps - } - ) - end) - - _ -> - multi - end + @impl Import.Runner + def ecto_schema_module, do: Block + + @impl Import.Runner + def option_key, do: :blocks + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } end + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + timestamps = Map.fetch!(options, :timestamps) + blocks_timeout = options[option_key()][:timeout] || @timeout + where_forked = where_forked(changes_list) + + multi + |> Multi.run(:derive_transaction_forks, fn _ -> + derive_transaction_forks(%{ + timeout: options[Import.Transaction.Forks.option_key()][:timeout] || Import.Transaction.Forks.timeout(), + timestamps: timestamps, + where_forked: where_forked + }) + end) + # MUST be after `:derive_transaction_forks`, which depends on values in `transactions` table + |> Multi.run(:fork_transactions, fn _ -> + fork_transactions(%{ + timeout: options[Import.Transactions.option_key()][:timeout] || Import.Transactions.timeout(), + timestamps: timestamps, + where_forked: where_forked + }) + end) + |> Multi.run(:lose_consenus, fn _ -> + lose_consensus(changes_list, %{timeout: blocks_timeout, timestamps: timestamps}) + end) + |> Multi.run(:blocks, fn _ -> + insert(changes_list, %{timeout: blocks_timeout, timestamps: timestamps}) + end) + |> Multi.run(:uncle_fetched_block_second_degree_relations, fn %{blocks: blocks} when is_list(blocks) -> + update_block_second_degree_relations( + blocks, + %{ + timeout: + options[Import.Block.SecondDegreeRelations.option_key()][:timeout] || + Import.Block.SecondDegreeRelations.timeout(), + timestamps: timestamps + } + ) + end) + end + + @impl Import.Runner def timeout, do: @timeout # sobelow_skip ["SQL.Query"] @@ -135,23 +143,21 @@ defmodule Explorer.Chain.Import.Blocks do @spec insert([map()], %{required(:timeout) => timeout, required(:timestamps) => Import.timestamps()}) :: {:ok, [Block.t()]} | {:error, [Changeset.t()]} - defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) - when is_list(changes_list) do + defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get(options, :on_conflict, :replace_all) + # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = Enum.sort_by(changes_list, &{&1.number, &1.hash}) - {:ok, blocks} = - Import.insert_changes_list( - ordered_changes_list, - conflict_target: :hash, - on_conflict: :replace_all, - for: Block, - returning: true, - timeout: timeout, - timestamps: timestamps - ) - - {:ok, blocks} + Import.insert_changes_list( + ordered_changes_list, + conflict_target: :hash, + on_conflict: on_conflict, + for: Block, + returning: true, + timeout: timeout, + timestamps: timestamps + ) end defp lose_consensus(blocks_changes, %{timeout: timeout, timestamps: %{updated_at: updated_at}}) diff --git a/apps/explorer/lib/explorer/chain/import/internal_transactions.ex b/apps/explorer/lib/explorer/chain/import/internal_transactions.ex index 74a2c4182f..02c3dce0cb 100644 --- a/apps/explorer/lib/explorer/chain/import/internal_transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/internal_transactions.ex @@ -11,56 +11,55 @@ defmodule Explorer.Chain.Import.InternalTransactions do import Ecto.Query, only: [from: 2] + @behaviour Import.Runner + # milliseconds @timeout 60_000 - @type options :: %{ - required(:params) => Import.params(), - optional(:timeout) => timeout - } @type imported :: [ %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} ] - def run(multi, ecto_schema_module_to_changes_list_map, options) - when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do - case ecto_schema_module_to_changes_list_map do - %{InternalTransaction => internal_transactions_changes} -> - timestamps = Map.fetch!(options, :timestamps) - - multi - |> Multi.run(:internal_transactions, fn _ -> - insert( - internal_transactions_changes, - %{ - timeout: options[:internal_transactions][:timeout] || @timeout, - timestamps: timestamps - } - ) - end) - |> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions} - when is_list(internal_transactions) -> - update_transactions( - internal_transactions, - %{ - timeout: options[:transactions][:timeout] || Import.Transactions.timeout(), - timestamps: timestamps - } - ) - end) - - _ -> - multi - end + @impl Import.Runner + def ecto_schema_module, do: InternalTransaction + + @impl Import.Runner + def option_key, do: :internal_transactions + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]", + value_description: "List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash`" + } end + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + timestamps = Map.fetch!(options, :timestamps) + internal_transactions_timeout = options[option_key()][:timeout] || @timeout + transactions_timeout = options[Import.Transactions.option_key()][:timeout] || Import.Transactions.timeout() + + multi + |> Multi.run(:internal_transactions, fn _ -> + insert(changes_list, %{timeout: internal_transactions_timeout, timestamps: timestamps}) + end) + |> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions} + when is_list(internal_transactions) -> + update_transactions(internal_transactions, %{timeout: transactions_timeout, timestamps: timestamps}) + end) + end + + @impl Import.Runner def timeout, do: @timeout @spec insert([map], %{required(:timeout) => timeout, required(:timestamps) => Import.timestamps()}) :: {:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} | {:error, [Changeset.t()]} - defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) + defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get(options, :on_conflict, :replace_all) + # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) @@ -69,7 +68,7 @@ defmodule Explorer.Chain.Import.InternalTransactions do ordered_changes_list, conflict_target: [:transaction_hash, :index], for: InternalTransaction, - on_conflict: :replace_all, + on_conflict: on_conflict, returning: [:id, :index, :transaction_hash], timeout: timeout, timestamps: timestamps diff --git a/apps/explorer/lib/explorer/chain/import/logs.ex b/apps/explorer/lib/explorer/chain/import/logs.ex index cd21176476..d5eb6223ef 100644 --- a/apps/explorer/lib/explorer/chain/import/logs.ex +++ b/apps/explorer/lib/explorer/chain/import/logs.ex @@ -8,43 +8,46 @@ defmodule Explorer.Chain.Import.Logs do alias Ecto.{Changeset, Multi} alias Explorer.Chain.{Import, Log} + @behaviour Import.Runner + # milliseconds @timeout 60_000 - @type options :: %{ - required(:params) => Import.params(), - optional(:timeout) => timeout - } @type imported :: [Log.t()] - def run(multi, ecto_schema_module_to_changes_list_map, options) - when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do - case ecto_schema_module_to_changes_list_map do - %{Log => logs_changes} -> - timestamps = Map.fetch!(options, :timestamps) - - Multi.run(multi, :logs, fn _ -> - insert( - logs_changes, - %{ - timeout: options[:logs][:timeout] || @timeout, - timestamps: timestamps - } - ) - end) - - _ -> - multi - end + @impl Import.Runner + def ecto_schema_module, do: Log + + @impl Import.Runner + def option_key, do: :logs + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } end + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + timestamps = Map.fetch!(options, :timestamps) + timeout = options[option_key()][:timeout] || @timeout + + Multi.run(multi, :logs, fn _ -> + insert(changes_list, %{timeout: timeout, timestamps: timestamps}) + end) + end + + @impl Import.Runner def timeout, do: @timeout @spec insert([map()], %{required(:timeout) => timeout, required(:timestamps) => Import.timestamps()}) :: {:ok, [Log.t()]} | {:error, [Changeset.t()]} - defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) - when is_list(changes_list) do + defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get(options, :on_conflict, :replace_all) + # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) @@ -52,7 +55,7 @@ defmodule Explorer.Chain.Import.Logs do Import.insert_changes_list( ordered_changes_list, conflict_target: [:transaction_hash, :index], - on_conflict: :replace_all, + on_conflict: on_conflict, for: Log, returning: true, timeout: timeout, diff --git a/apps/explorer/lib/explorer/chain/import/runner.ex b/apps/explorer/lib/explorer/chain/import/runner.ex new file mode 100644 index 0000000000..16f3eaa2cf --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import/runner.ex @@ -0,0 +1,37 @@ +defmodule Explorer.Chain.Import.Runner do + @moduledoc """ + Behaviour used by `Explorer.Chain.Import.all/1` to import data into separate tables. + """ + + alias Ecto.Multi + + @type changeset_function_name :: atom + @type on_conflict :: :nothing | :replace_all | Ecto.Query.t() + + @typedoc """ + Runner-specific options under `c:option_key/0` in all options passed to `c:run/3`. + """ + @type options :: %{ + required(:params) => [map()], + optional(:on_conflict) => on_conflict(), + optional(:timeout) => timeout, + optional(:with) => changeset_function_name() + } + + @doc """ + Key in `t:all_options` used by this `Explorer.Chain.Import` behaviour implementation. + """ + @callback option_key() :: atom() + + @doc """ + Row of markdown table explaining format of `imported` from the module for use in `all/1` docs. + """ + @callback imported_table_row() :: %{value_type: String.t(), value_description: String.t()} + + @doc """ + The `Ecto.Schema` module that contains the `:changeset` function for validating `options[options_key][:params]`. + """ + @callback ecto_schema_module() :: module() + @callback run(Multi.t(), changes_list :: [%{optional(atom()) => term()}], %{optional(atom()) => term()}) :: Multi.t() + @callback timeout() :: timeout() +end diff --git a/apps/explorer/lib/explorer/chain/import/token_transfers.ex b/apps/explorer/lib/explorer/chain/import/token_transfers.ex index d9da70898d..860fd10c78 100644 --- a/apps/explorer/lib/explorer/chain/import/token_transfers.ex +++ b/apps/explorer/lib/explorer/chain/import/token_transfers.ex @@ -8,43 +8,46 @@ defmodule Explorer.Chain.Import.TokenTransfers do alias Ecto.{Changeset, Multi} alias Explorer.Chain.{Import, TokenTransfer} + @behaviour Import.Runner + # milliseconds @timeout 60_000 - @type options :: %{ - required(:params) => Import.params(), - optional(:timeout) => timeout - } @type imported :: [TokenTransfer.t()] - def run(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_map(options) do - case ecto_schema_module_to_changes_list do - %{TokenTransfer => token_transfers_changes} -> - timestamps = Map.fetch!(options, :timestamps) - - Multi.run(multi, :token_transfers, fn _ -> - insert( - token_transfers_changes, - %{ - timeout: options[:token_transfers][:timeout] || @timeout, - timestamps: timestamps - } - ) - end) - - _ -> - multi - end + @impl Import.Runner + def ecto_schema_module, do: TokenTransfer + + @impl Import.Runner + def option_key, do: :token_transfers + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } end + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + timestamps = Map.fetch!(options, :timestamps) + timeout = options[option_key()][:timeout] || @timeout + + Multi.run(multi, :token_transfers, fn _ -> + insert(changes_list, %{timeout: timeout, timestamps: timestamps}) + end) + end + + @impl Import.Runner def timeout, do: @timeout @spec insert([map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) :: {:ok, [TokenTransfer.t()]} | {:error, [Changeset.t()]} - def insert(changes_list, %{timeout: timeout, timestamps: timestamps}) - when is_list(changes_list) do + def insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get(options, :on_conflict, :replace_all) + # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.log_index}) @@ -52,7 +55,7 @@ defmodule Explorer.Chain.Import.TokenTransfers do Import.insert_changes_list( ordered_changes_list, conflict_target: [:transaction_hash, :log_index], - on_conflict: :replace_all, + on_conflict: on_conflict, for: TokenTransfer, returning: true, timeout: timeout, diff --git a/apps/explorer/lib/explorer/chain/import/tokens.ex b/apps/explorer/lib/explorer/chain/import/tokens.ex index 15b664efeb..f24a0a5af3 100644 --- a/apps/explorer/lib/explorer/chain/import/tokens.ex +++ b/apps/explorer/lib/explorer/chain/import/tokens.ex @@ -5,64 +5,69 @@ defmodule Explorer.Chain.Import.Tokens do require Ecto.Query - alias Ecto.{Changeset, Multi} + alias Ecto.Multi alias Explorer.Chain.{Import, Token} + @behaviour Import.Runner + # milliseconds @timeout 60_000 - @type options :: %{ - required(:params) => Import.params(), - optional(:on_conflict) => :nothing | :replace_all, - optional(:timeout) => timeout - } @type imported :: [Token.t()] - def run(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_map(options) do - case ecto_schema_module_to_changes_list do - %{Token => tokens_changes} -> - %{timestamps: timestamps, tokens: %{on_conflict: on_conflict}} = options + @impl Import.Runner + def ecto_schema_module, do: Token - Multi.run(multi, :tokens, fn _ -> - insert( - tokens_changes, - %{ - on_conflict: on_conflict, - timeout: options[:tokens][:timeout] || @timeout, - timestamps: timestamps - } - ) - end) + @impl Import.Runner + def option_key, do: :tokens - _ -> - multi - end + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } end + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + %{timestamps: timestamps, tokens: %{on_conflict: on_conflict}} = options + timeout = options[option_key()][:timeout] || @timeout + + Multi.run(multi, :tokens, fn _ -> + insert(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps}) + end) + end + + @impl Import.Runner def timeout, do: @timeout @spec insert([map()], %{ - required(:on_conflict) => Import.on_conflict(), + required(:on_conflict) => Import.Runner.on_conflict(), required(:timeout) => timeout(), required(:timestamps) => Import.timestamps() }) :: {:ok, [Token.t()]} - | {:error, [Changeset.t()]} - def insert(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps}) - when is_list(changes_list) do - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = Enum.sort_by(changes_list, & &1.contract_address_hash) + | {:error, {:required, :on_conflict}} + def insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + case options do + %{on_conflict: on_conflict} -> + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, & &1.contract_address_hash) + + {:ok, _} = + Import.insert_changes_list( + ordered_changes_list, + conflict_target: :contract_address_hash, + on_conflict: on_conflict, + for: Token, + returning: true, + timeout: timeout, + timestamps: timestamps + ) - {:ok, _} = - Import.insert_changes_list( - ordered_changes_list, - conflict_target: :contract_address_hash, - on_conflict: on_conflict, - for: Token, - returning: true, - timeout: timeout, - timestamps: timestamps - ) + _ -> + {:error, {:required, :on_conflict}} + end end end diff --git a/apps/explorer/lib/explorer/chain/import/transaction/forks.ex b/apps/explorer/lib/explorer/chain/import/transaction/forks.ex index 915f1248c8..2d78b38e6f 100644 --- a/apps/explorer/lib/explorer/chain/import/transaction/forks.ex +++ b/apps/explorer/lib/explorer/chain/import/transaction/forks.ex @@ -10,65 +10,71 @@ defmodule Explorer.Chain.Import.Transaction.Forks do alias Ecto.Multi alias Explorer.Chain.{Hash, Import, Transaction} + @behaviour Import.Runner + # milliseconds @timeout 60_000 - @type options :: %{ - required(:params) => Import.params(), - optional(:timeout) => timeout - } @type imported :: [ %{required(:uncle_hash) => Hash.Full.t(), required(:hash) => Hash.Full.t()} ] - def run(multi, ecto_schema_module_to_changes_list_map, options) - when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do - case ecto_schema_module_to_changes_list_map do - %{Transaction.Fork => transaction_fork_changes} -> - %{timestamps: timestamps} = options - - Multi.run(multi, :transaction_forks, fn _ -> - insert( - transaction_fork_changes, - %{ - timeout: options[:transaction_forks][:timeout] || @timeout, - timestamps: timestamps - } - ) - end) - - _ -> - multi - end + @impl Import.Runner + def ecto_schema_module, do: Transaction.Fork + + @impl Import.Runner + def option_key, do: :transaction_forks + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[%{uncle_hash: Explorer.Chain.Hash.t(), hash: Explorer.Chain.Hash.t()}]", + value_description: "List of maps of the `t:#{ecto_schema_module()}.t/0` `uncle_hash` and `hash` " + } + end + + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + %{timestamps: timestamps} = options + timeout = options[option_key()][:timeout] || @timeout + + Multi.run(multi, :transaction_forks, fn _ -> + insert(changes_list, %{timeout: timeout, timestamps: timestamps}) + end) end + @impl Import.Runner def timeout, do: @timeout @spec insert([map()], %{ required(:timeout) => timeout, required(:timestamps) => Import.timestamps() }) :: {:ok, [%{uncle_hash: Hash.t(), hash: Hash.t()}]} - defp insert(changes_list, %{timeout: timeout, timestamps: timestamps}) - when is_list(changes_list) do + defp insert(changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do + on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) + # order so that row ShareLocks are grabbed in a consistent order ordered_changes_list = Enum.sort_by(changes_list, &{&1.uncle_hash, &1.hash}) Import.insert_changes_list( ordered_changes_list, conflict_target: [:uncle_hash, :index], - on_conflict: - from( - transaction_fork in Transaction.Fork, - update: [ - set: [ - hash: fragment("EXCLUDED.hash") - ] - ] - ), + on_conflict: on_conflict, for: Transaction.Fork, returning: [:uncle_hash, :hash], timeout: timeout, timestamps: timestamps ) end + + defp default_on_conflict do + from( + transaction_fork in Transaction.Fork, + update: [ + set: [ + hash: fragment("EXCLUDED.hash") + ] + ] + ) + end end diff --git a/apps/explorer/lib/explorer/chain/import/transactions.ex b/apps/explorer/lib/explorer/chain/import/transactions.ex index 685e0b1acd..f718639128 100644 --- a/apps/explorer/lib/explorer/chain/import/transactions.ex +++ b/apps/explorer/lib/explorer/chain/import/transactions.ex @@ -5,50 +5,48 @@ defmodule Explorer.Chain.Import.Transactions do require Ecto.Query - alias Ecto.{Changeset, Multi} + alias Ecto.Multi alias Explorer.Chain.{Hash, Import, Transaction} + @behaviour Import.Runner + # milliseconds @timeout 60_000 - @type options :: %{ - required(:params) => Import.params(), - optional(:with) => Import.changeset_function_name(), - optional(:on_conflict) => :nothing | :replace_all, - optional(:timeout) => timeout - } @type imported :: [Hash.Full.t()] - def run(multi, ecto_schema_module_to_changes_list_map, options) - when is_map(ecto_schema_module_to_changes_list_map) and is_map(options) do - case ecto_schema_module_to_changes_list_map do - %{Transaction => transactions_changes} -> - # check required options as early as possible - %{timestamps: timestamps, transactions: %{on_conflict: on_conflict} = transactions_options} = options + @impl Import.Runner + def ecto_schema_module, do: Transaction + + @impl Import.Runner + def option_key, do: :transactions + + @impl Import.Runner + def imported_table_row do + %{ + value_type: "[#{ecto_schema_module()}.t()]", + value_description: "List of `t:#{ecto_schema_module()}.t/0`s" + } + end - Multi.run(multi, :transactions, fn _ -> - insert( - transactions_changes, - %{ - on_conflict: on_conflict, - timeout: transactions_options[:timeout] || @timeout, - timestamps: timestamps - } - ) - end) + @impl Import.Runner + def run(multi, changes_list, options) when is_map(options) do + %{timestamps: timestamps, transactions: %{on_conflict: on_conflict} = transactions_options} = options + timeout = transactions_options[:timeout] || @timeout - _ -> - multi - end + Multi.run(multi, :transactions, fn _ -> + insert(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps}) + end) end + @impl Import.Runner def timeout, do: @timeout @spec insert([map()], %{ - required(:on_conflict) => Import.on_conflict(), + required(:on_conflict) => Import.Runner.on_conflict(), required(:timeout) => timeout, required(:timestamps) => Import.timestamps() - }) :: {:ok, [Hash.t()]} | {:error, [Changeset.t()]} + }) :: {:ok, [Hash.t()]} defp insert(changes_list, %{on_conflict: on_conflict, timeout: timeout, timestamps: timestamps}) when is_list(changes_list) do # order so that row ShareLocks are grabbed in a consistent order diff --git a/apps/explorer/lib/explorer/chain/transaction.ex b/apps/explorer/lib/explorer/chain/transaction.ex index 16873920a9..1e814e43ed 100644 --- a/apps/explorer/lib/explorer/chain/transaction.ex +++ b/apps/explorer/lib/explorer/chain/transaction.ex @@ -100,7 +100,7 @@ defmodule Explorer.Chain.Transaction do * `s` - The S field of the signature. The (r, s) is the normal output of an ECDSA signature, where r is computed as the X coordinate of a point R, modulo the curve order n. * `status` - whether the transaction was successfully mined or failed. `nil` when transaction is pending or has only - been collated into one of the `uncles` in one of the `forks. + been collated into one of the `uncles` in one of the `forks`. * `to_address` - sink of `value` * `to_address_hash` - `to_address` foreign key * `uncles` - uncle blocks where `forks` were collated diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index eecc6a054c..7ad5e403ff 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -23,16 +23,16 @@ defmodule Indexer.Block.Fetcher do %{ address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number, transaction_hash_to_block_number_option: transaction_hash_to_block_number, - addresses: Import.Addresses.options(), - address_coin_balances: Import.Address.CoinBalances.options(), - address_token_balances: Import.Address.TokenBalances.options(), - blocks: Import.Blocks.options(), - block_second_degree_relations: Import.Block.SecondDegreeRelations.options(), + addresses: Import.Runner.options(), + address_coin_balances: Import.Runner.options(), + address_token_balances: Import.Runner.options(), + blocks: Import.Runner.options(), + block_second_degree_relations: Import.Runner.options(), broadcast: boolean, - logs: Import.Logs.options(), - token_transfers: Import.TokenTransfers.options(), - tokens: Import.Tokens.options(), - transactions: Import.Transactions.options() + logs: Import.Runner.options(), + token_transfers: Import.Runner.options(), + tokens: Import.Runner.options(), + transactions: Import.Runner.options() } ) :: Import.all_result() @@ -124,7 +124,6 @@ defmodule Indexer.Block.Fetcher do blocks: %{params: blocks}, block_second_degree_relations: %{params: block_second_degree_relations}, logs: %{params: logs}, - receipts: %{params: receipts}, token_transfers: %{params: token_transfers}, tokens: %{on_conflict: :nothing, params: tokens}, transactions: %{params: transactions_with_receipts, on_conflict: :replace_all} diff --git a/apps/indexer/lib/indexer/block/uncle/fetcher.ex b/apps/indexer/lib/indexer/block/uncle/fetcher.ex index 032679b12c..75ca77a885 100644 --- a/apps/indexer/lib/indexer/block/uncle/fetcher.ex +++ b/apps/indexer/lib/indexer/block/uncle/fetcher.ex @@ -98,10 +98,13 @@ defmodule Indexer.Block.Uncle.Fetcher do end end + @ignored_options ~w(address_hash_to_fetched_balance_block_number transaction_hash_to_block_number)a + @impl Block.Fetcher def import(_, options) when is_map(options) do with {:ok, %{block_second_degree_relations: block_second_degree_relations}} = ok <- options + |> Map.drop(@ignored_options) |> uncle_blocks() |> fork_transactions() |> Chain.import() do