Unified Explorer.Chain.import

* Merge `Explorer.Chain.import_blocks` and
`Explorer.Chain.import_internal_transactions` into
`Explorer.Chain.import`.
* Move body of `Explorer.Chain.import` and its private helper functions
to `Explorer.Chain.Import` as an independent context that
`Explorer.Chain` can use.  `Explorer.Chain.Import` must be under
`Explorer.Chain` because `Explorer.Chain.Import` still needs direct
access to the `Explorer.Chain` schemas.
pull/467/head
Luke Imhoff 6 years ago
parent c4133c1fb0
commit 2f3af51741
  1. 14
      apps/explorer/lib/explorer/chain.ex
  2. 91
      apps/explorer/lib/explorer/chain/import.ex
  3. 66
      apps/explorer/test/explorer/chain_test.exs
  4. 114
      apps/explorer/test/explorer/import_test.exs
  5. 2
      apps/explorer_web/test/explorer_web/features/viewing_addresses_test.exs
  6. 2
      apps/indexer/lib/indexer/balance_fetcher.ex
  7. 6
      apps/indexer/lib/indexer/block_fetcher.ex
  8. 7
      apps/indexer/lib/indexer/internal_transaction_fetcher.ex
  9. 2
      apps/indexer/lib/indexer/pending_transaction_fetcher.ex

@ -621,21 +621,11 @@ defmodule Explorer.Chain do
See `Explorer.Chain.Import.all/1` for options and returns. See `Explorer.Chain.Import.all/1` for options and returns.
""" """
@spec import_blocks(Import.all_options()) :: Import.all_result() @spec import(Import.all_options()) :: Import.all_result()
def import_blocks(options) do def import(options) do
Import.all(options) Import.all(options)
end end
@doc """
Bulk insert internal transactions and update `t:Explorer.Chain.Transaction.t/0` `internal_transactions_indexed_at`.
See `Explorer.Chain.Import.internal_transactions/1` for options and returns.
"""
@spec import_internal_transactions(Import.internal_transactions_options()) :: Import.internal_transactions_result()
def import_internal_transactions(options) do
Import.internal_transactions(options)
end
@doc """ @doc """
The number of `t:Explorer.Chain.Address.t/0`. The number of `t:Explorer.Chain.Address.t/0`.

