Merge pull request #467 from poanetwork/458

Unified Explorer.Chain.import
pull/475/head
Luke Imhoff 6 years ago committed by GitHub
commit 9cf9583165
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 687
      apps/explorer/lib/explorer/chain.ex
  2. 672
      apps/explorer/lib/explorer/chain/import.ex
  3. 9
      apps/explorer/lib/explorer/chain/transaction.ex
  4. 3
      apps/explorer/test/explorer/chain/transaction_test.exs
  5. 66
      apps/explorer/test/explorer/chain_test.exs
  6. 117
      apps/explorer/test/explorer/import_test.exs
  7. 2
      apps/explorer_web/test/explorer_web/features/viewing_addresses_test.exs
  8. 2
      apps/indexer/lib/indexer/balance_fetcher.ex
  9. 6
      apps/indexer/lib/indexer/block_fetcher.ex
  10. 7
      apps/indexer/lib/indexer/internal_transaction_fetcher.ex
  11. 2
      apps/indexer/lib/indexer/pending_transaction_fetcher.ex
  12. 31
      apps/indexer/test/indexer/sequence_test.exs

@ -17,7 +17,6 @@ defmodule Explorer.Chain do
] ]
alias Ecto.Adapters.SQL alias Ecto.Adapters.SQL
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{ alias Explorer.Chain.{
Address, Address,
@ -25,6 +24,7 @@ defmodule Explorer.Chain do
Block, Block,
Data, Data,
Hash, Hash,
Import,
InternalTransaction, InternalTransaction,
Log, Log,
Transaction, Transaction,
@ -62,21 +62,7 @@ defmodule Explorer.Chain do
@type necessity_by_association :: %{association => necessity} @type necessity_by_association :: %{association => necessity}
@typep necessity_by_association_option :: {:necessity_by_association, necessity_by_association} @typep necessity_by_association_option :: {:necessity_by_association, necessity_by_association}
@typep on_conflict_option :: {:on_conflict, :nothing | :replace_all}
@typep paging_options :: {:paging_options, PagingOptions.t()} @typep paging_options :: {:paging_options, PagingOptions.t()}
@typep params_option :: {:params, [map()]}
@typep timeout_option :: {:timeout, timeout}
@typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()}
@typep timestamps_option :: {:timestamps, timestamps}
@typep addresses_option :: {:addresses, [params_option | timeout_option | with_option]}
@typep balances_option :: {:balances, [params_option | timeout_option]}
@typep blocks_option :: {:blocks, [params_option | timeout_option]}
@typep broadcast_option :: {:broadcast, Boolean}
@typep internal_transactions_option :: {:internal_transactions, [params_option | timeout_option]}
@typep logs_option :: {:logs, [params_option | timeout_option]}
@typep receipts_option :: {:receipts, [params_option | timeout_option]}
@typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option]}
@typep with_option :: {:with, changeset_function_name :: atom}
@doc """ @doc """
Estimated count of `t:Explorer.Chain.Address.t/0`. Estimated count of `t:Explorer.Chain.Address.t/0`.
@ -214,17 +200,6 @@ defmodule Explorer.Chain do
end end
end end
# timeouts all in milliseconds
@transaction_timeout 120_000
@insert_addresses_timeout 60_000
@insert_balances_timeout 60_000
@insert_blocks_timeout 60_000
@insert_internal_transactions_timeout 60_000
@insert_logs_timeout 60_000
@insert_transactions_timeout 60_000
@update_transactions_timeout 60_000
@doc """ @doc """
The number of `t:Explorer.Chain.Block.t/0`. The number of `t:Explorer.Chain.Block.t/0`.
@ -642,192 +617,13 @@ defmodule Explorer.Chain do
end end
@doc """ @doc """
Bulk insert blocks from a list of blocks. Bulk insert all data stored in the `Explorer`.
The import returns the unique key(s) for each type of record inserted.
| Key | Value Type | Value Description |
|--------------------------|-------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------|
| `:addresses` | `[Explorer.Chain.Address.t()]` | List of `t:Explorer.Chain.Address.t/0`s |
| `:balances` | `[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]` | List of `t:Explorer.Chain.Address.t/0`s |
| `:blocks` | `[Explorer.Chain.Block.t()]` | List of `t:Explorer.Chain.Block.t/0`s |
| `:broacast` | `Boolean` | Boolean of whether to broadcast |
| `:internal_transactions` | `[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash` |
| `:logs` | `[Explorer.Chain.Log.t()]` | List of `t:Explorer.Chain.Log.t/0`s |
| `:transactions` | `[Explorer.Chain.Hash.t()]` | List of `t:Explorer.Chain.Transaction.t/0` `hash` |
A completely empty tree can be imported, but options must still be supplied. It is a non-zero amount of time to
process the empty options, so if there is nothing to import, you should avoid calling
`Explorer.Chain.import_blocks/1`. If you don't supply any options with params, then nothing is run so there result is
an empty map.
iex> Explorer.Chain.import_blocks([])
{:ok, %{}}
The params for each key are validated using the corresponding `Ecto.Schema` module's `changeset/2` function. If there
are errors, they are returned in `Ecto.Changeset.t`s, so that the original, invalid value can be reconstructed for any
error messages.
Because there are multiple processes potentially writing to the same tables at the same time,
`c:Ecto.Repo.insert_all/2`'s
[`:conflict_target` and `:on_conflict` options](https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3-options) are
used to perform [upserts](https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3-upserts) on all tables, so that
a pre-existing unique key will not trigger a failure, but instead replace or otherwise update the row.
## Data Notifications
On successful inserts, processes interested in certain domains of data will be notified
that new data has been inserted. See `Explorer.Chain.subscribe_to_events/1` for more information.
## Tree
* `t:Explorer.Chain.Block.t/0`s
* `t:Explorer.Chain.Transaction.t/0`
* `t.Explorer.Chain.InternalTransaction.t/0`
* `t.Explorer.Chain.Log.t/0`
## Options
* `:addresses`
* `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`.
* `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds.
* `:balances`
* `:params` - `list` of params for `Explorer.Chain.Balance.changeset/2`.
* `:timeout` - the timeout for inserting all balances. Defaults to `#{@insert_balances_timeout}` milliseconds.
* `:blocks`
* `:params` - `list` of params for `Explorer.Chain.Block.changeset/2`.
* `:timeout` - the timeout for inserting all blocks. Defaults to `#{@insert_blocks_timeout}` milliseconds.
* `:broacast` - Boolean flag indicating whether or not to broadcast the event.
* `:internal_transactions`
* `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`.
* `:timeout` - the timeout for inserting all internal transactions. Defaults to
`#{@insert_internal_transactions_timeout}` milliseconds.
* `:logs`
* `:params` - `list` of params for `Explorer.Chain.Log.changeset/2`.
* `:timeout` - the timeout for inserting all logs. Defaults to `#{@insert_logs_timeout}` milliseconds.
* `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}`
milliseconds.
* `:transactions`
* `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing transaction
with the same hash.
*NOTE*: Because the repository transaction for a pending `Explorer.Chain.Transaction`s could `COMMIT` after the
repository transaction for that same transaction being collated into a block, writers, it is recomended to use
`:nothing` for pending transactions and `:replace_all` for collated transactions, so that collated transactions
win.
* `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`.
* `:timeout` - the timeout for inserting all transactions found in the params lists across all
types. Defaults to `#{@insert_transactions_timeout}` milliseconds.
"""
@spec import_blocks([
addresses_option
| balances_option
| blocks_option
| broadcast_option
| internal_transactions_option
| logs_option
| receipts_option
| timeout_option
| transactions_option
]) ::
{:ok,
%{
optional(:addresses) => [Address.t()],
optional(:balances) => [
%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}
],
optional(:blocks) => [Block.t()],
optional(:broadcast) => Boolean,
optional(:internal_transactions) => [
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()}
],
optional(:logs) => [Log.t()],
optional(:receipts) => [Hash.Full.t()],
optional(:transactions) => [Hash.Full.t()]
}}
| {:error, [Changeset.t()]}
| {:error, step :: Ecto.Multi.name(), failed_value :: any(),
changes_so_far :: %{optional(Ecto.Multi.name()) => any()}}
def import_blocks(options) when is_list(options) do
broadcast =
case Keyword.fetch(options, :broadcast) do
{:ok, broadcast} -> broadcast
:error -> false
end
changes_list_arguments_list = import_options_to_changes_list_arguments_list(options)
with {:ok, ecto_schema_module_to_changes_list} <-
changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_arguments_list),
{:ok, data} <- insert_ecto_schema_module_to_changes_list(ecto_schema_module_to_changes_list, options) do
if broadcast, do: broadcast_events(data)
{:ok, data}
end
end
@doc """
Bulk insert internal transactions for a list of transactions.
## Options See `Explorer.Chain.Import.all/1` for options and returns.
* `:addresses`
* `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`.
* `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds.
* `:internal_transactions`
* `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`.
* `:timeout` - the timeout for inserting all internal transactions. Defaults to
`#{@insert_internal_transactions_timeout}` milliseconds.
* `:transactions`
* `:hashes` - `list` of `t:Explorer.Chain.Transaction.t/0` `hash`es that should have their
`internal_transactions_indexed_at` updated.
* `:timeout` - the timeout for updating transactions with `:hashes`. Defaults to
`#{@update_transactions_timeout}` milliseconds.
* `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}`
milliseconds.
""" """
@spec import_internal_transactions([ @spec import(Import.all_options()) :: Import.all_result()
addresses_option def import(options) do
| internal_transactions_option Import.all(options)
| timeout_option
| {:transactions, [{:hashes, [String.t()]} | timeout_option]}
]) ::
{:ok,
%{
optional(:addresses) => [Hash.Address.t()],
optional(:internal_transactions) => [
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()}
]
}}
| {:error, [Changeset.t()]}
| {:error, step :: Ecto.Multi.name(), failed_value :: any(),
changes_so_far :: %{optional(Ecto.Multi.name()) => any()}}
def import_internal_transactions(options) when is_list(options) do
{transactions_options, import_options} = Keyword.pop(options, :transactions)
changes_list_options_list = import_options_to_changes_list_arguments_list(import_options)
with {:ok, ecto_schema_module_to_changes_list} <-
changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_options_list) do
timestamps = timestamps()
ecto_schema_module_to_changes_list
|> ecto_schema_module_to_changes_list_to_multi(Keyword.put(options, :timestamps, timestamps))
|> Multi.run(:transactions, fn _ ->
transaction_hashes = Keyword.get(transactions_options, :hashes)
transactions_count = length(transaction_hashes)
query =
from(
t in Transaction,
where: t.hash in ^transaction_hashes,
update: [set: [internal_transactions_indexed_at: ^timestamps.updated_at]]
)
{^transactions_count, result} = Repo.update_all(query, [])
{:ok, result}
end)
|> import_transaction(options)
end
end end
@doc """ @doc """
@ -1737,80 +1533,6 @@ defmodule Explorer.Chain do
Repo.one(query) Repo.one(query)
end end
defp broadcast_event_data(event_type, event_data) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, event_data})
end
end)
end
defp broadcast_events(data) do
for {event_type, event_data} <- data, event_type in ~w(addresses balances blocks logs transactions)a do
broadcast_event_data(event_type, event_data)
end
end
@spec changes_list(params :: [map], [{:for, module} | {:with, atom}]) :: {:ok, [map]} | {:error, [Changeset.t()]}
defp changes_list(params, options) when is_list(options) do
ecto_schema_module = Keyword.fetch!(options, :for)
changeset_function_name = Keyword.get(options, :with, :changeset)
struct = ecto_schema_module.__struct__()
{status, acc} =
params
|> Stream.map(&apply(ecto_schema_module, changeset_function_name, [struct, &1]))
|> Enum.reduce({:ok, []}, fn
changeset = %Changeset{valid?: false}, {:ok, _} ->
{:error, [changeset]}
changeset = %Changeset{valid?: false}, {:error, acc_changesets} ->
{:error, [changeset | acc_changesets]}
%Changeset{changes: changes, valid?: true}, {:ok, acc_changes} ->
{:ok, [changes | acc_changes]}
%Changeset{valid?: true}, {:error, _} = error ->
error
end)
{status, Enum.reverse(acc)}
end
defp ecto_schema_module_to_changes_list_to_multi(ecto_schema_module_to_changes_list, options) when is_list(options) do
timestamps = timestamps()
full_options = Keyword.put(options, :timestamps, timestamps)
Multi.new()
|> run_addresses(ecto_schema_module_to_changes_list, full_options)
|> run_balances(ecto_schema_module_to_changes_list, full_options)
|> run_blocks(ecto_schema_module_to_changes_list, full_options)
|> run_transactions(ecto_schema_module_to_changes_list, full_options)
|> run_internal_transactions(ecto_schema_module_to_changes_list, full_options)
|> run_logs(ecto_schema_module_to_changes_list, full_options)
end
defp changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_arguments_list) do
changes_list_arguments_list
|> Stream.map(fn [params_list, options] ->
ecto_schema_module = Keyword.fetch!(options, :for)
{ecto_schema_module, changes_list(params_list, options)}
end)
|> Enum.reduce({:ok, %{}}, fn
{ecto_schema_module, {:ok, changes_list}}, {:ok, ecto_schema_module_to_changes_list} ->
{:ok, Map.put(ecto_schema_module_to_changes_list, ecto_schema_module, changes_list)}
{_, {:ok, _}}, {:error, _} = error ->
error
{_, {:error, _} = error}, {:ok, _} ->
error
{_, {:error, changesets}}, {:error, acc_changesets} ->
{:error, acc_changesets ++ changesets}
end)
end
defp fetch_transactions(paging_options \\ nil) do defp fetch_transactions(paging_options \\ nil) do
Transaction Transaction
|> select_merge([transaction], %{ |> select_merge([transaction], %{
@ -1840,271 +1562,6 @@ defmodule Explorer.Chain do
) )
end end
@spec insert_addresses([%{hash: Hash.Address.t()}], [timeout_option | timestamps_option | with_option]) ::
{:ok, [Hash.Address.t()]}
defp insert_addresses(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = sort_address_changes_list(changes_list)
insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict:
from(
address in Address,
update: [
set: [
contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code),
# ARGMAX on two columns
fetched_balance:
fragment(
"""
CASE WHEN EXCLUDED.fetched_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_balance_block_number >= ?) THEN
EXCLUDED.fetched_balance
ELSE ?
END
""",
address.fetched_balance_block_number,
address.fetched_balance_block_number,
address.fetched_balance
),
# MAX on two columns
fetched_balance_block_number:
fragment(
"""
CASE WHEN EXCLUDED.fetched_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_balance_block_number >= ?) THEN
EXCLUDED.fetched_balance_block_number
ELSE ?
END
""",
address.fetched_balance_block_number,
address.fetched_balance_block_number,
address.fetched_balance_block_number
)
]
]
),
for: Address,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
defp sort_address_changes_list(changes_list) do
Enum.sort_by(changes_list, & &1.hash)
end
@import_option_key_to_ecto_schema_module %{
addresses: Address,
balances: Balance,
blocks: Block,
internal_transactions: InternalTransaction,
logs: Log,
transactions: Transaction
}
defp import_options_to_changes_list_arguments_list(options) do
Enum.flat_map(@import_option_key_to_ecto_schema_module, fn {option_key, ecto_schema_module} ->
case Keyword.fetch(options, option_key) do
{:ok, option_value} when is_list(option_value) ->
[
[
Keyword.fetch!(option_value, :params),
[for: ecto_schema_module, with: Keyword.get(option_value, :with, :changeset)]
]
]
:error ->
[]
end
end)
end
@spec insert_balances(
[
%{
required(:address_hash) => Hash.Address.t(),
required(:block_number) => Block.block_number(),
required(:value) => Wei.t()
}
],
[timeout_option]
) ::
{:ok, [%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}]}
| {:error, [Changeset.t()]}
defp insert_balances(changes_list, named_arguments) when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number})
{:ok, _} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:address_hash, :block_number],
on_conflict:
from(
balance in Balance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.updated_at > ? THEN EXCLUDED.value
ELSE ?
END
""",
balance.updated_at,
balance.value
)
]
]
),
for: Balance,
timeout: timeout,
timestamps: timestamps
)
{:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash block_number)a))}
end
@spec insert_blocks([map()], [timeout_option | timestamps_option]) :: {:ok, [Block.t()]} | {:error, [Changeset.t()]}
defp insert_blocks(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.number, &1.hash})
{:ok, blocks} =
insert_changes_list(
ordered_changes_list,
conflict_target: :number,
on_conflict: :replace_all,
for: Block,
returning: true,
timeout: timeout,
timestamps: timestamps
)
{:ok, blocks}
end
defp insert_ecto_schema_module_to_changes_list(ecto_schema_module_to_changes_list, options) do
timestamps = timestamps()
ecto_schema_module_to_changes_list
|> ecto_schema_module_to_changes_list_to_multi(Keyword.put(options, :timestamps, timestamps))
|> import_transaction(options)
end
defp import_transaction(multi, options) when is_list(options) do
Repo.transaction(multi, timeout: Keyword.get(options, :timeout, @transaction_timeout))
end
@spec insert_internal_transactions([map], [timeout_option | timestamps_option]) ::
{:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]}
| {:error, [Changeset.t()]}
defp insert_internal_transactions(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index})
{:ok, internal_transactions} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :index],
for: InternalTransaction,
on_conflict: :replace_all,
returning: [:index, :transaction_hash],
timestamps: timestamps
)
{:ok,
for(
internal_transaction <- internal_transactions,
do: Map.take(internal_transaction, [:index, :transaction_hash])
)}
end
@spec insert_logs([map()], [timeout_option | timestamps_option]) ::
{:ok, [Log.t()]}
| {:error, [Changeset.t()]}
defp insert_logs(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index})
{:ok, _} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :index],
on_conflict: :replace_all,
for: Log,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
defp insert_changes_list(changes_list, options) when is_list(changes_list) do
ecto_schema_module = Keyword.fetch!(options, :for)
timestamped_changes_list = timestamp_changes_list(changes_list, Keyword.fetch!(options, :timestamps))
{_, inserted} =
Repo.safe_insert_all(
ecto_schema_module,
timestamped_changes_list,
Keyword.delete(options, :for)
)
{:ok, inserted}
end
@spec insert_transactions([map()], [on_conflict_option | timeout_option | timestamps_option]) ::
{:ok, [Hash.t()]} | {:error, [Changeset.t()]}
defp insert_transactions(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
on_conflict = Keyword.fetch!(named_arguments, :on_conflict)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, & &1.hash)
{:ok, transactions} =
insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict: on_conflict,
for: Transaction,
returning: [:hash],
timeout: timeout,
timestamps: timestamps
)
{:ok, for(transaction <- transactions, do: transaction.hash)}
end
defp handle_paging_options(query, nil), do: query defp handle_paging_options(query, nil), do: query
defp handle_paging_options(query, paging_options) do defp handle_paging_options(query, paging_options) do
@ -2183,138 +1640,6 @@ defmodule Explorer.Chain do
where(query, [transaction], transaction.index < ^index) where(query, [transaction], transaction.index < ^index)
end end
defp run_addresses(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{Address => addresses_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :addresses, fn _ ->
insert_addresses(
addresses_changes,
timeout: options[:addresses][:timeout] || @insert_addresses_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp run_balances(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{Balance => balances_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :balances, fn _ ->
insert_balances(
balances_changes,
timeout: options[:balances][:timeout] || @insert_balances_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp run_blocks(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{Block => blocks_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :blocks, fn _ ->
insert_blocks(
blocks_changes,
timeout: options[:blocks][:timeout] || @insert_blocks_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp run_transactions(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{Transaction => transactions_changes} ->
# check required options as early as possible
transactions_options = Keyword.fetch!(options, :transactions)
on_conflict = Keyword.fetch!(transactions_options, :on_conflict)
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :transactions, fn _ ->
insert_transactions(
transactions_changes,
on_conflict: on_conflict,
timeout: transactions_options[:timeout] || @insert_transactions_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp run_internal_transactions(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{InternalTransaction => internal_transactions_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :internal_transactions, fn _ ->
insert_internal_transactions(
internal_transactions_changes,
timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp run_logs(multi, ecto_schema_module_to_changes_list, options)
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{Log => logs_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :logs, fn _ ->
insert_logs(
logs_changes,
timeout: options[:logs][:timeout] || @insert_logs_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp timestamp_params(changes, timestamps) when is_map(changes) do
Map.merge(changes, timestamps)
end
defp timestamp_changes_list(changes_list, timestamps) when is_list(changes_list) do
Enum.map(changes_list, &timestamp_params(&1, timestamps))
end
@spec timestamps() :: timestamps
defp timestamps do
now = DateTime.utc_now()
%{inserted_at: now, updated_at: now}
end
defp where_address_fields_match(query, address_hash, :to) do defp where_address_fields_match(query, address_hash, :to) do
where(query, [t], t.to_address_hash == ^address_hash) where(query, [t], t.to_address_hash == ^address_hash)
end end

@ -0,0 +1,672 @@
defmodule Explorer.Chain.Import do
@moduledoc """
Bulk importing of data into `Explorer.Repo`
"""
import Ecto.Query, only: [from: 2]
alias Ecto.{Changeset, Multi}
alias Explorer.Chain.{Address, Balance, Block, Hash, InternalTransaction, Log, Transaction, Wei}
alias Explorer.Repo
@typep addresses_option :: {:addresses, [params_option | timeout_option | with_option]}
@typep balances_option :: {:balances, [params_option | timeout_option]}
@typep blocks_option :: {:blocks, [params_option | timeout_option]}
@typep broadcast_option :: {:broadcast, Boolean}
@typep internal_transactions_option :: {:internal_transactions, [params_option | timeout_option]}
@typep logs_option :: {:logs, [params_option | timeout_option]}
@typep on_conflict_option :: {:on_conflict, :nothing | :replace_all}
@typep params_option :: {:params, [map()]}
@typep receipts_option :: {:receipts, [params_option | timeout_option]}
@typep timeout_option :: {:timeout, timeout}
@typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()}
@typep timestamps_option :: {:timestamps, timestamps}
@typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option | with_option]}
@typep with_option :: {:with, changeset_function_name :: atom}
@type all_options :: [
addresses_option
| balances_option
| blocks_option
| broadcast_option
| internal_transactions_option
| logs_option
| receipts_option
| timeout_option
| transactions_option
]
@type all_result ::
{:ok,
%{
optional(:addresses) => [Address.t()],
optional(:balances) => [
%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}
],
optional(:blocks) => [Block.t()],
optional(:internal_transactions) => [
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()}
],
optional(:logs) => [Log.t()],
optional(:receipts) => [Hash.Full.t()],
optional(:transactions) => [Hash.Full.t()]
}}
| {:error, [Changeset.t()]}
| {:error, step :: Ecto.Multi.name(), failed_value :: any(),
changes_so_far :: %{optional(Ecto.Multi.name()) => any()}}
@type internal_transactions_options :: [
addresses_option
| internal_transactions_option
| timeout_option
| {:transactions, [{:hashes, [String.t()]} | timeout_option]}
]
@type internal_transactions_result ::
{:ok,
%{
optional(:addresses) => [Hash.Address.t()],
optional(:internal_transactions) => [
%{required(:index) => non_neg_integer(), required(:transaction_hash) => Hash.Full.t()}
]
}}
| {:error, [Changeset.t()]}
| {:error, step :: Ecto.Multi.name(), failed_value :: any(),
changes_so_far :: %{optional(Ecto.Multi.name()) => any()}}
# timeouts all in milliseconds
@transaction_timeout 120_000
@insert_addresses_timeout 60_000
@insert_balances_timeout 60_000
@insert_blocks_timeout 60_000
@insert_internal_transactions_timeout 60_000
@insert_logs_timeout 60_000
@insert_transactions_timeout 60_000
@doc """
Bulk insert all data stored in the `Explorer`.
The import returns the unique key(s) for each type of record inserted.
| Key | Value Type | Value Description |
|--------------------------|-------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------|
| `:addresses` | `[Explorer.Chain.Address.t()]` | List of `t:Explorer.Chain.Address.t/0`s |
| `:balances` | `[%{address_hash: Explorer.Chain.Hash.t(), block_number: Explorer.Chain.Block.block_number()}]` | List of `t:Explorer.Chain.Address.t/0`s |
| `:blocks` | `[Explorer.Chain.Block.t()]` | List of `t:Explorer.Chain.Block.t/0`s |
| `:internal_transactions` | `[%{index: non_neg_integer(), transaction_hash: Explorer.Chain.Hash.t()}]` | List of maps of the `t:Explorer.Chain.InternalTransaction.t/0` `index` and `transaction_hash` |
| `:logs` | `[Explorer.Chain.Log.t()]` | List of `t:Explorer.Chain.Log.t/0`s |
| `:transactions` | `[Explorer.Chain.Hash.t()]` | List of `t:Explorer.Chain.Transaction.t/0` `hash` |
The params for each key are validated using the corresponding `Ecto.Schema` module's `changeset/2` function. If there
are errors, they are returned in `Ecto.Changeset.t`s, so that the original, invalid value can be reconstructed for any
error messages.
Because there are multiple processes potentially writing to the same tables at the same time,
`c:Ecto.Repo.insert_all/2`'s
[`:conflict_target` and `:on_conflict` options](https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3-options) are
used to perform [upserts](https://hexdocs.pm/ecto/Ecto.Repo.html#c:insert_all/3-upserts) on all tables, so that
a pre-existing unique key will not trigger a failure, but instead replace or otherwise update the row.
## Data Notifications
On successful inserts, processes interested in certain domains of data will be notified
that new data has been inserted. See `Explorer.Chain.subscribe_to_events/1` for more information.
## Options
* `:addresses`
* `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`.
* `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Address` to use validate `:params`.
* `:balances`
* `:params` - `list` of params for `Explorer.Chain.Balance.changeset/2`.
* `:timeout` - the timeout for inserting all balances. Defaults to `#{@insert_balances_timeout}` milliseconds.
* `:blocks`
* `:params` - `list` of params for `Explorer.Chain.Block.changeset/2`.
* `:timeout` - the timeout for inserting all blocks. Defaults to `#{@insert_blocks_timeout}` milliseconds.
* `:broacast` - Boolean flag indicating whether or not to broadcast the event.
* `:internal_transactions`
* `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`.
* `:timeout` - the timeout for inserting all internal transactions. Defaults to
`#{@insert_internal_transactions_timeout}` milliseconds.
* `:logs`
* `:params` - `list` of params for `Explorer.Chain.Log.changeset/2`.
* `:timeout` - the timeout for inserting all logs. Defaults to `#{@insert_logs_timeout}` milliseconds.
* `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}`
milliseconds.
* `:transactions`
* `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing transaction
with the same hash.
*NOTE*: Because the repository transaction for a pending `Explorer.Chain.Transaction`s could `COMMIT` after the
repository transaction for that same transaction being collated into a block, writers, it is recomended to use
`:nothing` for pending transactions and `:replace_all` for collated transactions, so that collated transactions
win.
* `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`.
* `:timeout` - the timeout for inserting all transactions found in the params lists across all
types. Defaults to `#{@insert_transactions_timeout}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`.
"""
@spec all(all_options()) :: all_result()
def all(options) when is_list(options) do
broadcast =
case Keyword.fetch(options, :broadcast) do
{:ok, broadcast} -> broadcast
:error -> false
end
changes_list_arguments_list = import_options_to_changes_list_arguments_list(options)
with {:ok, ecto_schema_module_to_changes_list_map} <-
changes_list_arguments_list_to_ecto_schema_module_to_changes_list_map(changes_list_arguments_list),
{:ok, data} <- insert_ecto_schema_module_to_changes_list_map(ecto_schema_module_to_changes_list_map, options) do
if broadcast, do: broadcast_events(data)
{:ok, data}
end
end
defp broadcast_events(data) do
for {event_type, event_data} <- data, event_type in ~w(addresses balances blocks logs transactions)a do
broadcast_event_data(event_type, event_data)
end
end
defp broadcast_event_data(event_type, event_data) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, event_data})
end
end)
end
defp changes_list_arguments_list_to_ecto_schema_module_to_changes_list_map(changes_list_arguments_list) do
changes_list_arguments_list
|> Stream.map(fn [params_list, options] ->
ecto_schema_module = Keyword.fetch!(options, :for)
{ecto_schema_module, changes_list(params_list, options)}
end)
|> Enum.reduce({:ok, %{}}, fn
{ecto_schema_module, {:ok, changes_list}}, {:ok, ecto_schema_module_to_changes_list_map} ->
{:ok, Map.put(ecto_schema_module_to_changes_list_map, ecto_schema_module, changes_list)}
{_, {:ok, _}}, {:error, _} = error ->
error
{_, {:error, _} = error}, {:ok, _} ->
error
{_, {:error, changesets}}, {:error, acc_changesets} ->
{:error, acc_changesets ++ changesets}
end)
end
@spec changes_list(params :: [map], [{:for, module} | {:with, atom}]) :: {:ok, [map]} | {:error, [Changeset.t()]}
defp changes_list(params, options) when is_list(options) do
ecto_schema_module = Keyword.fetch!(options, :for)
changeset_function_name = Keyword.get(options, :with, :changeset)
struct = ecto_schema_module.__struct__()
{status, acc} =
params
|> Stream.map(&apply(ecto_schema_module, changeset_function_name, [struct, &1]))
|> Enum.reduce({:ok, []}, fn
changeset = %Changeset{valid?: false}, {:ok, _} ->
{:error, [changeset]}
changeset = %Changeset{valid?: false}, {:error, acc_changesets} ->
{:error, [changeset | acc_changesets]}
%Changeset{changes: changes, valid?: true}, {:ok, acc_changes} ->
{:ok, [changes | acc_changes]}
%Changeset{valid?: true}, {:error, _} = error ->
error
end)
{status, Enum.reverse(acc)}
end
@import_option_key_to_ecto_schema_module %{
addresses: Address,
balances: Balance,
blocks: Block,
internal_transactions: InternalTransaction,
logs: Log,
transactions: Transaction
}
defp ecto_schema_module_to_changes_list_map_to_multi(ecto_schema_module_to_changes_list_map, options)
when is_list(options) do
timestamps = timestamps()
full_options = Keyword.put(options, :timestamps, timestamps)
Multi.new()
|> run_addresses(ecto_schema_module_to_changes_list_map, full_options)
|> run_balances(ecto_schema_module_to_changes_list_map, full_options)
|> run_blocks(ecto_schema_module_to_changes_list_map, full_options)
|> run_transactions(ecto_schema_module_to_changes_list_map, full_options)
|> run_internal_transactions(ecto_schema_module_to_changes_list_map, full_options)
|> run_logs(ecto_schema_module_to_changes_list_map, full_options)
end
defp run_addresses(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do
case ecto_schema_module_to_changes_list_map do
%{Address => addresses_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :addresses, fn _ ->
insert_addresses(
addresses_changes,
timeout: options[:addresses][:timeout] || @insert_addresses_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp run_balances(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do
case ecto_schema_module_to_changes_list_map do
%{Balance => balances_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :balances, fn _ ->
insert_balances(
balances_changes,
timeout: options[:balances][:timeout] || @insert_balances_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp run_blocks(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do
case ecto_schema_module_to_changes_list_map do
%{Block => blocks_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :blocks, fn _ ->
insert_blocks(
blocks_changes,
timeout: options[:blocks][:timeout] || @insert_blocks_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp run_transactions(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do
case ecto_schema_module_to_changes_list_map do
%{Transaction => transactions_changes} ->
# check required options as early as possible
transactions_options = Keyword.fetch!(options, :transactions)
on_conflict = Keyword.fetch!(transactions_options, :on_conflict)
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :transactions, fn _ ->
insert_transactions(
transactions_changes,
on_conflict: on_conflict,
timeout: transactions_options[:timeout] || @insert_transactions_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp run_internal_transactions(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do
case ecto_schema_module_to_changes_list_map do
%{InternalTransaction => internal_transactions_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)
multi
|> Multi.run(:internal_transactions, fn _ ->
insert_internal_transactions(
internal_transactions_changes,
timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout,
timestamps: timestamps
)
end)
|> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions}
when is_list(internal_transactions) ->
update_transactions_internal_transactions_indexed_at(
internal_transactions,
timeout: options[:transactions][:timeout] || @insert_transactions_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
defp run_logs(multi, ecto_schema_module_to_changes_list_map, options)
when is_map(ecto_schema_module_to_changes_list_map) and is_list(options) do
case ecto_schema_module_to_changes_list_map do
%{Log => logs_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :logs, fn _ ->
insert_logs(
logs_changes,
timeout: options[:logs][:timeout] || @insert_logs_timeout,
timestamps: timestamps
)
end)
_ ->
multi
end
end
@spec insert_addresses([%{hash: Hash.Address.t()}], [timeout_option | timestamps_option | with_option]) ::
{:ok, [Hash.Address.t()]}
defp insert_addresses(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = sort_address_changes_list(changes_list)
insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict:
from(
address in Address,
update: [
set: [
contract_code: fragment("COALESCE(?, EXCLUDED.contract_code)", address.contract_code),
# ARGMAX on two columns
fetched_balance:
fragment(
"""
CASE WHEN EXCLUDED.fetched_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_balance_block_number >= ?) THEN
EXCLUDED.fetched_balance
ELSE ?
END
""",
address.fetched_balance_block_number,
address.fetched_balance_block_number,
address.fetched_balance
),
# MAX on two columns
fetched_balance_block_number:
fragment(
"""
CASE WHEN EXCLUDED.fetched_balance_block_number IS NOT NULL AND
(? IS NULL OR
EXCLUDED.fetched_balance_block_number >= ?) THEN
EXCLUDED.fetched_balance_block_number
ELSE ?
END
""",
address.fetched_balance_block_number,
address.fetched_balance_block_number,
address.fetched_balance_block_number
)
]
]
),
for: Address,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
defp sort_address_changes_list(changes_list) do
Enum.sort_by(changes_list, & &1.hash)
end
@spec insert_balances(
[
%{
required(:address_hash) => Hash.Address.t(),
required(:block_number) => Block.block_number(),
required(:value) => Wei.t()
}
],
[timeout_option]
) ::
{:ok, [%{required(:address_hash) => Hash.Address.t(), required(:block_number) => Block.block_number()}]}
| {:error, [Changeset.t()]}
defp insert_balances(changes_list, named_arguments) when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.address_hash, &1.block_number})
{:ok, _} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:address_hash, :block_number],
on_conflict:
from(
balance in Balance,
update: [
set: [
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at),
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at),
value:
fragment(
"""
CASE WHEN EXCLUDED.updated_at > ? THEN EXCLUDED.value
ELSE ?
END
""",
balance.updated_at,
balance.value
)
]
]
),
for: Balance,
timeout: timeout,
timestamps: timestamps
)
{:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash block_number)a))}
end
@spec insert_blocks([map()], [timeout_option | timestamps_option]) :: {:ok, [Block.t()]} | {:error, [Changeset.t()]}
defp insert_blocks(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.number, &1.hash})
{:ok, blocks} =
insert_changes_list(
ordered_changes_list,
conflict_target: :number,
on_conflict: :replace_all,
for: Block,
returning: true,
timeout: timeout,
timestamps: timestamps
)
{:ok, blocks}
end
@spec insert_internal_transactions([map], [timeout_option | timestamps_option]) ::
{:ok, [%{index: non_neg_integer, transaction_hash: Hash.t()}]}
| {:error, [Changeset.t()]}
defp insert_internal_transactions(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index})
{:ok, internal_transactions} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :index],
for: InternalTransaction,
on_conflict: :replace_all,
returning: [:index, :transaction_hash],
timestamps: timestamps
)
{:ok,
for(
internal_transaction <- internal_transactions,
do: Map.take(internal_transaction, [:index, :transaction_hash])
)}
end
@spec insert_logs([map()], [timeout_option | timestamps_option]) ::
{:ok, [Log.t()]}
| {:error, [Changeset.t()]}
defp insert_logs(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, &{&1.transaction_hash, &1.index})
{:ok, _} =
insert_changes_list(
ordered_changes_list,
conflict_target: [:transaction_hash, :index],
on_conflict: :replace_all,
for: Log,
returning: true,
timeout: timeout,
timestamps: timestamps
)
end
@spec insert_transactions([map()], [on_conflict_option | timeout_option | timestamps_option]) ::
{:ok, [Hash.t()]} | {:error, [Changeset.t()]}
defp insert_transactions(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
on_conflict = Keyword.fetch!(named_arguments, :on_conflict)
# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, & &1.hash)
{:ok, transactions} =
insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict: on_conflict,
for: Transaction,
returning: [:hash],
timeout: timeout,
timestamps: timestamps
)
{:ok, for(transaction <- transactions, do: transaction.hash)}
end
defp insert_changes_list(changes_list, options) when is_list(changes_list) do
ecto_schema_module = Keyword.fetch!(options, :for)
timestamped_changes_list = timestamp_changes_list(changes_list, Keyword.fetch!(options, :timestamps))
{_, inserted} =
Repo.safe_insert_all(
ecto_schema_module,
timestamped_changes_list,
Keyword.delete(options, :for)
)
{:ok, inserted}
end
defp update_transactions_internal_transactions_indexed_at(internal_transactions, named_arguments)
when is_list(internal_transactions) and is_list(named_arguments) do
timeout = Keyword.fetch!(named_arguments, :timeout)
timestamps = Keyword.fetch!(named_arguments, :timestamps)
ordered_transaction_hashes =
internal_transactions
|> MapSet.new(& &1.transaction_hash)
|> Enum.sort()
query =
from(
t in Transaction,
where: t.hash in ^ordered_transaction_hashes,
update: [set: [internal_transactions_indexed_at: ^timestamps.updated_at]]
)
transaction_count = Enum.count(ordered_transaction_hashes)
{^transaction_count, result} = Repo.update_all(query, [], timeout: timeout)
{:ok, result}
end
defp timestamp_changes_list(changes_list, timestamps) when is_list(changes_list) do
Enum.map(changes_list, &timestamp_params(&1, timestamps))
end
defp timestamp_params(changes, timestamps) when is_map(changes) do
Map.merge(changes, timestamps)
end
defp import_options_to_changes_list_arguments_list(options) do
Enum.flat_map(@import_option_key_to_ecto_schema_module, fn {option_key, ecto_schema_module} ->
case Keyword.fetch(options, option_key) do
{:ok, option_value} when is_list(option_value) ->
[
[
Keyword.fetch!(option_value, :params),
[for: ecto_schema_module, with: Keyword.get(option_value, :with, :changeset)]
]
]
:error ->
[]
end
end)
end
defp import_transaction(multi, options) when is_list(options) do
Repo.transaction(multi, timeout: Keyword.get(options, :timeout, @transaction_timeout))
end
defp insert_ecto_schema_module_to_changes_list_map(ecto_schema_module_to_changes_list_map, options) do
timestamps = timestamps()
ecto_schema_module_to_changes_list_map
|> ecto_schema_module_to_changes_list_map_to_multi(Keyword.put(options, :timestamps, timestamps))
|> import_transaction(options)
end
@spec timestamps() :: timestamps
defp timestamps do
now = DateTime.utc_now()
%{inserted_at: now, updated_at: now}
end
end

@ -7,9 +7,9 @@ defmodule Explorer.Chain.Transaction do
alias Explorer.Chain.{Address, Block, Data, Gas, Hash, InternalTransaction, Log, Wei} alias Explorer.Chain.{Address, Block, Data, Gas, Hash, InternalTransaction, Log, Wei}
alias Explorer.Chain.Transaction.Status alias Explorer.Chain.Transaction.Status
@optional_attrs ~w(block_hash block_number cumulative_gas_used from_address_hash gas_used index @optional_attrs ~w(block_hash block_number cumulative_gas_used gas_used index internal_transactions_indexed_at status
internal_transactions_indexed_at status to_address_hash)a to_address_hash)a
@required_attrs ~w(gas gas_price hash input nonce r s v value)a @required_attrs ~w(from_address_hash gas gas_price hash input nonce r s v value)a
@typedoc """ @typedoc """
X coordinate module n in X coordinate module n in
@ -153,6 +153,7 @@ defmodule Explorer.Chain.Transaction do
iex> changeset = Explorer.Chain.Transaction.changeset( iex> changeset = Explorer.Chain.Transaction.changeset(
...> %Transaction{}, ...> %Transaction{},
...> %{ ...> %{
...> from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca",
...> gas: 4700000, ...> gas: 4700000,
...> gas_price: 100000000000, ...> gas_price: 100000000000,
...> hash: "0x3a3eb134e6792ce9403ea4188e5e79693de9e4c94e499db132be086400da79e6", ...> hash: "0x3a3eb134e6792ce9403ea4188e5e79693de9e4c94e499db132be086400da79e6",
@ -173,6 +174,7 @@ defmodule Explorer.Chain.Transaction do
iex> changeset = Explorer.Chain.Transaction.changeset( iex> changeset = Explorer.Chain.Transaction.changeset(
...> %Transaction{}, ...> %Transaction{},
...> %{ ...> %{
...> from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca",
...> block_number: 34, ...> block_number: 34,
...> cumulative_gas_used: 0, ...> cumulative_gas_used: 0,
...> gas: 4700000, ...> gas: 4700000,
@ -210,6 +212,7 @@ defmodule Explorer.Chain.Transaction do
...> block_hash: "0xe52d77084cab13a4e724162bcd8c6028e5ecfaa04d091ee476e96b9958ed6b47", ...> block_hash: "0xe52d77084cab13a4e724162bcd8c6028e5ecfaa04d091ee476e96b9958ed6b47",
...> block_number: 34, ...> block_number: 34,
...> cumulative_gas_used: 0, ...> cumulative_gas_used: 0,
...> from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca",
...> gas: 4700000, ...> gas: 4700000,
...> gas_price: 100000000000, ...> gas_price: 100000000000,
...> gas_used: 4600000, ...> gas_used: 4600000,

@ -10,6 +10,7 @@ defmodule Explorer.Chain.TransactionTest do
test "with valid attributes" do test "with valid attributes" do
assert %Changeset{valid?: true} = assert %Changeset{valid?: true} =
Transaction.changeset(%Transaction{}, %{ Transaction.changeset(%Transaction{}, %{
from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca",
hash: "0x9fc76417374aa880d4449a1f7f31ec597f00b1f6f3dd2d66f4c9c6c445836d8b", hash: "0x9fc76417374aa880d4449a1f7f31ec597f00b1f6f3dd2d66f4c9c6c445836d8b",
value: 1, value: 1,
gas: 21000, gas: 21000,
@ -29,7 +30,7 @@ defmodule Explorer.Chain.TransactionTest do
end end
test "it creates a new to address" do test "it creates a new to address" do
params = params_for(:transaction) params = params_for(:transaction, from_address_hash: "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca")
to_address_params = %{hash: "sk8orDi3"} to_address_params = %{hash: "sk8orDi3"}
changeset_params = Map.merge(params, %{to_address: to_address_params}) changeset_params = Map.merge(params, %{to_address: to_address_params})

@ -1040,60 +1040,6 @@ defmodule Explorer.ChainTest do
end end
end end
describe "import_internal_transactions/1" do
test "updates address with contract code" do
smart_contract_bytecode =
"0x608060405234801561001057600080fd5b5060df8061001f6000396000f3006080604052600436106049576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806360fe47b114604e5780636d4ce63c146078575b600080fd5b348015605957600080fd5b5060766004803603810190808035906020019092919050505060a0565b005b348015608357600080fd5b50608a60aa565b6040518082815260200191505060405180910390f35b8060008190555050565b600080549050905600a165627a7a7230582040d82a7379b1ee1632ad4d8a239954fd940277b25628ead95259a85c5eddb2120029"
address_hash = "0x1c494fa496f1cfd918b5ff190835af3aaf60987e"
insert(:address, hash: address_hash)
from_address_hash = "0x8cc2e4b51b4340cb3727cffe3f1878756e732cee"
from_address = insert(:address, hash: from_address_hash)
transaction_string_hash = "0x0705ea0a5b997d9aafd5c531e016d9aabe3297a28c0bd4ef005fe6ea329d301b"
insert(:transaction, from_address: from_address, hash: transaction_string_hash)
options = [
addresses: [
params: [
%{
contract_code: smart_contract_bytecode,
hash: address_hash
}
]
],
internal_transactions: [
params: [
%{
created_contract_address_hash: address_hash,
created_contract_code: smart_contract_bytecode,
from_address_hash: from_address_hash,
gas: 184_531,
gas_used: 84531,
index: 0,
init:
"0x6060604052341561000c57fe5b5b6101a68061001c6000396000f300606060405263ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416631d3b9edf811461005b57806366098d4f1461007b578063a12f69e01461009b578063f4f3bdc1146100bb575bfe5b6100696004356024356100db565b60408051918252519081900360200190f35b61006960043560243561010a565b60408051918252519081900360200190f35b610069600435602435610124565b60408051918252519081900360200190f35b610069600435602435610163565b60408051918252519081900360200190f35b60008282028315806100f757508284828115156100f457fe5b04145b15156100ff57fe5b8091505b5092915050565b6000828201838110156100ff57fe5b8091505b5092915050565b60008080831161013057fe5b828481151561013b57fe5b049050828481151561014957fe5b0681840201841415156100ff57fe5b8091505b5092915050565b60008282111561016f57fe5b508082035b929150505600a165627a7a7230582020c944d8375ca14e2c92b14df53c2d044cb99dc30c3ba9f55e2bcde87bd4709b0029",
trace_address: [],
transaction_hash: transaction_string_hash,
type: "create",
value: 0
}
]
],
transactions: [
hashes: [transaction_string_hash]
]
]
assert {:ok, _} = Chain.import_internal_transactions(options)
address = Explorer.Repo.one(from(address in Explorer.Chain.Address, where: address.hash == ^address_hash))
assert address.contract_code != nil
end
end
describe "stream_unfetched_balances/2" do describe "stream_unfetched_balances/2" do
test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <> test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <>
"does not return `t:Explorer.Chain.Block.t/0` `miner_hash`" do "does not return `t:Explorer.Chain.Block.t/0` `miner_hash`" do
@ -1469,7 +1415,7 @@ defmodule Explorer.ChainTest do
assert [{^current_pid, _}] = Registry.lookup(Registry.ChainEvents, :logs) assert [{^current_pid, _}] = Registry.lookup(Registry.ChainEvents, :logs)
end end
describe "import_blocks" do describe "import" do
@import_data [ @import_data [
blocks: [ blocks: [
params: [ params: [
@ -1558,25 +1504,25 @@ defmodule Explorer.ChainTest do
test "publishes addresses with updated fetched_balance data to subscribers on insert" do test "publishes addresses with updated fetched_balance data to subscribers on insert" do
Chain.subscribe_to_events(:addresses) Chain.subscribe_to_events(:addresses)
Chain.import_blocks(@import_data) Chain.import(@import_data)
assert_received {:chain_event, :addresses, [%Address{}, %Address{}]} assert_received {:chain_event, :addresses, [%Address{}, %Address{}]}
end end
test "publishes block data to subscribers on insert" do test "publishes block data to subscribers on insert" do
Chain.subscribe_to_events(:blocks) Chain.subscribe_to_events(:blocks)
Chain.import_blocks(@import_data) Chain.import(@import_data)
assert_received {:chain_event, :blocks, [%Block{}]} assert_received {:chain_event, :blocks, [%Block{}]}
end end
test "publishes log data to subscribers on insert" do test "publishes log data to subscribers on insert" do
Chain.subscribe_to_events(:logs) Chain.subscribe_to_events(:logs)
Chain.import_blocks(@import_data) Chain.import(@import_data)
assert_received {:chain_event, :logs, [%Log{}]} assert_received {:chain_event, :logs, [%Log{}]}
end end
test "publishes transaction hashes data to subscribers on insert" do test "publishes transaction hashes data to subscribers on insert" do
Chain.subscribe_to_events(:transactions) Chain.subscribe_to_events(:transactions)
Chain.import_blocks(@import_data) Chain.import(@import_data)
assert_received {:chain_event, :transactions, [%Hash{}]} assert_received {:chain_event, :transactions, [%Hash{}]}
end end
@ -1584,7 +1530,7 @@ defmodule Explorer.ChainTest do
non_broadcast_data = Keyword.merge(@import_data, broadcast: false) non_broadcast_data = Keyword.merge(@import_data, broadcast: false)
Chain.subscribe_to_events(:logs) Chain.subscribe_to_events(:logs)
Chain.import_blocks(non_broadcast_data) Chain.import(non_broadcast_data)
refute_received {:chain_event, :logs, [%Log{}]} refute_received {:chain_event, :logs, [%Log{}]}
end end
end end

@ -0,0 +1,117 @@
defmodule Explorer.Chain.ImportTest do
use Explorer.DataCase
alias Explorer.Chain.Import
doctest Import
describe "all/1" do
test "updates address with contract code" do
smart_contract_bytecode =
"0x608060405234801561001057600080fd5b5060df8061001f6000396000f3006080604052600436106049576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806360fe47b114604e5780636d4ce63c146078575b600080fd5b348015605957600080fd5b5060766004803603810190808035906020019092919050505060a0565b005b348015608357600080fd5b50608a60aa565b6040518082815260200191505060405180910390f35b8060008190555050565b600080549050905600a165627a7a7230582040d82a7379b1ee1632ad4d8a239954fd940277b25628ead95259a85c5eddb2120029"
address_hash = "0x1c494fa496f1cfd918b5ff190835af3aaf60987e"
insert(:address, hash: address_hash)
from_address_hash = "0x8cc2e4b51b4340cb3727cffe3f1878756e732cee"
from_address = insert(:address, hash: from_address_hash)
transaction_string_hash = "0x0705ea0a5b997d9aafd5c531e016d9aabe3297a28c0bd4ef005fe6ea329d301b"
insert(:transaction, from_address: from_address, hash: transaction_string_hash)
options = [
addresses: [
params: [
%{
contract_code: smart_contract_bytecode,
hash: address_hash
}
]
],
internal_transactions: [
params: [
%{
created_contract_address_hash: address_hash,
created_contract_code: smart_contract_bytecode,
from_address_hash: from_address_hash,
gas: 184_531,
gas_used: 84531,
index: 0,
init:
"0x6060604052341561000c57fe5b5b6101a68061001c6000396000f300606060405263ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416631d3b9edf811461005b57806366098d4f1461007b578063a12f69e01461009b578063f4f3bdc1146100bb575bfe5b6100696004356024356100db565b60408051918252519081900360200190f35b61006960043560243561010a565b60408051918252519081900360200190f35b610069600435602435610124565b60408051918252519081900360200190f35b610069600435602435610163565b60408051918252519081900360200190f35b60008282028315806100f757508284828115156100f457fe5b04145b15156100ff57fe5b8091505b5092915050565b6000828201838110156100ff57fe5b8091505b5092915050565b60008080831161013057fe5b828481151561013b57fe5b049050828481151561014957fe5b0681840201841415156100ff57fe5b8091505b5092915050565b60008282111561016f57fe5b508082035b929150505600a165627a7a7230582020c944d8375ca14e2c92b14df53c2d044cb99dc30c3ba9f55e2bcde87bd4709b0029",
trace_address: [],
transaction_hash: transaction_string_hash,
type: "create",
value: 0
}
]
]
]
assert {:ok, _} = Import.all(options)
address = Explorer.Repo.one(from(address in Explorer.Chain.Address, where: address.hash == ^address_hash))
assert address.contract_code != nil
end
test "with internal_transactions updates Transaction internal_transactions_indexed_at" do
from_address_hash = "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca"
to_address_hash = "0x8bf38d4764929064f2d4d3a56520a76ab3df415b"
transaction_hash = "0x3a3eb134e6792ce9403ea4188e5e79693de9e4c94e499db132be086400da79e6"
options = [
addresses: [
params: [
%{hash: from_address_hash},
%{hash: to_address_hash}
]
],
transactions: [
params: [
%{
from_address_hash: from_address_hash,
gas: 4_677_320,
gas_price: 1,
hash: transaction_hash,
input: "0x",
nonce: 0,
r: 0,
s: 0,
v: 0,
value: 0
}
],
on_conflict: :replace_all
],
internal_transactions: [
params: [
%{
block_number: 35,
call_type: "call",
from_address_hash: from_address_hash,
gas: 4_677_320,
gas_used: 27770,
index: 0,
output: "0x",
to_address_hash: to_address_hash,
trace_address: [],
transaction_hash: transaction_hash,
type: "call",
value: 0
}
]
]
]
refute Enum.any?(options[:transactions][:params], &Map.has_key?(&1, :internal_transactions_indexed_at))
assert {:ok, _} = Import.all(options)
transaction =
Explorer.Repo.one(from(transaction in Explorer.Chain.Transaction, where: transaction.hash == ^transaction_hash))
refute transaction.internal_transactions_indexed_at == nil
end
end
end

@ -280,7 +280,7 @@ defmodule ExplorerWeb.ViewingAddressesTest do
], ],
balances: [%{address_hash: ^hash}] balances: [%{address_hash: ^hash}]
}} = }} =
Chain.import_blocks( Chain.import(
addresses: [ addresses: [
params: [ params: [
%{ %{

@ -75,7 +75,7 @@ defmodule Indexer.BalanceFetcher do
addresses_params = balances_params_to_address_params(balances_params) addresses_params = balances_params_to_address_params(balances_params)
{:ok, _} = {:ok, _} =
Chain.import_blocks( Chain.import(
addresses: [params: addresses_params, with: :balance_changeset], addresses: [params: addresses_params, with: :balance_changeset],
balances: [params: balances_params] balances: [params: balances_params]
) )

@ -280,7 +280,7 @@ defmodule Indexer.BlockFetcher do
options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index) options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index)
with {:ok, results} <- Chain.import_blocks(options_with_broadcast) do with {:ok, results} <- Chain.import(options_with_broadcast) do
async_import_remaining_block_data( async_import_remaining_block_data(
results, results,
address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number, address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number,
@ -300,8 +300,8 @@ defmodule Indexer.BlockFetcher do
end end
end end
# `fetched_balance_block_number` is needed for the `BalanceFetcher`, but should not be used for # `fetched_balance_block_number` is needed for the `BalanceFetcher`, but should not be used for `import` because the
# `import_blocks` because the balance is not known yet. # balance is not known yet.
defp pop_address_hash_to_fetched_balance_block_number(options) do defp pop_address_hash_to_fetched_balance_block_number(options) do
{address_hash_fetched_balance_block_number_pairs, import_options} = {address_hash_fetched_balance_block_number_pairs, import_options} =
get_and_update_in(options, [:addresses, :params, Access.all()], &pop_hash_fetched_balance_block_number/1) get_and_update_in(options, [:addresses, :params, Access.all()], &pop_hash_fetched_balance_block_number/1)

@ -99,13 +99,10 @@ defmodule Indexer.InternalTransactionFetcher do
{hash, block_number} {hash, block_number}
end) end)
transaction_hashes = Enum.map(unique_transactions_params, &Map.fetch!(&1, :hash_data))
with {:ok, %{addresses: address_hashes}} <- with {:ok, %{addresses: address_hashes}} <-
Chain.import_internal_transactions( Chain.import(
addresses: [params: addresses_params], addresses: [params: addresses_params],
internal_transactions: [params: internal_transactions_params], internal_transactions: [params: internal_transactions_params]
transactions: [hashes: transaction_hashes]
) do ) do
address_hashes address_hashes
|> Enum.map(fn address_hash -> |> Enum.map(fn address_hash ->

@ -97,7 +97,7 @@ defmodule Indexer.PendingTransactionFetcher do
# affected the address balance yet since address balance is a balance at a give block and these transactions are # affected the address balance yet since address balance is a balance at a give block and these transactions are
# blockless. # blockless.
{:ok, _} = {:ok, _} =
Chain.import_blocks( Chain.import(
addresses: [params: addresses_params], addresses: [params: addresses_params],
transactions: [on_conflict: :nothing, params: transactions_params] transactions: [on_conflict: :nothing, params: transactions_params]
) )

@ -76,37 +76,6 @@ defmodule Indexer.SequenceTest do
# noproc when the sequence has already died by the time monitor is called # noproc when the sequence has already died by the time monitor is called
assert_receive {:DOWN, ^sequence_ref, :process, ^sequence_pid, status} when status in [:normal, :noproc] assert_receive {:DOWN, ^sequence_ref, :process, ^sequence_pid, status} when status in [:normal, :noproc]
end end
test "with :ranges in direction opposite of :step returns errors for all ranges in wrong direction" do
parent = self()
{child_pid, child_ref} =
spawn_monitor(fn ->
send(
parent,
Sequence.start_link(
ranges: [
# ok, ok
7..6,
# ok, error
4..5,
# error, ok
3..2,
# error, error
0..1
],
step: -1
)
)
end)
assert_receive {:DOWN, ^child_ref, :process, ^child_pid,
[
"Range (0..1) direction is opposite step (-1) direction",
"Range (4..5) direction is opposite step (-1) direction"
]},
200
end
end end
describe "queue/2" do describe "queue/2" do

Loading…
Cancel
Save