From b96891ced2373592a0b0f37d07ec9057064107df Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Tue, 24 Jul 2018 16:09:42 -0500 Subject: [PATCH 1/5] Move import functions into Explorer.Chain.Import Context needs to stay under `Explorer.Chain` because `Explorer.Chain.*` schemas, which only respects the `Explorer.Chain` API boundard when `Import` stays below `Explorer.Chain`. --- apps/explorer/lib/explorer/chain.ex | 687 +------------------ apps/explorer/lib/explorer/chain/import.ex | 688 ++++++++++++++++++++ apps/explorer/test/explorer/import_test.exs | 5 + 3 files changed, 704 insertions(+), 676 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/import.ex create mode 100644 apps/explorer/test/explorer/import_test.exs diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 05bf71c17e..5a413168c7 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -17,7 +17,6 @@ defmodule Explorer.Chain do ] alias Ecto.Adapters.SQL - alias Ecto.{Changeset, Multi} alias Explorer.Chain.{ Address, @@ -25,6 +24,7 @@ defmodule Explorer.Chain do Block, Data, Hash, + Import, InternalTransaction, Log, Transaction, @@ -62,21 +62,7 @@ defmodule Explorer.Chain do @type necessity_by_association :: %{association => necessity} @typep necessity_by_association_option :: {:necessity_by_association, necessity_by_association} - @typep on_conflict_option :: {:on_conflict, :nothing | :replace_all} @typep paging_options :: {:paging_options, PagingOptions.t()} - @typep params_option :: {:params, [map()]} - @typep timeout_option :: {:timeout, timeout} - @typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()} - @typep timestamps_option :: {:timestamps, timestamps} - @typep addresses_option :: {:addresses, [params_option | timeout_option | with_option]} - @typep balances_option :: {:balances, [params_option | timeout_option]} - @typep blocks_option :: {:blocks, [params_option | timeout_option]} - @typep broadcast_option :: {:broadcast, Boolean} - @typep internal_transactions_option :: {:internal_transactions, [params_option | timeout_option]} - @typep logs_option :: {:logs, [params_option | timeout_option]} - @typep receipts_option :: {:receipts, [params_option | timeout_option]} - @typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option]} - @typep with_option :: {:with, changeset_function_name :: atom} @doc """ Estimated count of `t:Explorer.Chain.Address.t/0`. @@ -214,17 +200,6 @@ defmodule Explorer.Chain do end end - # timeouts all in milliseconds - - @transaction_timeout 120_000 - @insert_addresses_timeout 60_000 - @insert_balances_timeout 60_000 - @insert_blocks_timeout 60_000 - @insert_internal_transactions_timeout 60_000 - @insert_logs_timeout 60_000 - @insert_transactions_timeout 60_000 - @update_transactions_timeout 60_000 - @doc """ The number of `t:Explorer.Chain.Block.t/0`. @@ -642,192 +617,23 @@ defmodule Explorer.Chain do end @doc """ - Bulk insert blocks from a list of blocks. - - The import returns the unique key(s) for each type of record inserted. - - | Key | Value Type | Value Description | - |--------------------------|-------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------| - | `:addresses` | `[Explorer.Chain.Address.t()]` | List of `t:Explorer.Chain.Address.t/0`s | - | `:balances` | `[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]` | List of `t:Explorer.Chain.Address.t/0`s | - | `:blocks` | `[Explorer.Chain.Block.t()]` | List of `t:Explorer.Chain.Block.t/0`s | - | `:broacast` | `Boolean` | Boolean of whether to broadcast | - | `:internal_transactions` | `[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash` | - | `:logs` | `[Explorer.Chain.Log.t()]` | List of `t:Explorer.Chain.Log.t/0`s | - | `:transactions` | `[Explorer.Chain.Hash.t()]` | List of `t:Explorer.Chain.Transaction.t/0` `hash` | - - A completely empty tree can be imported, but options must still be supplied. It is a non-zero amount of time to - process the empty options, so if there is nothing to import, you should avoid calling - `Explorer.Chain.import_blocks/1`. If you don't supply any options with params, then nothing is run so there result is - an empty map. - - iex> Explorer.Chain.import_blocks([]) - {:ok, %{}} - - The params for each key are validated using the corresponding `Ecto.Schema` module's `changeset/2` function. If there - are errors, they are returned in `Ecto.Changeset.t`s, so that the original, invalid value can be reconstructed for any - error messages. - - Because there are multiple processes potentially writing to the same tables at the same time, - `c:Ecto.Repo.insert_all/2`'s - [`:conflict_target` and `:on_conflict` options](https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3-options) are - used to perform [upserts](https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3-upserts) on all tables, so that - a pre-existing unique key will not trigger a failure, but instead replace or otherwise update the row. - - ## Data Notifications + Bulk insert all data stored in the `Explorer`. - On successful inserts, processes interested in certain domains of data will be notified - that new data has been inserted. See `Explorer.Chain.subscribe_to_events/1` for more information. - - ## Tree - - * `t:Explorer.Chain.Block.t/0`s - * `t:Explorer.Chain.Transaction.t/0` - * `t.Explorer.Chain.InternalTransaction.t/0` - * `t.Explorer.Chain.Log.t/0` - - ## Options - - * `:addresses` - * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. - * `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. - * `:balances` - * `:params` - `list` of params for `Explorer.Chain.Balance.changeset/2`. - * `:timeout` - the timeout for inserting all balances. Defaults to `#{@insert_balances_timeout}` milliseconds. - * `:blocks` - * `:params` - `list` of params for `Explorer.Chain.Block.changeset/2`. - * `:timeout` - the timeout for inserting all blocks. Defaults to `#{@insert_blocks_timeout}` milliseconds. - * `:broacast` - Boolean flag indicating whether or not to broadcast the event. - * `:internal_transactions` - * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. - * `:timeout` - the timeout for inserting all internal transactions. Defaults to - `#{@insert_internal_transactions_timeout}` milliseconds. - * `:logs` - * `:params` - `list` of params for `Explorer.Chain.Log.changeset/2`. - * `:timeout` - the timeout for inserting all logs. Defaults to `#{@insert_logs_timeout}` milliseconds. - * `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}` - milliseconds. - * `:transactions` - * `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing transaction - with the same hash. - - *NOTE*: Because the repository transaction for a pending `Explorer.Chain.Transaction`s could `COMMIT` after the - repository transaction for that same transaction being collated into a block, writers, it is recomended to use - `:nothing` for pending transactions and `:replace_all` for collated transactions, so that collated transactions - win. - * `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`. - * `:timeout` - the timeout for inserting all transactions found in the params lists across all - types. Defaults to `#{@insert_transactions_timeout}` milliseconds. + See `Explorer.Chain.Import.all/1` for options and returns. """ - @spec import_blocks([ - addresses_option - | balances_option - | blocks_option - | broadcast_option - | internal_transactions_option - | logs_option - | receipts_option - | timeout_option - | transactions_option - ]) :: - {:ok, - %{ - optional(:addresses) => [Address.t()], - optional(:balances) => [ - %{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()} - ], - optional(:blocks) => [Block.t()], - optional(:broadcast) => Boolean, - optional(:internal_transactions) => [ - %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} - ], - optional(:logs) => [Log.t()], - optional(:receipts) => [Hash.Full.t()], - optional(:transactions) => [Hash.Full.t()] - }} - | {:error, [Changeset.t()]} - | {:error, step :: Ecto.Multi.name(), failed_value :: any(), - changes_so_far :: %{optional(Ecto.Multi.name()) => any()}} - def import_blocks(options) when is_list(options) do - broadcast = - case Keyword.fetch(options, :broadcast) do - {:ok, broadcast} -> broadcast - :error -> false - end - - changes_list_arguments_list = import_options_to_changes_list_arguments_list(options) - - with {:ok, ecto_schema_module_to_changes_list} <- - changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_arguments_list), - {:ok, data} <- insert_ecto_schema_module_to_changes_list(ecto_schema_module_to_changes_list, options) do - if broadcast, do: broadcast_events(data) - {:ok, data} - end + @spec import_blocks(Import.all_options()) :: Import.all_result() + def import_blocks(options) do + Import.all(options) end @doc """ - Bulk insert internal transactions for a list of transactions. + Bulk insert internal transactions and update `t:Explorer.Chain.Transaction.t/0` `internal_transactions_indexed_at`. - ## Options - - * `:addresses` - * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. - * `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. - * `:internal_transactions` - * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. - * `:timeout` - the timeout for inserting all internal transactions. Defaults to - `#{@insert_internal_transactions_timeout}` milliseconds. - * `:transactions` - * `:hashes` - `list` of `t:Explorer.Chain.Transaction.t/0` `hash`es that should have their - `internal_transactions_indexed_at` updated. - * `:timeout` - the timeout for updating transactions with `:hashes`. Defaults to - `#{@update_transactions_timeout}` milliseconds. - * `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}` - milliseconds. + See `Explorer.Chain.Import.internal_transactions/1` for options and returns. """ - @spec import_internal_transactions([ - addresses_option - | internal_transactions_option - | timeout_option - | {:transactions, [{:hashes, [String.t()]} | timeout_option]} - ]) :: - {:ok, - %{ - optional(:addresses) => [Hash.Address.t()], - optional(:internal_transactions) => [ - %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} - ] - }} - | {:error, [Changeset.t()]} - | {:error, step :: Ecto.Multi.name(), failed_value :: any(), - changes_so_far :: %{optional(Ecto.Multi.name()) => any()}} - def import_internal_transactions(options) when is_list(options) do - {transactions_options, import_options} = Keyword.pop(options, :transactions) - changes_list_options_list = import_options_to_changes_list_arguments_list(import_options) - - with {:ok, ecto_schema_module_to_changes_list} <- - changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_options_list) do - timestamps = timestamps() - - ecto_schema_module_to_changes_list - |> ecto_schema_module_to_changes_list_to_multi(Keyword.put(options, :timestamps, timestamps)) - |> Multi.run(:transactions, fn _ -> - transaction_hashes = Keyword.get(transactions_options, :hashes) - transactions_count = length(transaction_hashes) - - query = - from( - t in Transaction, - where: t.hash in ^transaction_hashes, - update: [set: [internal_transactions_indexed_at: ^timestamps.updated_at]] - ) - - {^transactions_count, result} = Repo.update_all(query, []) - - {:ok, result} - end) - |> import_transaction(options) - end + @spec import_internal_transactions(Import.internal_transactions_options()) :: Import.internal_transactions_result() + def import_internal_transactions(options) do + Import.internal_transactions(options) end @doc """ @@ -1737,80 +1543,6 @@ defmodule Explorer.Chain do Repo.one(query) end - defp broadcast_event_data(event_type, event_data) do - Registry.dispatch(Registry.ChainEvents, event_type, fn entries -> - for {pid, _registered_val} <- entries do - send(pid, {:chain_event, event_type, event_data}) - end - end) - end - - defp broadcast_events(data) do - for {event_type, event_data} <- data, event_type in ~w(addresses balances blocks logs transactions)a do - broadcast_event_data(event_type, event_data) - end - end - - @spec changes_list(params :: [map], [{:for, module} | {:with, atom}]) :: {:ok, [map]} | {:error, [Changeset.t()]} - defp changes_list(params, options) when is_list(options) do - ecto_schema_module = Keyword.fetch!(options, :for) - changeset_function_name = Keyword.get(options, :with, :changeset) - struct = ecto_schema_module.__struct__() - - {status, acc} = - params - |> Stream.map(&apply(ecto_schema_module, changeset_function_name, [struct, &1])) - |> Enum.reduce({:ok, []}, fn - changeset = %Changeset{valid?: false}, {:ok, _} -> - {:error, [changeset]} - - changeset = %Changeset{valid?: false}, {:error, acc_changesets} -> - {:error, [changeset | acc_changesets]} - - %Changeset{changes: changes, valid?: true}, {:ok, acc_changes} -> - {:ok, [changes | acc_changes]} - - %Changeset{valid?: true}, {:error, _} = error -> - error - end) - - {status, Enum.reverse(acc)} - end - - defp ecto_schema_module_to_changes_list_to_multi(ecto_schema_module_to_changes_list, options) when is_list(options) do - timestamps = timestamps() - full_options = Keyword.put(options, :timestamps, timestamps) - - Multi.new() - |> run_addresses(ecto_schema_module_to_changes_list, full_options) - |> run_balances(ecto_schema_module_to_changes_list, full_options) - |> run_blocks(ecto_schema_module_to_changes_list, full_options) - |> run_transactions(ecto_schema_module_to_changes_list, full_options) - |> run_internal_transactions(ecto_schema_module_to_changes_list, full_options) - |> run_logs(ecto_schema_module_to_changes_list, full_options) - end - - defp changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_arguments_list) do - changes_list_arguments_list - |> Stream.map(fn [params_list, options] -> - ecto_schema_module = Keyword.fetch!(options, :for) - {ecto_schema_module, changes_list(params_list, options)} - end) - |> Enum.reduce({:ok, %{}}, fn - {ecto_schema_module, {:ok, changes_list}}, {:ok, ecto_schema_module_to_changes_list} -> - {:ok, Map.put(ecto_schema_module_to_changes_list, ecto_schema_module, changes_list)} - - {_, {:ok, _}}, {:error, _} = error -> - error - - {_, {:error, _} = error}, {:ok, _} -> - error - - {_, {:error, changesets}}, {:error, acc_changesets} -> - {:error, acc_changesets ++ changesets} - end) - end - defp fetch_transactions(paging_options \\ nil) do Transaction |> select_merge([transaction], %{ @@ -1840,271 +1572,6 @@ defmodule Explorer.Chain do ) end - @spec insert_addresses([%{hash: Hash.Address.t()}], [timeout_option | timestamps_option | with_option]) :: - {:ok, [Hash.Address.t()]} - defp insert_addresses(changes_list, named_arguments) - when is_list(changes_list) and is_list(named_arguments) do - timestamps = Keyword.fetch!(named_arguments, :timestamps) - timeout = Keyword.fetch!(named_arguments, :timeout) - - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = sort_address_changes_list(changes_list) - - insert_changes_list( - ordered_changes_list, - conflict_target: :hash, - on_conflict: - from( - address in Address, - update: [ - set: [ - contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code), - # ARGMAX on two columns - fetched_balance: - fragment( - """ - CASE WHEN EXCLUDED.fetched_balance_block_number IS NOT NULL AND - (? IS NULL OR - EXCLUDED.fetched_balance_block_number >= ?) THEN - EXCLUDED.fetched_balance - ELSE ? - END - """, - address.fetched_balance_block_number, - address.fetched_balance_block_number, - address.fetched_balance - ), - # MAX on two columns - fetched_balance_block_number: - fragment( - """ - CASE WHEN EXCLUDED.fetched_balance_block_number IS NOT NULL AND - (? IS NULL OR - EXCLUDED.fetched_balance_block_number >= ?) THEN - EXCLUDED.fetched_balance_block_number - ELSE ? - END - """, - address.fetched_balance_block_number, - address.fetched_balance_block_number, - address.fetched_balance_block_number - ) - ] - ] - ), - for: Address, - returning: true, - timeout: timeout, - timestamps: timestamps - ) - end - - defp sort_address_changes_list(changes_list) do - Enum.sort_by(changes_list, & &1.hash) - end - - @import_option_key_to_ecto_schema_module %{ - addresses: Address, - balances: Balance, - blocks: Block, - internal_transactions: InternalTransaction, - logs: Log, - transactions: Transaction - } - - defp import_options_to_changes_list_arguments_list(options) do - Enum.flat_map(@import_option_key_to_ecto_schema_module, fn {option_key, ecto_schema_module} -> - case Keyword.fetch(options, option_key) do - {:ok, option_value} when is_list(option_value) -> - [ - [ - Keyword.fetch!(option_value, :params), - [for: ecto_schema_module, with: Keyword.get(option_value, :with, :changeset)] - ] - ] - - :error -> - [] - end - end) - end - - @spec insert_balances( - [ - %{ - required(:address_hash) => Hash.Address.t(), - required(:block_number) => Block.block_number(), - required(:value) => Wei.t() - } - ], - [timeout_option] - ) :: - {:ok, [%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}]} - | {:error, [Changeset.t()]} - defp insert_balances(changes_list, named_arguments) when is_list(changes_list) and is_list(named_arguments) do - timestamps = Keyword.fetch!(named_arguments, :timestamps) - timeout = Keyword.fetch!(named_arguments, :timeout) - - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number}) - - {:ok, _} = - insert_changes_list( - ordered_changes_list, - conflict_target: [:address_hash, :block_number], - on_conflict: - from( - balance in Balance, - update: [ - set: [ - inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at), - updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at), - value: - fragment( - """ - CASE WHEN EXCLUDED.updated_at > ? THEN EXCLUDED.value - ELSE ? - END - """, - balance.updated_at, - balance.value - ) - ] - ] - ), - for: Balance, - timeout: timeout, - timestamps: timestamps - ) - - {:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash block_number)a))} - end - - @spec insert_blocks([map()], [timeout_option | timestamps_option]) :: {:ok, [Block.t()]} | {:error, [Changeset.t()]} - defp insert_blocks(changes_list, named_arguments) - when is_list(changes_list) and is_list(named_arguments) do - timestamps = Keyword.fetch!(named_arguments, :timestamps) - timeout = Keyword.fetch!(named_arguments, :timeout) - - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = Enum.sort_by(changes_list, &{&1.number, &1.hash}) - - {:ok, blocks} = - insert_changes_list( - ordered_changes_list, - conflict_target: :number, - on_conflict: :replace_all, - for: Block, - returning: true, - timeout: timeout, - timestamps: timestamps - ) - - {:ok, blocks} - end - - defp insert_ecto_schema_module_to_changes_list(ecto_schema_module_to_changes_list, options) do - timestamps = timestamps() - - ecto_schema_module_to_changes_list - |> ecto_schema_module_to_changes_list_to_multi(Keyword.put(options, :timestamps, timestamps)) - |> import_transaction(options) - end - - defp import_transaction(multi, options) when is_list(options) do - Repo.transaction(multi, timeout: Keyword.get(options, :timeout, @transaction_timeout)) - end - - @spec insert_internal_transactions([map], [timeout_option | timestamps_option]) :: - {:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} - | {:error, [Changeset.t()]} - defp insert_internal_transactions(changes_list, named_arguments) - when is_list(changes_list) and is_list(named_arguments) do - timestamps = Keyword.fetch!(named_arguments, :timestamps) - - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) - - {:ok, internal_transactions} = - insert_changes_list( - ordered_changes_list, - conflict_target: [:transaction_hash, :index], - for: InternalTransaction, - on_conflict: :replace_all, - returning: [:index, :transaction_hash], - timestamps: timestamps - ) - - {:ok, - for( - internal_transaction <- internal_transactions, - do: Map.take(internal_transaction, [:index, :transaction_hash]) - )} - end - - @spec insert_logs([map()], [timeout_option | timestamps_option]) :: - {:ok, [Log.t()]} - | {:error, [Changeset.t()]} - defp insert_logs(changes_list, named_arguments) - when is_list(changes_list) and is_list(named_arguments) do - timestamps = Keyword.fetch!(named_arguments, :timestamps) - timeout = Keyword.fetch!(named_arguments, :timeout) - - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) - - {:ok, _} = - insert_changes_list( - ordered_changes_list, - conflict_target: [:transaction_hash, :index], - on_conflict: :replace_all, - for: Log, - returning: true, - timeout: timeout, - timestamps: timestamps - ) - end - - defp insert_changes_list(changes_list, options) when is_list(changes_list) do - ecto_schema_module = Keyword.fetch!(options, :for) - - timestamped_changes_list = timestamp_changes_list(changes_list, Keyword.fetch!(options, :timestamps)) - - {_, inserted} = - Repo.safe_insert_all( - ecto_schema_module, - timestamped_changes_list, - Keyword.delete(options, :for) - ) - - {:ok, inserted} - end - - @spec insert_transactions([map()], [on_conflict_option | timeout_option | timestamps_option]) :: - {:ok, [Hash.t()]} | {:error, [Changeset.t()]} - defp insert_transactions(changes_list, named_arguments) - when is_list(changes_list) and is_list(named_arguments) do - timestamps = Keyword.fetch!(named_arguments, :timestamps) - timeout = Keyword.fetch!(named_arguments, :timeout) - on_conflict = Keyword.fetch!(named_arguments, :on_conflict) - - # order so that row ShareLocks are grabbed in a consistent order - ordered_changes_list = Enum.sort_by(changes_list, & &1.hash) - - {:ok, transactions} = - insert_changes_list( - ordered_changes_list, - conflict_target: :hash, - on_conflict: on_conflict, - for: Transaction, - returning: [:hash], - timeout: timeout, - timestamps: timestamps - ) - - {:ok, for(transaction <- transactions, do: transaction.hash)} - end - defp handle_paging_options(query, nil), do: query defp handle_paging_options(query, paging_options) do @@ -2183,138 +1650,6 @@ defmodule Explorer.Chain do where(query, [transaction], transaction.index < ^index) end - defp run_addresses(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do - %{Address => addresses_changes} -> - timestamps = Keyword.fetch!(options, :timestamps) - - Multi.run(multi, :addresses, fn _ -> - insert_addresses( - addresses_changes, - timeout: options[:addresses][:timeout] || @insert_addresses_timeout, - timestamps: timestamps - ) - end) - - _ -> - multi - end - end - - defp run_balances(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do - %{Balance => balances_changes} -> - timestamps = Keyword.fetch!(options, :timestamps) - - Multi.run(multi, :balances, fn _ -> - insert_balances( - balances_changes, - timeout: options[:balances][:timeout] || @insert_balances_timeout, - timestamps: timestamps - ) - end) - - _ -> - multi - end - end - - defp run_blocks(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do - %{Block => blocks_changes} -> - timestamps = Keyword.fetch!(options, :timestamps) - - Multi.run(multi, :blocks, fn _ -> - insert_blocks( - blocks_changes, - timeout: options[:blocks][:timeout] || @insert_blocks_timeout, - timestamps: timestamps - ) - end) - - _ -> - multi - end - end - - defp run_transactions(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do - %{Transaction => transactions_changes} -> - # check required options as early as possible - transactions_options = Keyword.fetch!(options, :transactions) - on_conflict = Keyword.fetch!(transactions_options, :on_conflict) - timestamps = Keyword.fetch!(options, :timestamps) - - Multi.run(multi, :transactions, fn _ -> - insert_transactions( - transactions_changes, - on_conflict: on_conflict, - timeout: transactions_options[:timeout] || @insert_transactions_timeout, - timestamps: timestamps - ) - end) - - _ -> - multi - end - end - - defp run_internal_transactions(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do - %{InternalTransaction => internal_transactions_changes} -> - timestamps = Keyword.fetch!(options, :timestamps) - - Multi.run(multi, :internal_transactions, fn _ -> - insert_internal_transactions( - internal_transactions_changes, - timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout, - timestamps: timestamps - ) - end) - - _ -> - multi - end - end - - defp run_logs(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do - %{Log => logs_changes} -> - timestamps = Keyword.fetch!(options, :timestamps) - - Multi.run(multi, :logs, fn _ -> - insert_logs( - logs_changes, - timeout: options[:logs][:timeout] || @insert_logs_timeout, - timestamps: timestamps - ) - end) - - _ -> - multi - end - end - - defp timestamp_params(changes, timestamps) when is_map(changes) do - Map.merge(changes, timestamps) - end - - defp timestamp_changes_list(changes_list, timestamps) when is_list(changes_list) do - Enum.map(changes_list, ×tamp_params(&1, timestamps)) - end - - @spec timestamps() :: timestamps - defp timestamps do - now = DateTime.utc_now() - %{inserted_at: now, updated_at: now} - end - defp where_address_fields_match(query, address_hash, :to) do where(query, [t], t.to_address_hash == ^address_hash) end diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex new file mode 100644 index 0000000000..8978d39209 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -0,0 +1,688 @@ +defmodule Explorer.Chain.Import do + @moduledoc """ + Bulk importing of data into `Explorer.Repo` + """ + + import Ecto.Query, only: [from: 2] + + alias Ecto.{Changeset, Multi} + alias Explorer.Chain.{Address, Balance, Block, Hash, InternalTransaction, Log, Transaction, Wei} + alias Explorer.Repo + + @typep addresses_option :: {:addresses, [params_option | timeout_option | with_option]} + @typep balances_option :: {:balances, [params_option | timeout_option]} + @typep blocks_option :: {:blocks, [params_option | timeout_option]} + @typep broadcast_option :: {:broadcast, Boolean} + @typep internal_transactions_option :: {:internal_transactions, [params_option | timeout_option]} + @typep logs_option :: {:logs, [params_option | timeout_option]} + @typep on_conflict_option :: {:on_conflict, :nothing | :replace_all} + @typep params_option :: {:params, [map()]} + @typep receipts_option :: {:receipts, [params_option | timeout_option]} + @typep timeout_option :: {:timeout, timeout} + @typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()} + @typep timestamps_option :: {:timestamps, timestamps} + @typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option]} + @typep with_option :: {:with, changeset_function_name :: atom} + + @type all_options :: [ + addresses_option + | balances_option + | blocks_option + | broadcast_option + | internal_transactions_option + | logs_option + | receipts_option + | timeout_option + | transactions_option + ] + @type all_result :: + {:ok, + %{ + optional(:addresses) => [Address.t()], + optional(:balances) => [ + %{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()} + ], + optional(:blocks) => [Block.t()], + optional(:internal_transactions) => [ + %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} + ], + optional(:logs) => [Log.t()], + optional(:receipts) => [Hash.Full.t()], + optional(:transactions) => [Hash.Full.t()] + }} + | {:error, [Changeset.t()]} + | {:error, step :: Ecto.Multi.name(), failed_value :: any(), + changes_so_far :: %{optional(Ecto.Multi.name()) => any()}} + + @type internal_transactions_options :: [ + addresses_option + | internal_transactions_option + | timeout_option + | {:transactions, [{:hashes, [String.t()]} | timeout_option]} + ] + @type internal_transactions_result :: + {:ok, + %{ + optional(:addresses) => [Hash.Address.t()], + optional(:internal_transactions) => [ + %{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()} + ] + }} + | {:error, [Changeset.t()]} + | {:error, step :: Ecto.Multi.name(), failed_value :: any(), + changes_so_far :: %{optional(Ecto.Multi.name()) => any()}} + + # timeouts all in milliseconds + + @transaction_timeout 120_000 + + @insert_addresses_timeout 60_000 + @insert_balances_timeout 60_000 + @insert_blocks_timeout 60_000 + @insert_internal_transactions_timeout 60_000 + @insert_logs_timeout 60_000 + @insert_transactions_timeout 60_000 + + @update_transactions_timeout 60_000 + + @doc """ + Bulk insert all data stored in the `Explorer`. + + The import returns the unique key(s) for each type of record inserted. + + | Key | Value Type | Value Description | + |--------------------------|-------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------| + | `:addresses` | `[Explorer.Chain.Address.t()]` | List of `t:Explorer.Chain.Address.t/0`s | + | `:balances` | `[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]` | List of `t:Explorer.Chain.Address.t/0`s | + | `:blocks` | `[Explorer.Chain.Block.t()]` | List of `t:Explorer.Chain.Block.t/0`s | + | `:internal_transactions` | `[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash` | + | `:logs` | `[Explorer.Chain.Log.t()]` | List of `t:Explorer.Chain.Log.t/0`s | + | `:transactions` | `[Explorer.Chain.Hash.t()]` | List of `t:Explorer.Chain.Transaction.t/0` `hash` | + + The params for each key are validated using the corresponding `Ecto.Schema` module's `changeset/2` function. If there + are errors, they are returned in `Ecto.Changeset.t`s, so that the original, invalid value can be reconstructed for any + error messages. + + Because there are multiple processes potentially writing to the same tables at the same time, + `c:Ecto.Repo.insert_all/2`'s + [`:conflict_target` and `:on_conflict` options](https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3-options) are + used to perform [upserts](https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3-upserts) on all tables, so that + a pre-existing unique key will not trigger a failure, but instead replace or otherwise update the row. + + ## Data Notifications + + On successful inserts, processes interested in certain domains of data will be notified + that new data has been inserted. See `Explorer.Chain.subscribe_to_events/1` for more information. + + ## Options + + * `:addresses` + * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. + * `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. + * `:balances` + * `:params` - `list` of params for `Explorer.Chain.Balance.changeset/2`. + * `:timeout` - the timeout for inserting all balances. Defaults to `#{@insert_balances_timeout}` milliseconds. + * `:blocks` + * `:params` - `list` of params for `Explorer.Chain.Block.changeset/2`. + * `:timeout` - the timeout for inserting all blocks. Defaults to `#{@insert_blocks_timeout}` milliseconds. + * `:broacast` - Boolean flag indicating whether or not to broadcast the event. + * `:internal_transactions` + * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. + * `:timeout` - the timeout for inserting all internal transactions. Defaults to + `#{@insert_internal_transactions_timeout}` milliseconds. + * `:logs` + * `:params` - `list` of params for `Explorer.Chain.Log.changeset/2`. + * `:timeout` - the timeout for inserting all logs. Defaults to `#{@insert_logs_timeout}` milliseconds. + * `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}` + milliseconds. + * `:transactions` + * `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing transaction + with the same hash. + + *NOTE*: Because the repository transaction for a pending `Explorer.Chain.Transaction`s could `COMMIT` after the + repository transaction for that same transaction being collated into a block, writers, it is recomended to use + `:nothing` for pending transactions and `:replace_all` for collated transactions, so that collated transactions + win. + * `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`. + * `:timeout` - the timeout for inserting all transactions found in the params lists across all + types. Defaults to `#{@insert_transactions_timeout}` milliseconds. + """ + @spec all(all_options()) :: all_result() + def all(options) when is_list(options) do + broadcast = + case Keyword.fetch(options, :broadcast) do + {:ok, broadcast} -> broadcast + :error -> false + end + + changes_list_arguments_list = import_options_to_changes_list_arguments_list(options) + + with {:ok, ecto_schema_module_to_changes_list} <- + changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_arguments_list), + {:ok, data} <- insert_ecto_schema_module_to_changes_list(ecto_schema_module_to_changes_list, options) do + if broadcast, do: broadcast_events(data) + {:ok, data} + end + end + + @doc """ + Bulk insert internal transactions for a list of transactions. + + ## Options + + * `:addresses` + * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. + * `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. + * `:internal_transactions` + * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. + * `:timeout` - the timeout for inserting all internal transactions. Defaults to + `#{@insert_internal_transactions_timeout}` milliseconds. + * `:transactions` + * `:hashes` - `list` of `t:Explorer.Chain.Transaction.t/0` `hash`es that should have their + `internal_transactions_indexed_at` updated. + * `:timeout` - the timeout for updating transactions with `:hashes`. Defaults to + `#{@update_transactions_timeout}` milliseconds. + * `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}` + milliseconds. + """ + @spec internal_transactions(internal_transactions_options()) :: internal_transactions_result + def internal_transactions(options) when is_list(options) do + {transactions_options, import_options} = Keyword.pop(options, :transactions) + changes_list_options_list = import_options_to_changes_list_arguments_list(import_options) + + with {:ok, ecto_schema_module_to_changes_list} <- + changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_options_list) do + timestamps = timestamps() + + ecto_schema_module_to_changes_list + |> ecto_schema_module_to_changes_list_to_multi(Keyword.put(options, :timestamps, timestamps)) + |> Multi.run(:transactions, fn _ -> + transaction_hashes = Keyword.get(transactions_options, :hashes) + transactions_count = length(transaction_hashes) + + query = + from( + t in Transaction, + where: t.hash in ^transaction_hashes, + update: [set: [internal_transactions_indexed_at: ^timestamps.updated_at]] + ) + + {^transactions_count, result} = Repo.update_all(query, []) + + {:ok, result} + end) + |> import_transaction(options) + end + end + + defp broadcast_events(data) do + for {event_type, event_data} <- data, event_type in ~w(addresses balances blocks logs transactions)a do + broadcast_event_data(event_type, event_data) + end + end + + defp broadcast_event_data(event_type, event_data) do + Registry.dispatch(Registry.ChainEvents, event_type, fn entries -> + for {pid, _registered_val} <- entries do + send(pid, {:chain_event, event_type, event_data}) + end + end) + end + + defp changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_arguments_list) do + changes_list_arguments_list + |> Stream.map(fn [params_list, options] -> + ecto_schema_module = Keyword.fetch!(options, :for) + {ecto_schema_module, changes_list(params_list, options)} + end) + |> Enum.reduce({:ok, %{}}, fn + {ecto_schema_module, {:ok, changes_list}}, {:ok, ecto_schema_module_to_changes_list} -> + {:ok, Map.put(ecto_schema_module_to_changes_list, ecto_schema_module, changes_list)} + + {_, {:ok, _}}, {:error, _} = error -> + error + + {_, {:error, _} = error}, {:ok, _} -> + error + + {_, {:error, changesets}}, {:error, acc_changesets} -> + {:error, acc_changesets ++ changesets} + end) + end + + @spec changes_list(params :: [map], [{:for, module} | {:with, atom}]) :: {:ok, [map]} | {:error, [Changeset.t()]} + defp changes_list(params, options) when is_list(options) do + ecto_schema_module = Keyword.fetch!(options, :for) + changeset_function_name = Keyword.get(options, :with, :changeset) + struct = ecto_schema_module.__struct__() + + {status, acc} = + params + |> Stream.map(&apply(ecto_schema_module, changeset_function_name, [struct, &1])) + |> Enum.reduce({:ok, []}, fn + changeset = %Changeset{valid?: false}, {:ok, _} -> + {:error, [changeset]} + + changeset = %Changeset{valid?: false}, {:error, acc_changesets} -> + {:error, [changeset | acc_changesets]} + + %Changeset{changes: changes, valid?: true}, {:ok, acc_changes} -> + {:ok, [changes | acc_changes]} + + %Changeset{valid?: true}, {:error, _} = error -> + error + end) + + {status, Enum.reverse(acc)} + end + + @import_option_key_to_ecto_schema_module %{ + addresses: Address, + balances: Balance, + blocks: Block, + internal_transactions: InternalTransaction, + logs: Log, + transactions: Transaction + } + + defp ecto_schema_module_to_changes_list_to_multi(ecto_schema_module_to_changes_list, options) when is_list(options) do + timestamps = timestamps() + full_options = Keyword.put(options, :timestamps, timestamps) + + Multi.new() + |> run_addresses(ecto_schema_module_to_changes_list, full_options) + |> run_balances(ecto_schema_module_to_changes_list, full_options) + |> run_blocks(ecto_schema_module_to_changes_list, full_options) + |> run_transactions(ecto_schema_module_to_changes_list, full_options) + |> run_internal_transactions(ecto_schema_module_to_changes_list, full_options) + |> run_logs(ecto_schema_module_to_changes_list, full_options) + end + + defp run_addresses(multi, ecto_schema_module_to_changes_list, options) + when is_map(ecto_schema_module_to_changes_list) and is_list(options) do + case ecto_schema_module_to_changes_list do + %{Address => addresses_changes} -> + timestamps = Keyword.fetch!(options, :timestamps) + + Multi.run(multi, :addresses, fn _ -> + insert_addresses( + addresses_changes, + timeout: options[:addresses][:timeout] || @insert_addresses_timeout, + timestamps: timestamps + ) + end) + + _ -> + multi + end + end + + defp run_balances(multi, ecto_schema_module_to_changes_list, options) + when is_map(ecto_schema_module_to_changes_list) and is_list(options) do + case ecto_schema_module_to_changes_list do + %{Balance => balances_changes} -> + timestamps = Keyword.fetch!(options, :timestamps) + + Multi.run(multi, :balances, fn _ -> + insert_balances( + balances_changes, + timeout: options[:balances][:timeout] || @insert_balances_timeout, + timestamps: timestamps + ) + end) + + _ -> + multi + end + end + + defp run_blocks(multi, ecto_schema_module_to_changes_list, options) + when is_map(ecto_schema_module_to_changes_list) and is_list(options) do + case ecto_schema_module_to_changes_list do + %{Block => blocks_changes} -> + timestamps = Keyword.fetch!(options, :timestamps) + + Multi.run(multi, :blocks, fn _ -> + insert_blocks( + blocks_changes, + timeout: options[:blocks][:timeout] || @insert_blocks_timeout, + timestamps: timestamps + ) + end) + + _ -> + multi + end + end + + defp run_transactions(multi, ecto_schema_module_to_changes_list, options) + when is_map(ecto_schema_module_to_changes_list) and is_list(options) do + case ecto_schema_module_to_changes_list do + %{Transaction => transactions_changes} -> + # check required options as early as possible + transactions_options = Keyword.fetch!(options, :transactions) + on_conflict = Keyword.fetch!(transactions_options, :on_conflict) + timestamps = Keyword.fetch!(options, :timestamps) + + Multi.run(multi, :transactions, fn _ -> + insert_transactions( + transactions_changes, + on_conflict: on_conflict, + timeout: transactions_options[:timeout] || @insert_transactions_timeout, + timestamps: timestamps + ) + end) + + _ -> + multi + end + end + + defp run_internal_transactions(multi, ecto_schema_module_to_changes_list, options) + when is_map(ecto_schema_module_to_changes_list) and is_list(options) do + case ecto_schema_module_to_changes_list do + %{InternalTransaction => internal_transactions_changes} -> + timestamps = Keyword.fetch!(options, :timestamps) + + Multi.run(multi, :internal_transactions, fn _ -> + insert_internal_transactions( + internal_transactions_changes, + timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout, + timestamps: timestamps + ) + end) + + _ -> + multi + end + end + + defp run_logs(multi, ecto_schema_module_to_changes_list, options) + when is_map(ecto_schema_module_to_changes_list) and is_list(options) do + case ecto_schema_module_to_changes_list do + %{Log => logs_changes} -> + timestamps = Keyword.fetch!(options, :timestamps) + + Multi.run(multi, :logs, fn _ -> + insert_logs( + logs_changes, + timeout: options[:logs][:timeout] || @insert_logs_timeout, + timestamps: timestamps + ) + end) + + _ -> + multi + end + end + + @spec insert_addresses([%{hash: Hash.Address.t()}], [timeout_option | timestamps_option | with_option]) :: + {:ok, [Hash.Address.t()]} + defp insert_addresses(changes_list, named_arguments) + when is_list(changes_list) and is_list(named_arguments) do + timestamps = Keyword.fetch!(named_arguments, :timestamps) + timeout = Keyword.fetch!(named_arguments, :timeout) + + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = sort_address_changes_list(changes_list) + + insert_changes_list( + ordered_changes_list, + conflict_target: :hash, + on_conflict: + from( + address in Address, + update: [ + set: [ + contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code), + # ARGMAX on two columns + fetched_balance: + fragment( + """ + CASE WHEN EXCLUDED.fetched_balance_block_number IS NOT NULL AND + (? IS NULL OR + EXCLUDED.fetched_balance_block_number >= ?) THEN + EXCLUDED.fetched_balance + ELSE ? + END + """, + address.fetched_balance_block_number, + address.fetched_balance_block_number, + address.fetched_balance + ), + # MAX on two columns + fetched_balance_block_number: + fragment( + """ + CASE WHEN EXCLUDED.fetched_balance_block_number IS NOT NULL AND + (? IS NULL OR + EXCLUDED.fetched_balance_block_number >= ?) THEN + EXCLUDED.fetched_balance_block_number + ELSE ? + END + """, + address.fetched_balance_block_number, + address.fetched_balance_block_number, + address.fetched_balance_block_number + ) + ] + ] + ), + for: Address, + returning: true, + timeout: timeout, + timestamps: timestamps + ) + end + + defp sort_address_changes_list(changes_list) do + Enum.sort_by(changes_list, & &1.hash) + end + + @spec insert_balances( + [ + %{ + required(:address_hash) => Hash.Address.t(), + required(:block_number) => Block.block_number(), + required(:value) => Wei.t() + } + ], + [timeout_option] + ) :: + {:ok, [%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}]} + | {:error, [Changeset.t()]} + defp insert_balances(changes_list, named_arguments) when is_list(changes_list) and is_list(named_arguments) do + timestamps = Keyword.fetch!(named_arguments, :timestamps) + timeout = Keyword.fetch!(named_arguments, :timeout) + + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number}) + + {:ok, _} = + insert_changes_list( + ordered_changes_list, + conflict_target: [:address_hash, :block_number], + on_conflict: + from( + balance in Balance, + update: [ + set: [ + inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at), + updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at), + value: + fragment( + """ + CASE WHEN EXCLUDED.updated_at > ? THEN EXCLUDED.value + ELSE ? + END + """, + balance.updated_at, + balance.value + ) + ] + ] + ), + for: Balance, + timeout: timeout, + timestamps: timestamps + ) + + {:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash block_number)a))} + end + + @spec insert_blocks([map()], [timeout_option | timestamps_option]) :: {:ok, [Block.t()]} | {:error, [Changeset.t()]} + defp insert_blocks(changes_list, named_arguments) + when is_list(changes_list) and is_list(named_arguments) do + timestamps = Keyword.fetch!(named_arguments, :timestamps) + timeout = Keyword.fetch!(named_arguments, :timeout) + + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, &{&1.number, &1.hash}) + + {:ok, blocks} = + insert_changes_list( + ordered_changes_list, + conflict_target: :number, + on_conflict: :replace_all, + for: Block, + returning: true, + timeout: timeout, + timestamps: timestamps + ) + + {:ok, blocks} + end + + @spec insert_internal_transactions([map], [timeout_option | timestamps_option]) :: + {:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]} + | {:error, [Changeset.t()]} + defp insert_internal_transactions(changes_list, named_arguments) + when is_list(changes_list) and is_list(named_arguments) do + timestamps = Keyword.fetch!(named_arguments, :timestamps) + + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) + + {:ok, internal_transactions} = + insert_changes_list( + ordered_changes_list, + conflict_target: [:transaction_hash, :index], + for: InternalTransaction, + on_conflict: :replace_all, + returning: [:index, :transaction_hash], + timestamps: timestamps + ) + + {:ok, + for( + internal_transaction <- internal_transactions, + do: Map.take(internal_transaction, [:index, :transaction_hash]) + )} + end + + @spec insert_logs([map()], [timeout_option | timestamps_option]) :: + {:ok, [Log.t()]} + | {:error, [Changeset.t()]} + defp insert_logs(changes_list, named_arguments) + when is_list(changes_list) and is_list(named_arguments) do + timestamps = Keyword.fetch!(named_arguments, :timestamps) + timeout = Keyword.fetch!(named_arguments, :timeout) + + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index}) + + {:ok, _} = + insert_changes_list( + ordered_changes_list, + conflict_target: [:transaction_hash, :index], + on_conflict: :replace_all, + for: Log, + returning: true, + timeout: timeout, + timestamps: timestamps + ) + end + + @spec insert_transactions([map()], [on_conflict_option | timeout_option | timestamps_option]) :: + {:ok, [Hash.t()]} | {:error, [Changeset.t()]} + defp insert_transactions(changes_list, named_arguments) + when is_list(changes_list) and is_list(named_arguments) do + timestamps = Keyword.fetch!(named_arguments, :timestamps) + timeout = Keyword.fetch!(named_arguments, :timeout) + on_conflict = Keyword.fetch!(named_arguments, :on_conflict) + + # order so that row ShareLocks are grabbed in a consistent order + ordered_changes_list = Enum.sort_by(changes_list, & &1.hash) + + {:ok, transactions} = + insert_changes_list( + ordered_changes_list, + conflict_target: :hash, + on_conflict: on_conflict, + for: Transaction, + returning: [:hash], + timeout: timeout, + timestamps: timestamps + ) + + {:ok, for(transaction <- transactions, do: transaction.hash)} + end + + defp insert_changes_list(changes_list, options) when is_list(changes_list) do + ecto_schema_module = Keyword.fetch!(options, :for) + + timestamped_changes_list = timestamp_changes_list(changes_list, Keyword.fetch!(options, :timestamps)) + + {_, inserted} = + Repo.safe_insert_all( + ecto_schema_module, + timestamped_changes_list, + Keyword.delete(options, :for) + ) + + {:ok, inserted} + end + + defp timestamp_changes_list(changes_list, timestamps) when is_list(changes_list) do + Enum.map(changes_list, ×tamp_params(&1, timestamps)) + end + + defp timestamp_params(changes, timestamps) when is_map(changes) do + Map.merge(changes, timestamps) + end + + defp import_options_to_changes_list_arguments_list(options) do + Enum.flat_map(@import_option_key_to_ecto_schema_module, fn {option_key, ecto_schema_module} -> + case Keyword.fetch(options, option_key) do + {:ok, option_value} when is_list(option_value) -> + [ + [ + Keyword.fetch!(option_value, :params), + [for: ecto_schema_module, with: Keyword.get(option_value, :with, :changeset)] + ] + ] + + :error -> + [] + end + end) + end + + defp import_transaction(multi, options) when is_list(options) do + Repo.transaction(multi, timeout: Keyword.get(options, :timeout, @transaction_timeout)) + end + + defp insert_ecto_schema_module_to_changes_list(ecto_schema_module_to_changes_list, options) do + timestamps = timestamps() + + ecto_schema_module_to_changes_list + |> ecto_schema_module_to_changes_list_to_multi(Keyword.put(options, :timestamps, timestamps)) + |> import_transaction(options) + end + + @spec timestamps() :: timestamps + defp timestamps do + now = DateTime.utc_now() + %{inserted_at: now, updated_at: now} + end +end diff --git a/apps/explorer/test/explorer/import_test.exs b/apps/explorer/test/explorer/import_test.exs new file mode 100644 index 0000000000..606d630fd8 --- /dev/null +++ b/apps/explorer/test/explorer/import_test.exs @@ -0,0 +1,5 @@ +defmodule Explorer.Chain.ImportTest do + use Explorer.DataCase + + doctest Explorer.Chain.Import +end From c4133c1fb032ab826c5140c20c0cd0190a9a768a Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 25 Jul 2018 12:03:03 -0500 Subject: [PATCH 2/5] Validate Transacton from_address_hash as required It was marked null: false, but an optional parameter in Transaction.changeset, so could trigger a database not-null constraint violation. --- apps/explorer/lib/explorer/chain/transaction.ex | 9 ++++++--- apps/explorer/test/explorer/chain/transaction_test.exs | 3 ++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/apps/explorer/lib/explorer/chain/transaction.ex b/apps/explorer/lib/explorer/chain/transaction.ex index 52d6241086..826c11d3bf 100644 --- a/apps/explorer/lib/explorer/chain/transaction.ex +++ b/apps/explorer/lib/explorer/chain/transaction.ex @@ -7,9 +7,9 @@ defmodule Explorer.Chain.Transaction do alias Explorer.Chain.{Address, Block, Data, Gas, Hash, InternalTransaction, Log, Wei} alias Explorer.Chain.Transaction.Status - @optional_attrs ~w(block_hash block_number cumulative_gas_used from_address_hash gas_used index - internal_transactions_indexed_at status to_address_hash)a - @required_attrs ~w(gas gas_price hash input nonce r s v value)a + @optional_attrs ~w(block_hash block_number cumulative_gas_used gas_used index internal_transactions_indexed_at status + to_address_hash)a + @required_attrs ~w(from_address_hash gas gas_price hash input nonce r s v value)a @typedoc """ X coordinate module n in @@ -153,6 +153,7 @@ defmodule Explorer.Chain.Transaction do iex> changeset = Explorer.Chain.Transaction.changeset( ...> %Transaction{}, ...> %{ + ...> from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca", ...> gas: 4700000, ...> gas_price: 100000000000, ...> hash: "0x3a3eb134e6792ce9403ea4188e5e79693de9e4c94e499db132be086400da79e6", @@ -173,6 +174,7 @@ defmodule Explorer.Chain.Transaction do iex> changeset = Explorer.Chain.Transaction.changeset( ...> %Transaction{}, ...> %{ + ...> from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca", ...> block_number: 34, ...> cumulative_gas_used: 0, ...> gas: 4700000, @@ -210,6 +212,7 @@ defmodule Explorer.Chain.Transaction do ...> block_hash: "0xe52d77084cab13a4e724162bcd8c6028e5ecfaa04d091ee476e96b9958ed6b47", ...> block_number: 34, ...> cumulative_gas_used: 0, + ...> from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca", ...> gas: 4700000, ...> gas_price: 100000000000, ...> gas_used: 4600000, diff --git a/apps/explorer/test/explorer/chain/transaction_test.exs b/apps/explorer/test/explorer/chain/transaction_test.exs index f178d8ffe9..6755277637 100644 --- a/apps/explorer/test/explorer/chain/transaction_test.exs +++ b/apps/explorer/test/explorer/chain/transaction_test.exs @@ -10,6 +10,7 @@ defmodule Explorer.Chain.TransactionTest do test "with valid attributes" do assert %Changeset{valid?: true} = Transaction.changeset(%Transaction{}, %{ + from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca", hash: "0x9fc76417374aa880d4449a1f7f31ec597f00b1f6f3dd2d66f4c9c6c445836d8b", value: 1, gas: 21000, @@ -29,7 +30,7 @@ defmodule Explorer.Chain.TransactionTest do end test "it creates a new to address" do - params = params_for(:transaction) + params = params_for(:transaction, from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca") to_address_params = %{hash: "sk8orDi3"} changeset_params = Map.merge(params, %{to_address: to_address_params}) From 2f3af517412830b48416ffe0522009b39b7814eb Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Wed, 25 Jul 2018 12:48:07 -0500 Subject: [PATCH 3/5] Unified Explorer.Chain.import * Merge `Explorer.Chain.import_blocks` and `Explorer.Chain.import_internal_transactions` into `Explorer.Chain.import`. * Move body of `Explorer.Chain.import` and its private helper functions to `Explorer.Chain.Import` as an independent context that `Explorer.Chain` can use. `Explorer.Chain.Import` must be under `Explorer.Chain` because `Explorer.Chain.Import` still needs direct access to the `Explorer.Chain` schemas. --- apps/explorer/lib/explorer/chain.ex | 14 +-- apps/explorer/lib/explorer/chain/import.ex | 91 ++++++-------- apps/explorer/test/explorer/chain_test.exs | 66 +--------- apps/explorer/test/explorer/import_test.exs | 114 +++++++++++++++++- .../features/viewing_addresses_test.exs | 2 +- apps/indexer/lib/indexer/balance_fetcher.ex | 2 +- apps/indexer/lib/indexer/block_fetcher.ex | 6 +- .../indexer/internal_transaction_fetcher.ex | 7 +- .../indexer/pending_transaction_fetcher.ex | 2 +- 9 files changed, 166 insertions(+), 138 deletions(-) diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 5a413168c7..881864927f 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -621,21 +621,11 @@ defmodule Explorer.Chain do See `Explorer.Chain.Import.all/1` for options and returns. """ - @spec import_blocks(Import.all_options()) :: Import.all_result() - def import_blocks(options) do + @spec import(Import.all_options()) :: Import.all_result() + def import(options) do Import.all(options) end - @doc """ - Bulk insert internal transactions and update `t:Explorer.Chain.Transaction.t/0` `internal_transactions_indexed_at`. - - See `Explorer.Chain.Import.internal_transactions/1` for options and returns. - """ - @spec import_internal_transactions(Import.internal_transactions_options()) :: Import.internal_transactions_result() - def import_internal_transactions(options) do - Import.internal_transactions(options) - end - @doc """ The number of `t:Explorer.Chain.Address.t/0`. diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index 8978d39209..04223f76e5 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -21,7 +21,7 @@ defmodule Explorer.Chain.Import do @typep timeout_option :: {:timeout, timeout} @typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()} @typep timestamps_option :: {:timestamps, timestamps} - @typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option]} + @typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option | with_option]} @typep with_option :: {:with, changeset_function_name :: atom} @type all_options :: [ @@ -83,8 +83,6 @@ defmodule Explorer.Chain.Import do @insert_logs_timeout 60_000 @insert_transactions_timeout 60_000 - @update_transactions_timeout 60_000 - @doc """ Bulk insert all data stored in the `Explorer`. @@ -119,6 +117,7 @@ defmodule Explorer.Chain.Import do * `:addresses` * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. * `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. + * `:with` - the changeset function on `Explorer.Chain.Address` to use validate `:params`. * `:balances` * `:params` - `list` of params for `Explorer.Chain.Balance.changeset/2`. * `:timeout` - the timeout for inserting all balances. Defaults to `#{@insert_balances_timeout}` milliseconds. @@ -146,6 +145,7 @@ defmodule Explorer.Chain.Import do * `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`. * `:timeout` - the timeout for inserting all transactions found in the params lists across all types. Defaults to `#{@insert_transactions_timeout}` milliseconds. + * `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`. """ @spec all(all_options()) :: all_result() def all(options) when is_list(options) do @@ -165,56 +165,6 @@ defmodule Explorer.Chain.Import do end end - @doc """ - Bulk insert internal transactions for a list of transactions. - - ## Options - - * `:addresses` - * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. - * `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. - * `:internal_transactions` - * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. - * `:timeout` - the timeout for inserting all internal transactions. Defaults to - `#{@insert_internal_transactions_timeout}` milliseconds. - * `:transactions` - * `:hashes` - `list` of `t:Explorer.Chain.Transaction.t/0` `hash`es that should have their - `internal_transactions_indexed_at` updated. - * `:timeout` - the timeout for updating transactions with `:hashes`. Defaults to - `#{@update_transactions_timeout}` milliseconds. - * `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}` - milliseconds. - """ - @spec internal_transactions(internal_transactions_options()) :: internal_transactions_result - def internal_transactions(options) when is_list(options) do - {transactions_options, import_options} = Keyword.pop(options, :transactions) - changes_list_options_list = import_options_to_changes_list_arguments_list(import_options) - - with {:ok, ecto_schema_module_to_changes_list} <- - changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_options_list) do - timestamps = timestamps() - - ecto_schema_module_to_changes_list - |> ecto_schema_module_to_changes_list_to_multi(Keyword.put(options, :timestamps, timestamps)) - |> Multi.run(:transactions, fn _ -> - transaction_hashes = Keyword.get(transactions_options, :hashes) - transactions_count = length(transaction_hashes) - - query = - from( - t in Transaction, - where: t.hash in ^transaction_hashes, - update: [set: [internal_transactions_indexed_at: ^timestamps.updated_at]] - ) - - {^transactions_count, result} = Repo.update_all(query, []) - - {:ok, result} - end) - |> import_transaction(options) - end - end - defp broadcast_events(data) do for {event_type, event_data} <- data, event_type in ~w(addresses balances blocks logs transactions)a do broadcast_event_data(event_type, event_data) @@ -384,13 +334,22 @@ defmodule Explorer.Chain.Import do %{InternalTransaction => internal_transactions_changes} -> timestamps = Keyword.fetch!(options, :timestamps) - Multi.run(multi, :internal_transactions, fn _ -> + multi + |> Multi.run(:internal_transactions, fn _ -> insert_internal_transactions( internal_transactions_changes, timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout, timestamps: timestamps ) end) + |> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions} + when is_list(internal_transactions) -> + update_transactions_internal_transactions_indexed_at( + internal_transactions, + timeout: options[:transactions][:timeout] || @insert_transactions_timeout, + timestamps: timestamps + ) + end) _ -> multi @@ -643,6 +602,30 @@ defmodule Explorer.Chain.Import do {:ok, inserted} end + defp update_transactions_internal_transactions_indexed_at(internal_transactions, named_arguments) + when is_list(internal_transactions) and is_list(named_arguments) do + timeout = Keyword.fetch!(named_arguments, :timeout) + timestamps = Keyword.fetch!(named_arguments, :timestamps) + + ordered_transaction_hashes = + internal_transactions + |> MapSet.new(& &1.transaction_hash) + |> Enum.sort() + + query = + from( + t in Transaction, + where: t.hash in ^ordered_transaction_hashes, + update: [set: [internal_transactions_indexed_at: ^timestamps.updated_at]] + ) + + transaction_count = Enum.count(ordered_transaction_hashes) + + {^transaction_count, result} = Repo.update_all(query, [], timeout: timeout) + + {:ok, result} + end + defp timestamp_changes_list(changes_list, timestamps) when is_list(changes_list) do Enum.map(changes_list, ×tamp_params(&1, timestamps)) end diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index 3967ca2db4..c9924e83f0 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -1040,60 +1040,6 @@ defmodule Explorer.ChainTest do end end - describe "import_internal_transactions/1" do - test "updates address with contract code" do - smart_contract_bytecode = - "0x608060405234801561001057600080fd5b5060df8061001f6000396000f3006080604052600436106049576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806360fe47b114604e5780636d4ce63c146078575b600080fd5b348015605957600080fd5b5060766004803603810190808035906020019092919050505060a0565b005b348015608357600080fd5b50608a60aa565b6040518082815260200191505060405180910390f35b8060008190555050565b600080549050905600a165627a7a7230582040d82a7379b1ee1632ad4d8a239954fd940277b25628ead95259a85c5eddb2120029" - - address_hash = "0x1c494fa496f1cfd918b5ff190835af3aaf60987e" - insert(:address, hash: address_hash) - - from_address_hash = "0x8cc2e4b51b4340cb3727cffe3f1878756e732cee" - from_address = insert(:address, hash: from_address_hash) - - transaction_string_hash = "0x0705ea0a5b997d9aafd5c531e016d9aabe3297a28c0bd4ef005fe6ea329d301b" - insert(:transaction, from_address: from_address, hash: transaction_string_hash) - - options = [ - addresses: [ - params: [ - %{ - contract_code: smart_contract_bytecode, - hash: address_hash - } - ] - ], - internal_transactions: [ - params: [ - %{ - created_contract_address_hash: address_hash, - created_contract_code: smart_contract_bytecode, - from_address_hash: from_address_hash, - gas: 184_531, - gas_used: 84531, - index: 0, - init: - "0x6060604052341561000c57fe5b5b6101a68061001c6000396000f300606060405263ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416631d3b9edf811461005b57806366098d4f1461007b578063a12f69e01461009b578063f4f3bdc1146100bb575bfe5b6100696004356024356100db565b60408051918252519081900360200190f35b61006960043560243561010a565b60408051918252519081900360200190f35b610069600435602435610124565b60408051918252519081900360200190f35b610069600435602435610163565b60408051918252519081900360200190f35b60008282028315806100f757508284828115156100f457fe5b04145b15156100ff57fe5b8091505b5092915050565b6000828201838110156100ff57fe5b8091505b5092915050565b60008080831161013057fe5b828481151561013b57fe5b049050828481151561014957fe5b0681840201841415156100ff57fe5b8091505b5092915050565b60008282111561016f57fe5b508082035b929150505600a165627a7a7230582020c944d8375ca14e2c92b14df53c2d044cb99dc30c3ba9f55e2bcde87bd4709b0029", - trace_address: [], - transaction_hash: transaction_string_hash, - type: "create", - value: 0 - } - ] - ], - transactions: [ - hashes: [transaction_string_hash] - ] - ] - - assert {:ok, _} = Chain.import_internal_transactions(options) - - address = Explorer.Repo.one(from(address in Explorer.Chain.Address, where: address.hash == ^address_hash)) - - assert address.contract_code != nil - end - end - describe "stream_unfetched_balances/2" do test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <> "does not return `t:Explorer.Chain.Block.t/0` `miner_hash`" do @@ -1469,7 +1415,7 @@ defmodule Explorer.ChainTest do assert [{^current_pid, _}] = Registry.lookup(Registry.ChainEvents, :logs) end - describe "import_blocks" do + describe "import" do @import_data [ blocks: [ params: [ @@ -1558,25 +1504,25 @@ defmodule Explorer.ChainTest do test "publishes addresses with updated fetched_balance data to subscribers on insert" do Chain.subscribe_to_events(:addresses) - Chain.import_blocks(@import_data) + Chain.import(@import_data) assert_received {:chain_event, :addresses, [%Address{}, %Address{}]} end test "publishes block data to subscribers on insert" do Chain.subscribe_to_events(:blocks) - Chain.import_blocks(@import_data) + Chain.import(@import_data) assert_received {:chain_event, :blocks, [%Block{}]} end test "publishes log data to subscribers on insert" do Chain.subscribe_to_events(:logs) - Chain.import_blocks(@import_data) + Chain.import(@import_data) assert_received {:chain_event, :logs, [%Log{}]} end test "publishes transaction hashes data to subscribers on insert" do Chain.subscribe_to_events(:transactions) - Chain.import_blocks(@import_data) + Chain.import(@import_data) assert_received {:chain_event, :transactions, [%Hash{}]} end @@ -1584,7 +1530,7 @@ defmodule Explorer.ChainTest do non_broadcast_data = Keyword.merge(@import_data, broadcast: false) Chain.subscribe_to_events(:logs) - Chain.import_blocks(non_broadcast_data) + Chain.import(non_broadcast_data) refute_received {:chain_event, :logs, [%Log{}]} end end diff --git a/apps/explorer/test/explorer/import_test.exs b/apps/explorer/test/explorer/import_test.exs index 606d630fd8..c78d960a57 100644 --- a/apps/explorer/test/explorer/import_test.exs +++ b/apps/explorer/test/explorer/import_test.exs @@ -1,5 +1,117 @@ defmodule Explorer.Chain.ImportTest do use Explorer.DataCase - doctest Explorer.Chain.Import + alias Explorer.Chain.Import + + doctest Import + + describe "all/1" do + test "updates address with contract code" do + smart_contract_bytecode = + "0x608060405234801561001057600080fd5b5060df8061001f6000396000f3006080604052600436106049576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806360fe47b114604e5780636d4ce63c146078575b600080fd5b348015605957600080fd5b5060766004803603810190808035906020019092919050505060a0565b005b348015608357600080fd5b50608a60aa565b6040518082815260200191505060405180910390f35b8060008190555050565b600080549050905600a165627a7a7230582040d82a7379b1ee1632ad4d8a239954fd940277b25628ead95259a85c5eddb2120029" + + address_hash = "0x1c494fa496f1cfd918b5ff190835af3aaf60987e" + insert(:address, hash: address_hash) + + from_address_hash = "0x8cc2e4b51b4340cb3727cffe3f1878756e732cee" + from_address = insert(:address, hash: from_address_hash) + + transaction_string_hash = "0x0705ea0a5b997d9aafd5c531e016d9aabe3297a28c0bd4ef005fe6ea329d301b" + insert(:transaction, from_address: from_address, hash: transaction_string_hash) + + options = [ + addresses: [ + params: [ + %{ + contract_code: smart_contract_bytecode, + hash: address_hash + } + ] + ], + internal_transactions: [ + params: [ + %{ + created_contract_address_hash: address_hash, + created_contract_code: smart_contract_bytecode, + from_address_hash: from_address_hash, + gas: 184_531, + gas_used: 84531, + index: 0, + init: + "0x6060604052341561000c57fe5b5b6101a68061001c6000396000f300606060405263ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416631d3b9edf811461005b57806366098d4f1461007b578063a12f69e01461009b578063f4f3bdc1146100bb575bfe5b6100696004356024356100db565b60408051918252519081900360200190f35b61006960043560243561010a565b60408051918252519081900360200190f35b610069600435602435610124565b60408051918252519081900360200190f35b610069600435602435610163565b60408051918252519081900360200190f35b60008282028315806100f757508284828115156100f457fe5b04145b15156100ff57fe5b8091505b5092915050565b6000828201838110156100ff57fe5b8091505b5092915050565b60008080831161013057fe5b828481151561013b57fe5b049050828481151561014957fe5b0681840201841415156100ff57fe5b8091505b5092915050565b60008282111561016f57fe5b508082035b929150505600a165627a7a7230582020c944d8375ca14e2c92b14df53c2d044cb99dc30c3ba9f55e2bcde87bd4709b0029", + trace_address: [], + transaction_hash: transaction_string_hash, + type: "create", + value: 0 + } + ] + ] + ] + + assert {:ok, _} = Import.all(options) + + address = Explorer.Repo.one(from(address in Explorer.Chain.Address, where: address.hash == ^address_hash)) + + assert address.contract_code != nil + end + + test "with internal_transactions updates Transaction internal_transactions_indexed_at" do + from_address_hash = "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca" + to_address_hash = "0x8bf38d4764929064f2d4d3a56520a76ab3df415b" + transaction_hash = "0x3a3eb134e6792ce9403ea4188e5e79693de9e4c94e499db132be086400da79e6" + + options = [ + addresses: [ + params: [ + %{hash: from_address_hash}, + %{hash: to_address_hash} + ] + ], + transactions: [ + params: [ + %{ + from_address_hash: from_address_hash, + gas: 4_677_320, + gas_price: 1, + hash: transaction_hash, + input: "0x", + nonce: 0, + r: 0, + s: 0, + v: 0, + value: 0 + } + ], + on_conflict: :replace_all + ], + internal_transactions: [ + params: [ + %{ + block_number: 35, + call_type: "call", + from_address_hash: from_address_hash, + gas: 4_677_320, + gas_used: 27770, + index: 0, + output: "0x", + to_address_hash: to_address_hash, + trace_address: [], + transaction_hash: transaction_hash, + type: "call", + value: 0 + } + ] + ] + ] + + refute Enum.any?(options[:transactions][:params], &Map.has_key?(&1, :internal_transactions_indexed_at)) + + assert {:ok, _} = Import.all(options) + + transaction = + Explorer.Repo.one(from(transaction in Explorer.Chain.Transaction, where: transaction.hash == ^transaction_hash)) + + refute transaction.internal_transactions_indexed_at == nil + end + end end diff --git a/apps/explorer_web/test/explorer_web/features/viewing_addresses_test.exs b/apps/explorer_web/test/explorer_web/features/viewing_addresses_test.exs index 2dd91e6c16..59140a8c9e 100644 --- a/apps/explorer_web/test/explorer_web/features/viewing_addresses_test.exs +++ b/apps/explorer_web/test/explorer_web/features/viewing_addresses_test.exs @@ -280,7 +280,7 @@ defmodule ExplorerWeb.ViewingAddressesTest do ], balances: [%{address_hash: ^hash}] }} = - Chain.import_blocks( + Chain.import( addresses: [ params: [ %{ diff --git a/apps/indexer/lib/indexer/balance_fetcher.ex b/apps/indexer/lib/indexer/balance_fetcher.ex index 66c30dbe31..7a62447879 100644 --- a/apps/indexer/lib/indexer/balance_fetcher.ex +++ b/apps/indexer/lib/indexer/balance_fetcher.ex @@ -75,7 +75,7 @@ defmodule Indexer.BalanceFetcher do addresses_params = balances_params_to_address_params(balances_params) {:ok, _} = - Chain.import_blocks( + Chain.import( addresses: [params: addresses_params, with: :balance_changeset], balances: [params: balances_params] ) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 45ba0eb544..0039323dc9 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -280,7 +280,7 @@ defmodule Indexer.BlockFetcher do options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index) - with {:ok, results} <- Chain.import_blocks(options_with_broadcast) do + with {:ok, results} <- Chain.import(options_with_broadcast) do async_import_remaining_block_data( results, address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number, @@ -300,8 +300,8 @@ defmodule Indexer.BlockFetcher do end end - # `fetched_balance_block_number` is needed for the `BalanceFetcher`, but should not be used for - # `import_blocks` because the balance is not known yet. + # `fetched_balance_block_number` is needed for the `BalanceFetcher`, but should not be used for `import` because the + # balance is not known yet. defp pop_address_hash_to_fetched_balance_block_number(options) do {address_hash_fetched_balance_block_number_pairs, import_options} = get_and_update_in(options, [:addresses, :params, Access.all()], &pop_hash_fetched_balance_block_number/1) diff --git a/apps/indexer/lib/indexer/internal_transaction_fetcher.ex b/apps/indexer/lib/indexer/internal_transaction_fetcher.ex index 09479adc82..fd4f9e3e7b 100644 --- a/apps/indexer/lib/indexer/internal_transaction_fetcher.ex +++ b/apps/indexer/lib/indexer/internal_transaction_fetcher.ex @@ -99,13 +99,10 @@ defmodule Indexer.InternalTransactionFetcher do {hash, block_number} end) - transaction_hashes = Enum.map(unique_transactions_params, &Map.fetch!(&1, :hash_data)) - with {:ok, %{addresses: address_hashes}} <- - Chain.import_internal_transactions( + Chain.import( addresses: [params: addresses_params], - internal_transactions: [params: internal_transactions_params], - transactions: [hashes: transaction_hashes] + internal_transactions: [params: internal_transactions_params] ) do address_hashes |> Enum.map(fn address_hash -> diff --git a/apps/indexer/lib/indexer/pending_transaction_fetcher.ex b/apps/indexer/lib/indexer/pending_transaction_fetcher.ex index c90a7642fc..37775d406e 100644 --- a/apps/indexer/lib/indexer/pending_transaction_fetcher.ex +++ b/apps/indexer/lib/indexer/pending_transaction_fetcher.ex @@ -97,7 +97,7 @@ defmodule Indexer.PendingTransactionFetcher do # affected the address balance yet since address balance is a balance at a give block and these transactions are # blockless. {:ok, _} = - Chain.import_blocks( + Chain.import( addresses: [params: addresses_params], transactions: [on_conflict: :nothing, params: transactions_params] ) From a28b06f788a1d4000b6f3df559a0707e705b09c2 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 26 Jul 2018 15:25:50 -0500 Subject: [PATCH 4/5] Use _map suffix to clarify type with _list as value --- apps/explorer/lib/explorer/chain/import.ex | 69 +++++++++++----------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index 04223f76e5..f6c0b9c221 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -157,9 +157,9 @@ defmodule Explorer.Chain.Import do changes_list_arguments_list = import_options_to_changes_list_arguments_list(options) - with {:ok, ecto_schema_module_to_changes_list} <- - changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_arguments_list), - {:ok, data} <- insert_ecto_schema_module_to_changes_list(ecto_schema_module_to_changes_list, options) do + with {:ok, ecto_schema_module_to_changes_list_map} <- + changes_list_arguments_list_to_ecto_schema_module_to_changes_list_map(changes_list_arguments_list), + {:ok, data} <- insert_ecto_schema_module_to_changes_list_map(ecto_schema_module_to_changes_list_map, options) do if broadcast, do: broadcast_events(data) {:ok, data} end @@ -179,15 +179,15 @@ defmodule Explorer.Chain.Import do end) end - defp changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_arguments_list) do + defp changes_list_arguments_list_to_ecto_schema_module_to_changes_list_map(changes_list_arguments_list) do changes_list_arguments_list |> Stream.map(fn [params_list, options] -> ecto_schema_module = Keyword.fetch!(options, :for) {ecto_schema_module, changes_list(params_list, options)} end) |> Enum.reduce({:ok, %{}}, fn - {ecto_schema_module, {:ok, changes_list}}, {:ok, ecto_schema_module_to_changes_list} -> - {:ok, Map.put(ecto_schema_module_to_changes_list, ecto_schema_module, changes_list)} + {ecto_schema_module, {:ok, changes_list}}, {:ok, ecto_schema_module_to_changes_list_map} -> + {:ok, Map.put(ecto_schema_module_to_changes_list_map, ecto_schema_module, changes_list)} {_, {:ok, _}}, {:error, _} = error -> error @@ -235,22 +235,23 @@ defmodule Explorer.Chain.Import do transactions: Transaction } - defp ecto_schema_module_to_changes_list_to_multi(ecto_schema_module_to_changes_list, options) when is_list(options) do + defp ecto_schema_module_to_changes_list_map_to_multi(ecto_schema_module_to_changes_list_map, options) + when is_list(options) do timestamps = timestamps() full_options = Keyword.put(options, :timestamps, timestamps) Multi.new() - |> run_addresses(ecto_schema_module_to_changes_list, full_options) - |> run_balances(ecto_schema_module_to_changes_list, full_options) - |> run_blocks(ecto_schema_module_to_changes_list, full_options) - |> run_transactions(ecto_schema_module_to_changes_list, full_options) - |> run_internal_transactions(ecto_schema_module_to_changes_list, full_options) - |> run_logs(ecto_schema_module_to_changes_list, full_options) + |> run_addresses(ecto_schema_module_to_changes_list_map, full_options) + |> run_balances(ecto_schema_module_to_changes_list_map, full_options) + |> run_blocks(ecto_schema_module_to_changes_list_map, full_options) + |> run_transactions(ecto_schema_module_to_changes_list_map, full_options) + |> run_internal_transactions(ecto_schema_module_to_changes_list_map, full_options) + |> run_logs(ecto_schema_module_to_changes_list_map, full_options) end - defp run_addresses(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do + defp run_addresses(multi, ecto_schema_module_to_changes_list_map, options) + when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do + case ecto_schema_module_to_changes_list_map do %{Address => addresses_changes} -> timestamps = Keyword.fetch!(options, :timestamps) @@ -267,9 +268,9 @@ defmodule Explorer.Chain.Import do end end - defp run_balances(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do + defp run_balances(multi, ecto_schema_module_to_changes_list_map, options) + when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do + case ecto_schema_module_to_changes_list_map do %{Balance => balances_changes} -> timestamps = Keyword.fetch!(options, :timestamps) @@ -286,9 +287,9 @@ defmodule Explorer.Chain.Import do end end - defp run_blocks(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do + defp run_blocks(multi, ecto_schema_module_to_changes_list_map, options) + when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do + case ecto_schema_module_to_changes_list_map do %{Block => blocks_changes} -> timestamps = Keyword.fetch!(options, :timestamps) @@ -305,9 +306,9 @@ defmodule Explorer.Chain.Import do end end - defp run_transactions(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do + defp run_transactions(multi, ecto_schema_module_to_changes_list_map, options) + when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do + case ecto_schema_module_to_changes_list_map do %{Transaction => transactions_changes} -> # check required options as early as possible transactions_options = Keyword.fetch!(options, :transactions) @@ -328,9 +329,9 @@ defmodule Explorer.Chain.Import do end end - defp run_internal_transactions(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do + defp run_internal_transactions(multi, ecto_schema_module_to_changes_list_map, options) + when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do + case ecto_schema_module_to_changes_list_map do %{InternalTransaction => internal_transactions_changes} -> timestamps = Keyword.fetch!(options, :timestamps) @@ -356,9 +357,9 @@ defmodule Explorer.Chain.Import do end end - defp run_logs(multi, ecto_schema_module_to_changes_list, options) - when is_map(ecto_schema_module_to_changes_list) and is_list(options) do - case ecto_schema_module_to_changes_list do + defp run_logs(multi, ecto_schema_module_to_changes_list_map, options) + when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do + case ecto_schema_module_to_changes_list_map do %{Log => logs_changes} -> timestamps = Keyword.fetch!(options, :timestamps) @@ -655,11 +656,11 @@ defmodule Explorer.Chain.Import do Repo.transaction(multi, timeout: Keyword.get(options, :timeout, @transaction_timeout)) end - defp insert_ecto_schema_module_to_changes_list(ecto_schema_module_to_changes_list, options) do + defp insert_ecto_schema_module_to_changes_list_map(ecto_schema_module_to_changes_list_map, options) do timestamps = timestamps() - ecto_schema_module_to_changes_list - |> ecto_schema_module_to_changes_list_to_multi(Keyword.put(options, :timestamps, timestamps)) + ecto_schema_module_to_changes_list_map + |> ecto_schema_module_to_changes_list_map_to_multi(Keyword.put(options, :timestamps, timestamps)) |> import_transaction(options) end From 062ebd94637a4f9dc1987475be0ff0e6b996cbf3 Mon Sep 17 00:00:00 2001 From: Luke Imhoff Date: Thu, 26 Jul 2018 15:52:39 -0500 Subject: [PATCH 5/5] Remove unreliable test --- apps/indexer/test/indexer/sequence_test.exs | 31 --------------------- 1 file changed, 31 deletions(-) diff --git a/apps/indexer/test/indexer/sequence_test.exs b/apps/indexer/test/indexer/sequence_test.exs index ee0038df11..2f4397123f 100644 --- a/apps/indexer/test/indexer/sequence_test.exs +++ b/apps/indexer/test/indexer/sequence_test.exs @@ -76,37 +76,6 @@ defmodule Indexer.SequenceTest do # noproc when the sequence has already died by the time monitor is called assert_receive {:DOWN, ^sequence_ref, :process, ^sequence_pid, status} when status in [:normal, :noproc] end - - test "with :ranges in direction opposite of :step returns errors for all ranges in wrong direction" do - parent = self() - - {child_pid, child_ref} = - spawn_monitor(fn -> - send( - parent, - Sequence.start_link( - ranges: [ - # ok, ok - 7..6, - # ok, error - 4..5, - # error, ok - 3..2, - # error, error - 0..1 - ], - step: -1 - ) - ) - end) - - assert_receive {:DOWN, ^child_ref, :process, ^child_pid, - [ - "Range (0..1) direction is opposite step (-1) direction", - "Range (4..5) direction is opposite step (-1) direction" - ]}, - 200 - end end describe "queue/2" do