@ -21,7 +21,7 @@ defmodule Explorer.Chain.Import do
@typep timeout_option :: {:timeout, timeout} @typep timeout_option :: {:timeout, timeout}
@typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()} @typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()}
@typep timestamps_option :: {:timestamps, timestamps} @typep timestamps_option :: {:timestamps, timestamps}
@typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option]} @typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option | with_option]}
@typep with_option :: {:with, changeset_function_name :: atom} @typep with_option :: {:with, changeset_function_name :: atom}
@type all_options :: [ @type all_options :: [
@ -83,8 +83,6 @@ defmodule Explorer.Chain.Import do
@insert_logs_timeout 60_000 @insert_logs_timeout 60_000
@insert_transactions_timeout 60_000 @insert_transactions_timeout 60_000
@update_transactions_timeout 60_000
@doc """ @doc """
Bulk insert all data stored in the `Explorer`. Bulk insert all data stored in the `Explorer`.
@ -119,6 +117,7 @@ defmodule Explorer.Chain.Import do
* `:addresses` * `:addresses`
* `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`.
* `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. * `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Address` to use validate `:params`.
* `:balances` * `:balances`
* `:params` - `list` of params for `Explorer.Chain.Balance.changeset/2`. * `:params` - `list` of params for `Explorer.Chain.Balance.changeset/2`.
* `:timeout` - the timeout for inserting all balances. Defaults to `#{@insert_balances_timeout}` milliseconds. * `:timeout` - the timeout for inserting all balances. Defaults to `#{@insert_balances_timeout}` milliseconds.
@ -146,6 +145,7 @@ defmodule Explorer.Chain.Import do
* `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`. * `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`.
* `:timeout` - the timeout for inserting all transactions found in the params lists across all * `:timeout` - the timeout for inserting all transactions found in the params lists across all
types. Defaults to `#{@insert_transactions_timeout}` milliseconds. types. Defaults to `#{@insert_transactions_timeout}` milliseconds.
* `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`.
""" """
@spec all(all_options()) :: all_result() @spec all(all_options()) :: all_result()
def all(options) when is_list(options) do def all(options) when is_list(options) do
@ -165,56 +165,6 @@ defmodule Explorer.Chain.Import do
end end
end end
@doc """
Bulk insert internal transactions for a list of transactions.
## Options
* `:addresses`
* `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`.
* `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds.
* `:internal_transactions`
* `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`.
* `:timeout` - the timeout for inserting all internal transactions. Defaults to
`#{@insert_internal_transactions_timeout}` milliseconds.
* `:transactions`
* `:hashes` - `list` of `t:Explorer.Chain.Transaction.t/0` `hash`es that should have their
`internal_transactions_indexed_at` updated.
* `:timeout` - the timeout for updating transactions with `:hashes`. Defaults to
`#{@update_transactions_timeout}` milliseconds.
* `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}`
milliseconds.
"""
@spec internal_transactions(internal_transactions_options()) :: internal_transactions_result
def internal_transactions(options) when is_list(options) do
{transactions_options, import_options} = Keyword.pop(options, :transactions)
changes_list_options_list = import_options_to_changes_list_arguments_list(import_options)
with {:ok, ecto_schema_module_to_changes_list} <-
changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_options_list) do
timestamps = timestamps()
ecto_schema_module_to_changes_list
|> ecto_schema_module_to_changes_list_to_multi(Keyword.put(options, :timestamps, timestamps))
|> Multi.run(:transactions, fn _ ->
transaction_hashes = Keyword.get(transactions_options, :hashes)
transactions_count = length(transaction_hashes)
query =
from(
t in Transaction,
where: t.hash in ^transaction_hashes,
update: [set: [internal_transactions_indexed_at: ^timestamps.updated_at]]
)
{^transactions_count, result} = Repo.update_all(query, [])
{:ok, result}
end)
|> import_transaction(options)
end
end
defp broadcast_events(data) do defp broadcast_events(data) do
for {event_type, event_data} <- data, event_type in ~w(addresses balances blocks logs transactions)a do for {event_type, event_data} <- data, event_type in ~w(addresses balances blocks logs transactions)a do
broadcast_event_data(event_type, event_data) broadcast_event_data(event_type, event_data)
@ -384,13 +334,22 @@ defmodule Explorer.Chain.Import do
%{InternalTransaction => internal_transactions_changes} -> %{InternalTransaction => internal_transactions_changes} ->
timestamps = Keyword.fetch!(options, :timestamps) timestamps = Keyword.fetch!(options, :timestamps)
Multi.run(multi, :internal_transactions, fn _ -> multi
|> Multi.run(:internal_transactions, fn _ ->
insert_internal_transactions( insert_internal_transactions(
internal_transactions_changes, internal_transactions_changes,
timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout, timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout,
timestamps: timestamps timestamps: timestamps
) )
end) end)
|> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions}
when is_list(internal_transactions) ->
update_transactions_internal_transactions_indexed_at(
internal_transactions,
timeout: options[:transactions][:timeout] || @insert_transactions_timeout,
timestamps: timestamps
)
end)
_ -> _ ->
multi multi
@ -643,6 +602,30 @@ defmodule Explorer.Chain.Import do
{:ok, inserted} {:ok, inserted}
end end
defp update_transactions_internal_transactions_indexed_at(internal_transactions, named_arguments)
when is_list(internal_transactions) and is_list(named_arguments) do
timeout = Keyword.fetch!(named_arguments, :timeout)
timestamps = Keyword.fetch!(named_arguments, :timestamps)
ordered_transaction_hashes =
internal_transactions
|> MapSet.new(& &1.transaction_hash)
|> Enum.sort()
query =
from(
t in Transaction,
where: t.hash in ^ordered_transaction_hashes,
update: [set: [internal_transactions_indexed_at: ^timestamps.updated_at]]
)
transaction_count = Enum.count(ordered_transaction_hashes)
{^transaction_count, result} = Repo.update_all(query, [], timeout: timeout)
{:ok, result}
end
defp timestamp_changes_list(changes_list, timestamps) when is_list(changes_list) do defp timestamp_changes_list(changes_list, timestamps) when is_list(changes_list) do
Enum.map(changes_list, &timestamp_params(&1, timestamps)) Enum.map(changes_list, &timestamp_params(&1, timestamps))
end end

