diff --git a/apps/explorer/lib/explorer/chain.ex b/apps/explorer/lib/explorer/chain.ex index 5a413168c7..881864927f 100644 --- a/apps/explorer/lib/explorer/chain.ex +++ b/apps/explorer/lib/explorer/chain.ex @@ -621,21 +621,11 @@ defmodule Explorer.Chain do See `Explorer.Chain.Import.all/1` for options and returns. """ - @spec import_blocks(Import.all_options()) :: Import.all_result() - def import_blocks(options) do + @spec import(Import.all_options()) :: Import.all_result() + def import(options) do Import.all(options) end - @doc """ - Bulk insert internal transactions and update `t:Explorer.Chain.Transaction.t/0` `internal_transactions_indexed_at`. - - See `Explorer.Chain.Import.internal_transactions/1` for options and returns. - """ - @spec import_internal_transactions(Import.internal_transactions_options()) :: Import.internal_transactions_result() - def import_internal_transactions(options) do - Import.internal_transactions(options) - end - @doc """ The number of `t:Explorer.Chain.Address.t/0`. diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index 8978d39209..04223f76e5 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -21,7 +21,7 @@ defmodule Explorer.Chain.Import do @typep timeout_option :: {:timeout, timeout} @typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()} @typep timestamps_option :: {:timestamps, timestamps} - @typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option]} + @typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option | with_option]} @typep with_option :: {:with, changeset_function_name :: atom} @type all_options :: [ @@ -83,8 +83,6 @@ defmodule Explorer.Chain.Import do @insert_logs_timeout 60_000 @insert_transactions_timeout 60_000 - @update_transactions_timeout 60_000 - @doc """ Bulk insert all data stored in the `Explorer`. @@ -119,6 +117,7 @@ defmodule Explorer.Chain.Import do * `:addresses` * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. * `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. + * `:with` - the changeset function on `Explorer.Chain.Address` to use validate `:params`. * `:balances` * `:params` - `list` of params for `Explorer.Chain.Balance.changeset/2`. * `:timeout` - the timeout for inserting all balances. Defaults to `#{@insert_balances_timeout}` milliseconds. @@ -146,6 +145,7 @@ defmodule Explorer.Chain.Import do * `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`. * `:timeout` - the timeout for inserting all transactions found in the params lists across all types. Defaults to `#{@insert_transactions_timeout}` milliseconds. + * `:with` - the changeset function on `Explorer.Chain.Transaction` to use validate `:params`. """ @spec all(all_options()) :: all_result() def all(options) when is_list(options) do @@ -165,56 +165,6 @@ defmodule Explorer.Chain.Import do end end - @doc """ - Bulk insert internal transactions for a list of transactions. - - ## Options - - * `:addresses` - * `:params` - `list` of params for `Explorer.Chain.Address.changeset/2`. - * `:timeout` - the timeout for inserting all addresses. Defaults to `#{@insert_addresses_timeout}` milliseconds. - * `:internal_transactions` - * `:params` - `list` of params for `Explorer.Chain.InternalTransaction.changeset/2`. - * `:timeout` - the timeout for inserting all internal transactions. Defaults to - `#{@insert_internal_transactions_timeout}` milliseconds. - * `:transactions` - * `:hashes` - `list` of `t:Explorer.Chain.Transaction.t/0` `hash`es that should have their - `internal_transactions_indexed_at` updated. - * `:timeout` - the timeout for updating transactions with `:hashes`. Defaults to - `#{@update_transactions_timeout}` milliseconds. - * `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}` - milliseconds. - """ - @spec internal_transactions(internal_transactions_options()) :: internal_transactions_result - def internal_transactions(options) when is_list(options) do - {transactions_options, import_options} = Keyword.pop(options, :transactions) - changes_list_options_list = import_options_to_changes_list_arguments_list(import_options) - - with {:ok, ecto_schema_module_to_changes_list} <- - changes_list_arguments_list_to_ecto_schema_module_to_changes_list(changes_list_options_list) do - timestamps = timestamps() - - ecto_schema_module_to_changes_list - |> ecto_schema_module_to_changes_list_to_multi(Keyword.put(options, :timestamps, timestamps)) - |> Multi.run(:transactions, fn _ -> - transaction_hashes = Keyword.get(transactions_options, :hashes) - transactions_count = length(transaction_hashes) - - query = - from( - t in Transaction, - where: t.hash in ^transaction_hashes, - update: [set: [internal_transactions_indexed_at: ^timestamps.updated_at]] - ) - - {^transactions_count, result} = Repo.update_all(query, []) - - {:ok, result} - end) - |> import_transaction(options) - end - end - defp broadcast_events(data) do for {event_type, event_data} <- data, event_type in ~w(addresses balances blocks logs transactions)a do broadcast_event_data(event_type, event_data) @@ -384,13 +334,22 @@ defmodule Explorer.Chain.Import do %{InternalTransaction => internal_transactions_changes} -> timestamps = Keyword.fetch!(options, :timestamps) - Multi.run(multi, :internal_transactions, fn _ -> + multi + |> Multi.run(:internal_transactions, fn _ -> insert_internal_transactions( internal_transactions_changes, timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout, timestamps: timestamps ) end) + |> Multi.run(:internal_transactions_indexed_at_transactions, fn %{internal_transactions: internal_transactions} + when is_list(internal_transactions) -> + update_transactions_internal_transactions_indexed_at( + internal_transactions, + timeout: options[:transactions][:timeout] || @insert_transactions_timeout, + timestamps: timestamps + ) + end) _ -> multi @@ -643,6 +602,30 @@ defmodule Explorer.Chain.Import do {:ok, inserted} end + defp update_transactions_internal_transactions_indexed_at(internal_transactions, named_arguments) + when is_list(internal_transactions) and is_list(named_arguments) do + timeout = Keyword.fetch!(named_arguments, :timeout) + timestamps = Keyword.fetch!(named_arguments, :timestamps) + + ordered_transaction_hashes = + internal_transactions + |> MapSet.new(& &1.transaction_hash) + |> Enum.sort() + + query = + from( + t in Transaction, + where: t.hash in ^ordered_transaction_hashes, + update: [set: [internal_transactions_indexed_at: ^timestamps.updated_at]] + ) + + transaction_count = Enum.count(ordered_transaction_hashes) + + {^transaction_count, result} = Repo.update_all(query, [], timeout: timeout) + + {:ok, result} + end + defp timestamp_changes_list(changes_list, timestamps) when is_list(changes_list) do Enum.map(changes_list, ×tamp_params(&1, timestamps)) end diff --git a/apps/explorer/test/explorer/chain_test.exs b/apps/explorer/test/explorer/chain_test.exs index 3967ca2db4..c9924e83f0 100644 --- a/apps/explorer/test/explorer/chain_test.exs +++ b/apps/explorer/test/explorer/chain_test.exs @@ -1040,60 +1040,6 @@ defmodule Explorer.ChainTest do end end - describe "import_internal_transactions/1" do - test "updates address with contract code" do - smart_contract_bytecode = - "0x608060405234801561001057600080fd5b5060df8061001f6000396000f3006080604052600436106049576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806360fe47b114604e5780636d4ce63c146078575b600080fd5b348015605957600080fd5b5060766004803603810190808035906020019092919050505060a0565b005b348015608357600080fd5b50608a60aa565b6040518082815260200191505060405180910390f35b8060008190555050565b600080549050905600a165627a7a7230582040d82a7379b1ee1632ad4d8a239954fd940277b25628ead95259a85c5eddb2120029" - - address_hash = "0x1c494fa496f1cfd918b5ff190835af3aaf60987e" - insert(:address, hash: address_hash) - - from_address_hash = "0x8cc2e4b51b4340cb3727cffe3f1878756e732cee" - from_address = insert(:address, hash: from_address_hash) - - transaction_string_hash = "0x0705ea0a5b997d9aafd5c531e016d9aabe3297a28c0bd4ef005fe6ea329d301b" - insert(:transaction, from_address: from_address, hash: transaction_string_hash) - - options = [ - addresses: [ - params: [ - %{ - contract_code: smart_contract_bytecode, - hash: address_hash - } - ] - ], - internal_transactions: [ - params: [ - %{ - created_contract_address_hash: address_hash, - created_contract_code: smart_contract_bytecode, - from_address_hash: from_address_hash, - gas: 184_531, - gas_used: 84531, - index: 0, - init: - "0x6060604052341561000c57fe5b5b6101a68061001c6000396000f300606060405263ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416631d3b9edf811461005b57806366098d4f1461007b578063a12f69e01461009b578063f4f3bdc1146100bb575bfe5b6100696004356024356100db565b60408051918252519081900360200190f35b61006960043560243561010a565b60408051918252519081900360200190f35b610069600435602435610124565b60408051918252519081900360200190f35b610069600435602435610163565b60408051918252519081900360200190f35b60008282028315806100f757508284828115156100f457fe5b04145b15156100ff57fe5b8091505b5092915050565b6000828201838110156100ff57fe5b8091505b5092915050565b60008080831161013057fe5b828481151561013b57fe5b049050828481151561014957fe5b0681840201841415156100ff57fe5b8091505b5092915050565b60008282111561016f57fe5b508082035b929150505600a165627a7a7230582020c944d8375ca14e2c92b14df53c2d044cb99dc30c3ba9f55e2bcde87bd4709b0029", - trace_address: [], - transaction_hash: transaction_string_hash, - type: "create", - value: 0 - } - ] - ], - transactions: [ - hashes: [transaction_string_hash] - ] - ] - - assert {:ok, _} = Chain.import_internal_transactions(options) - - address = Explorer.Repo.one(from(address in Explorer.Chain.Address, where: address.hash == ^address_hash)) - - assert address.contract_code != nil - end - end - describe "stream_unfetched_balances/2" do test "with existing `t:Explorer.Chain.Balance.t/0` with same `address_hash` and `block_number` " <> "does not return `t:Explorer.Chain.Block.t/0` `miner_hash`" do @@ -1469,7 +1415,7 @@ defmodule Explorer.ChainTest do assert [{^current_pid, _}] = Registry.lookup(Registry.ChainEvents, :logs) end - describe "import_blocks" do + describe "import" do @import_data [ blocks: [ params: [ @@ -1558,25 +1504,25 @@ defmodule Explorer.ChainTest do test "publishes addresses with updated fetched_balance data to subscribers on insert" do Chain.subscribe_to_events(:addresses) - Chain.import_blocks(@import_data) + Chain.import(@import_data) assert_received {:chain_event, :addresses, [%Address{}, %Address{}]} end test "publishes block data to subscribers on insert" do Chain.subscribe_to_events(:blocks) - Chain.import_blocks(@import_data) + Chain.import(@import_data) assert_received {:chain_event, :blocks, [%Block{}]} end test "publishes log data to subscribers on insert" do Chain.subscribe_to_events(:logs) - Chain.import_blocks(@import_data) + Chain.import(@import_data) assert_received {:chain_event, :logs, [%Log{}]} end test "publishes transaction hashes data to subscribers on insert" do Chain.subscribe_to_events(:transactions) - Chain.import_blocks(@import_data) + Chain.import(@import_data) assert_received {:chain_event, :transactions, [%Hash{}]} end @@ -1584,7 +1530,7 @@ defmodule Explorer.ChainTest do non_broadcast_data = Keyword.merge(@import_data, broadcast: false) Chain.subscribe_to_events(:logs) - Chain.import_blocks(non_broadcast_data) + Chain.import(non_broadcast_data) refute_received {:chain_event, :logs, [%Log{}]} end end diff --git a/apps/explorer/test/explorer/import_test.exs b/apps/explorer/test/explorer/import_test.exs index 606d630fd8..c78d960a57 100644 --- a/apps/explorer/test/explorer/import_test.exs +++ b/apps/explorer/test/explorer/import_test.exs @@ -1,5 +1,117 @@ defmodule Explorer.Chain.ImportTest do use Explorer.DataCase - doctest Explorer.Chain.Import + alias Explorer.Chain.Import + + doctest Import + + describe "all/1" do + test "updates address with contract code" do + smart_contract_bytecode = + "0x608060405234801561001057600080fd5b5060df8061001f6000396000f3006080604052600436106049576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806360fe47b114604e5780636d4ce63c146078575b600080fd5b348015605957600080fd5b5060766004803603810190808035906020019092919050505060a0565b005b348015608357600080fd5b50608a60aa565b6040518082815260200191505060405180910390f35b8060008190555050565b600080549050905600a165627a7a7230582040d82a7379b1ee1632ad4d8a239954fd940277b25628ead95259a85c5eddb2120029" + + address_hash = "0x1c494fa496f1cfd918b5ff190835af3aaf60987e" + insert(:address, hash: address_hash) + + from_address_hash = "0x8cc2e4b51b4340cb3727cffe3f1878756e732cee" + from_address = insert(:address, hash: from_address_hash) + + transaction_string_hash = "0x0705ea0a5b997d9aafd5c531e016d9aabe3297a28c0bd4ef005fe6ea329d301b" + insert(:transaction, from_address: from_address, hash: transaction_string_hash) + + options = [ + addresses: [ + params: [ + %{ + contract_code: smart_contract_bytecode, + hash: address_hash + } + ] + ], + internal_transactions: [ + params: [ + %{ + created_contract_address_hash: address_hash, + created_contract_code: smart_contract_bytecode, + from_address_hash: from_address_hash, + gas: 184_531, + gas_used: 84531, + index: 0, + init: + "0x6060604052341561000c57fe5b5b6101a68061001c6000396000f300606060405263ffffffff7c01000000000000000000000000000000000000000000000000000000006000350416631d3b9edf811461005b57806366098d4f1461007b578063a12f69e01461009b578063f4f3bdc1146100bb575bfe5b6100696004356024356100db565b60408051918252519081900360200190f35b61006960043560243561010a565b60408051918252519081900360200190f35b610069600435602435610124565b60408051918252519081900360200190f35b610069600435602435610163565b60408051918252519081900360200190f35b60008282028315806100f757508284828115156100f457fe5b04145b15156100ff57fe5b8091505b5092915050565b6000828201838110156100ff57fe5b8091505b5092915050565b60008080831161013057fe5b828481151561013b57fe5b049050828481151561014957fe5b0681840201841415156100ff57fe5b8091505b5092915050565b60008282111561016f57fe5b508082035b929150505600a165627a7a7230582020c944d8375ca14e2c92b14df53c2d044cb99dc30c3ba9f55e2bcde87bd4709b0029", + trace_address: [], + transaction_hash: transaction_string_hash, + type: "create", + value: 0 + } + ] + ] + ] + + assert {:ok, _} = Import.all(options) + + address = Explorer.Repo.one(from(address in Explorer.Chain.Address, where: address.hash == ^address_hash)) + + assert address.contract_code != nil + end + + test "with internal_transactions updates Transaction internal_transactions_indexed_at" do + from_address_hash = "0xe8ddc5c7a2d2f0d7a9798459c0104fdf5e987aca" + to_address_hash = "0x8bf38d4764929064f2d4d3a56520a76ab3df415b" + transaction_hash = "0x3a3eb134e6792ce9403ea4188e5e79693de9e4c94e499db132be086400da79e6" + + options = [ + addresses: [ + params: [ + %{hash: from_address_hash}, + %{hash: to_address_hash} + ] + ], + transactions: [ + params: [ + %{ + from_address_hash: from_address_hash, + gas: 4_677_320, + gas_price: 1, + hash: transaction_hash, + input: "0x", + nonce: 0, + r: 0, + s: 0, + v: 0, + value: 0 + } + ], + on_conflict: :replace_all + ], + internal_transactions: [ + params: [ + %{ + block_number: 35, + call_type: "call", + from_address_hash: from_address_hash, + gas: 4_677_320, + gas_used: 27770, + index: 0, + output: "0x", + to_address_hash: to_address_hash, + trace_address: [], + transaction_hash: transaction_hash, + type: "call", + value: 0 + } + ] + ] + ] + + refute Enum.any?(options[:transactions][:params], &Map.has_key?(&1, :internal_transactions_indexed_at)) + + assert {:ok, _} = Import.all(options) + + transaction = + Explorer.Repo.one(from(transaction in Explorer.Chain.Transaction, where: transaction.hash == ^transaction_hash)) + + refute transaction.internal_transactions_indexed_at == nil + end + end end diff --git a/apps/explorer_web/test/explorer_web/features/viewing_addresses_test.exs b/apps/explorer_web/test/explorer_web/features/viewing_addresses_test.exs index 2dd91e6c16..59140a8c9e 100644 --- a/apps/explorer_web/test/explorer_web/features/viewing_addresses_test.exs +++ b/apps/explorer_web/test/explorer_web/features/viewing_addresses_test.exs @@ -280,7 +280,7 @@ defmodule ExplorerWeb.ViewingAddressesTest do ], balances: [%{address_hash: ^hash}] }} = - Chain.import_blocks( + Chain.import( addresses: [ params: [ %{ diff --git a/apps/indexer/lib/indexer/balance_fetcher.ex b/apps/indexer/lib/indexer/balance_fetcher.ex index 66c30dbe31..7a62447879 100644 --- a/apps/indexer/lib/indexer/balance_fetcher.ex +++ b/apps/indexer/lib/indexer/balance_fetcher.ex @@ -75,7 +75,7 @@ defmodule Indexer.BalanceFetcher do addresses_params = balances_params_to_address_params(balances_params) {:ok, _} = - Chain.import_blocks( + Chain.import( addresses: [params: addresses_params, with: :balance_changeset], balances: [params: balances_params] ) diff --git a/apps/indexer/lib/indexer/block_fetcher.ex b/apps/indexer/lib/indexer/block_fetcher.ex index 45ba0eb544..0039323dc9 100644 --- a/apps/indexer/lib/indexer/block_fetcher.ex +++ b/apps/indexer/lib/indexer/block_fetcher.ex @@ -280,7 +280,7 @@ defmodule Indexer.BlockFetcher do options_with_broadcast = Keyword.merge(import_options, broadcast: indexer_mode == :realtime_index) - with {:ok, results} <- Chain.import_blocks(options_with_broadcast) do + with {:ok, results} <- Chain.import(options_with_broadcast) do async_import_remaining_block_data( results, address_hash_to_fetched_balance_block_number: address_hash_to_fetched_balance_block_number, @@ -300,8 +300,8 @@ defmodule Indexer.BlockFetcher do end end - # `fetched_balance_block_number` is needed for the `BalanceFetcher`, but should not be used for - # `import_blocks` because the balance is not known yet. + # `fetched_balance_block_number` is needed for the `BalanceFetcher`, but should not be used for `import` because the + # balance is not known yet. defp pop_address_hash_to_fetched_balance_block_number(options) do {address_hash_fetched_balance_block_number_pairs, import_options} = get_and_update_in(options, [:addresses, :params, Access.all()], &pop_hash_fetched_balance_block_number/1) diff --git a/apps/indexer/lib/indexer/internal_transaction_fetcher.ex b/apps/indexer/lib/indexer/internal_transaction_fetcher.ex index 09479adc82..fd4f9e3e7b 100644 --- a/apps/indexer/lib/indexer/internal_transaction_fetcher.ex +++ b/apps/indexer/lib/indexer/internal_transaction_fetcher.ex @@ -99,13 +99,10 @@ defmodule Indexer.InternalTransactionFetcher do {hash, block_number} end) - transaction_hashes = Enum.map(unique_transactions_params, &Map.fetch!(&1, :hash_data)) - with {:ok, %{addresses: address_hashes}} <- - Chain.import_internal_transactions( + Chain.import( addresses: [params: addresses_params], - internal_transactions: [params: internal_transactions_params], - transactions: [hashes: transaction_hashes] + internal_transactions: [params: internal_transactions_params] ) do address_hashes |> Enum.map(fn address_hash -> diff --git a/apps/indexer/lib/indexer/pending_transaction_fetcher.ex b/apps/indexer/lib/indexer/pending_transaction_fetcher.ex index c90a7642fc..37775d406e 100644 --- a/apps/indexer/lib/indexer/pending_transaction_fetcher.ex +++ b/apps/indexer/lib/indexer/pending_transaction_fetcher.ex @@ -97,7 +97,7 @@ defmodule Indexer.PendingTransactionFetcher do # affected the address balance yet since address balance is a balance at a give block and these transactions are # blockless. {:ok, _} = - Chain.import_blocks( + Chain.import( addresses: [params: addresses_params], transactions: [on_conflict: :nothing, params: transactions_params] )