@ -1040,60 +1040,6 @@ defmodule Explorer.ChainTest do
end end
end end
describe "import_internal_transactions/1" do
test "updates address with contract code" do
smart_contract_bytecode =
"0x608060405234801561001057600080fd5b5060df8061001f6000396000f3006080604052600436106049576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806360fe47b114604e5780636d4ce63c146078575b600080fd5b348015605957600080fd5b5060766004803603810190808035906020019092919050505060a0565b005b348015608357600080fd5b50608a60aa565b6040518082815260200191505060405180910390f35b8060008190555050565b600080549050905600a165627a7a7230582040d82a7379b1ee1632ad4d8a239954fd940277b25628ead95259a85c5eddb2120029"
address_hash = "0x1c494fa496f1cfd918b5ff190835af3aaf60987e"
insert(:address, hash: address_hash)
from_address_hash = "0x8cc2e4b51b4340cb3727cffe3f1878756e732cee"
from_address = insert(:address, hash: from_address_hash)
transaction_string_hash = "0x0705ea0a5b997d9aafd5c531e016d9aabe3297a28c0bd4ef005fe6ea329d301b"
insert(:transaction, from_address: from_address, hash: transaction_string_hash)
options = [
addresses: [
params: [
%{
contract_code: smart_contract_bytecode,
hash: address_hash
}
]
],
internal_transactions: [
params: [
%{
created_contract_address_hash: address_hash,
created_contract_code: smart_contract_bytecode,
from_address_hash: from_address_hash,
gas: 184_531,
gas_used: 84531,
index: 0,
init:
"0x6060604052341561000c57fe5b5b6101a68061001c6000396000f300606060405263ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416631d3b9edf811461005b57806366098d4f1461007b578063a12f69e01461009b578063f4f3bdc1146100bb575bfe5b6100696004356024356100db565b60408051918252519081900360200190f35b61006960043560243561010a565b60408051918252519081900360200190f35b610069600435602435610124565b60408051918252519081900360200190f35b610069600435602435610163565b60408051918252519081900360200190f35b60008282028315806100f757508284828115156100f457fe5b04145b15156100ff57fe5b8091505b5092915050565b6000828201838110156100ff57fe5b8091505b5092915050565b60008080831161013057fe5b828481151561013b57fe5b049050828481151561014957fe5b0681840201841415156100ff57fe5b8091505b5092915050565b60008282111561016f57fe5b508082035b929150505600a165627a7a7230582020c944d8375ca14e2c92b14df53c2d044cb99dc30c3ba9f55e2bcde87bd4709b0029",
trace_address: [],
transaction_hash: transaction_string_hash,
type: "create",
value: 0
}
]
],
transactions: [
hashes: [transaction_string_hash]
]
]
assert {:ok, _} = Chain.import_internal_transactions(options)
address = Explorer.Repo.one(from(address in Explorer.Chain.Address, where: address.hash == ^address_hash))
assert address.contract_code != nil
end
end
describe "stream_unfetched_balances/2" do describe "stream_unfetched_balances/2" do
test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <> test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <>
"does not return `t:Explorer.Chain.Block.t/0` `miner_hash`" do "does not return `t:Explorer.Chain.Block.t/0` `miner_hash`" do
@ -1469,7 +1415,7 @@ defmodule Explorer.ChainTest do
assert [{^current_pid, _}] = Registry.lookup(Registry.ChainEvents, :logs) assert [{^current_pid, _}] = Registry.lookup(Registry.ChainEvents, :logs)
end end
describe "import_blocks" do describe "import" do
@import_data [ @import_data [
blocks: [ blocks: [
params: [ params: [
@ -1558,25 +1504,25 @@ defmodule Explorer.ChainTest do
test "publishes addresses with updated fetched_balance data to subscribers on insert" do test "publishes addresses with updated fetched_balance data to subscribers on insert" do
Chain.subscribe_to_events(:addresses) Chain.subscribe_to_events(:addresses)
Chain.import_blocks(@import_data) Chain.import(@import_data)
assert_received {:chain_event, :addresses, [%Address{}, %Address{}]} assert_received {:chain_event, :addresses, [%Address{}, %Address{}]}
end end
test "publishes block data to subscribers on insert" do test "publishes block data to subscribers on insert" do
Chain.subscribe_to_events(:blocks) Chain.subscribe_to_events(:blocks)
Chain.import_blocks(@import_data) Chain.import(@import_data)
assert_received {:chain_event, :blocks, [%Block{}]} assert_received {:chain_event, :blocks, [%Block{}]}
end end
test "publishes log data to subscribers on insert" do test "publishes log data to subscribers on insert" do
Chain.subscribe_to_events(:logs) Chain.subscribe_to_events(:logs)
Chain.import_blocks(@import_data) Chain.import(@import_data)
assert_received {:chain_event, :logs, [%Log{}]} assert_received {:chain_event, :logs, [%Log{}]}
end end
test "publishes transaction hashes data to subscribers on insert" do test "publishes transaction hashes data to subscribers on insert" do
Chain.subscribe_to_events(:transactions) Chain.subscribe_to_events(:transactions)
Chain.import_blocks(@import_data) Chain.import(@import_data)
assert_received {:chain_event, :transactions, [%Hash{}]} assert_received {:chain_event, :transactions, [%Hash{}]}
end end
@ -1584,7 +1530,7 @@ defmodule Explorer.ChainTest do
non_broadcast_data = Keyword.merge(@import_data, broadcast: false) non_broadcast_data = Keyword.merge(@import_data, broadcast: false)
Chain.subscribe_to_events(:logs) Chain.subscribe_to_events(:logs)
Chain.import_blocks(non_broadcast_data) Chain.import(non_broadcast_data)
refute_received {:chain_event, :logs, [%Log{}]} refute_received {:chain_event, :logs, [%Log{}]}
end end
end end

@ -1,5 +1,117 @@
defmodule Explorer.Chain.ImportTest do defmodule Explorer.Chain.ImportTest do
use Explorer.DataCase use Explorer.DataCase
doctest Explorer.Chain.Import alias Explorer.Chain.Import
doctest Import
describe "all/1" do
test "updates address with contract code" do
smart_contract_bytecode =
"0x608060405234801561001057600080fd5b5060df8061001f6000396000f3006080604052600436106049576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806360fe47b114604e5780636d4ce63c146078575b600080fd5b348015605957600080fd5b5060766004803603810190808035906020019092919050505060a0565b005b348015608357600080fd5b50608a60aa565b6040518082815260200191505060405180910390f35b8060008190555050565b600080549050905600a165627a7a7230582040d82a7379b1ee1632ad4d8a239954fd940277b25628ead95259a85c5eddb2120029"
address_hash = "0x1c494fa496f1cfd918b5ff190835af3aaf60987e"
insert(:address, hash: address_hash)
from_address_hash = "0x8cc2e4b51b4340cb3727cffe3f1878756e732cee"
from_address = insert(:address, hash: from_address_hash)
transaction_string_hash = "0x0705ea0a5b997d9aafd5c531e016d9aabe3297a28c0bd4ef005fe6ea329d301b"
insert(:transaction, from_address: from_address, hash: transaction_string_hash)
options = [
addresses: [
params: [
%{
contract_code: smart_contract_bytecode,
hash: address_hash
}
]
],
internal_transactions: [
params: [
%{
created_contract_address_hash: address_hash,
created_contract_code: smart_contract_bytecode,
from_address_hash: from_address_hash,
gas: 184_531,
gas_used: 84531,
index: 0,
init:
"0x6060604052341561000c57fe5b5b6101a68061001c6000396000f300606060405263ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416631d3b9edf811461005b57806366098d4f1461007b578063a12f69e01461009b578063f4f3bdc1146100bb575bfe5b6100696004356024356100db565b60408051918252519081900360200190f35b61006960043560243561010a565b60408051918252519081900360200190f35b610069600435602435610124565b60408051918252519081900360200190f35b610069600435602435610163565b60408051918252519081900360200190f35b60008282028315806100f757508284828115156100f457fe5b04145b15156100ff57fe5b8091505b5092915050565b6000828201838110156100ff57fe5b8091505b5092915050565b60008080831161013057fe5b828481151561013b57fe5b049050828481151561014957fe5b0681840201841415156100ff57fe5b8091505b5092915050565b60008282111561016f57fe5b508082035b929150505600a165627a7a7230582020c944d8375ca14e2c92b14df53c2d044cb99dc30c3ba9f55e2bcde87bd4709b0029",
trace_address: [],
transaction_hash: transaction_string_hash,
type: "create",
value: 0
}
]
]
]
assert {:ok, _} = Import.all(options)
address = Explorer.Repo.one(from(address in Explorer.Chain.Address, where: address.hash == ^address_hash))
assert address.contract_code != nil
end
test "with internal_transactions updates Transaction internal_transactions_indexed_at" do
from_address_hash = "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca"
to_address_hash = "0x8bf38d4764929064f2d4d3a56520a76ab3df415b"
transaction_hash = "0x3a3eb134e6792ce9403ea4188e5e79693de9e4c94e499db132be086400da79e6"
options = [
addresses: [
params: [
%{hash: from_address_hash},
%{hash: to_address_hash}
]
],
transactions: [
params: [
%{
from_address_hash: from_address_hash,
gas: 4_677_320,
gas_price: 1,
hash: transaction_hash,
input: "0x",
nonce: 0,
r: 0,
s: 0,
v: 0,
value: 0
}
],
on_conflict: :replace_all
],
internal_transactions: [
params: [
%{
block_number: 35,
call_type: "call",
from_address_hash: from_address_hash,
gas: 4_677_320,
gas_used: 27770,
index: 0,
output: "0x",
to_address_hash: to_address_hash,
trace_address: [],
transaction_hash: transaction_hash,
type: "call",
value: 0
}
]
]
]
refute Enum.any?(options[:transactions][:params], &Map.has_key?(&1, :internal_transactions_indexed_at))
assert {:ok, _} = Import.all(options)
transaction =
Explorer.Repo.one(from(transaction in Explorer.Chain.Transaction, where: transaction.hash == ^transaction_hash))
refute transaction.internal_transactions_indexed_at == nil
end
end
end end

@ -280,7 +280,7 @@ defmodule ExplorerWeb.ViewingAddressesTest do
], ],
balances: [%{address_hash: ^hash}] balances: [%{address_hash: ^hash}]
}} = }} =
Chain.import_blocks( Chain.import(
addresses: [ addresses: [
params: [ params: [
%{ %{

@ -75,7 +75,7 @@ defmodule Indexer.BalanceFetcher do
addresses_params = balances_params_to_address_params(balances_params) addresses_params = balances_params_to_address_params(balances_params)
{:ok, _} = {:ok, _} =
Chain.import_blocks( Chain.import(
addresses: [params: addresses_params, with: :balance_changeset], addresses: [params: addresses_params, with: :balance_changeset],
balances: [params: balances_params] balances: [params: balances_params]
) )

@ -280,7 +280,7 @@ defmodule Indexer.BlockFetcher do
options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index) options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index)
with {:ok, results} <- Chain.import_blocks(options_with_broadcast) do with {:ok, results} <- Chain.import(options_with_broadcast) do
async_import_remaining_block_data( async_import_remaining_block_data(
results, results,
address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number, address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number,
@ -300,8 +300,8 @@ defmodule Indexer.BlockFetcher do
end end
end end
# `fetched_balance_block_number` is needed for the `BalanceFetcher`, but should not be used for # `fetched_balance_block_number` is needed for the `BalanceFetcher`, but should not be used for `import` because the
# `import_blocks` because the balance is not known yet. # balance is not known yet.
defp pop_address_hash_to_fetched_balance_block_number(options) do defp pop_address_hash_to_fetched_balance_block_number(options) do
{address_hash_fetched_balance_block_number_pairs, import_options} = {address_hash_fetched_balance_block_number_pairs, import_options} =
get_and_update_in(options, [:addresses, :params, Access.all()], &pop_hash_fetched_balance_block_number/1) get_and_update_in(options, [:addresses, :params, Access.all()], &pop_hash_fetched_balance_block_number/1)

@ -99,13 +99,10 @@ defmodule Indexer.InternalTransactionFetcher do
{hash, block_number} {hash, block_number}
end) end)
transaction_hashes = Enum.map(unique_transactions_params, &Map.fetch!(&1, :hash_data))
with {:ok, %{addresses: address_hashes}} <- with {:ok, %{addresses: address_hashes}} <-
Chain.import_internal_transactions( Chain.import(
addresses: [params: addresses_params], addresses: [params: addresses_params],
internal_transactions: [params: internal_transactions_params], internal_transactions: [params: internal_transactions_params]
transactions: [hashes: transaction_hashes]
) do ) do
address_hashes address_hashes
|> Enum.map(fn address_hash -> |> Enum.map(fn address_hash ->

@ -97,7 +97,7 @@ defmodule Indexer.PendingTransactionFetcher do
# affected the address balance yet since address balance is a balance at a give block and these transactions are # affected the address balance yet since address balance is a balance at a give block and these transactions are
# blockless. # blockless.
{:ok, _} = {:ok, _} =
Chain.import_blocks( Chain.import(
addresses: [params: addresses_params], addresses: [params: addresses_params],
transactions: [on_conflict: :nothing, params: transactions_params] transactions: [on_conflict: :nothing, params: transactions_params]
) )

Loading…
Cancel
Save