From f02e2d8841a48d6cfe6599f6c4b955093fb692c3 Mon Sep 17 00:00:00 2001 From: Paul Tsupikoff Date: Thu, 11 Apr 2019 15:06:18 +0300 Subject: [PATCH] Remove internal tx and token balance fetching from realtime fetcher (#1724) Internal transactions and token balances are the slowest to fetch and import. We'd rather have almost instant import of almost all block-related info than frequent timeouts in realtime fetcher and handing blocks over to catchup fetcher which imports this data asynchronously anyway. --- CHANGELOG.md | 1 + .../lib/indexer/block/catchup/fetcher.ex | 64 +----- apps/indexer/lib/indexer/block/fetcher.ex | 69 +++++- .../lib/indexer/block/realtime/fetcher.ex | 205 +++--------------- .../indexer/block/realtime/fetcher_test.exs | 61 +++--- 5 files changed, 125 insertions(+), 275 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c411fb0283..d7226bf642 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Fixes + - [#1724](https://github.com/poanetwork/blockscout/pull/1724) - Remove internal tx and token balance fetching from realtime fetcher - [#1727](https://github.com/poanetwork/blockscout/pull/1727) - add logs pagination in rpc api ### Chore diff --git a/apps/indexer/lib/indexer/block/catchup/fetcher.ex b/apps/indexer/lib/indexer/block/catchup/fetcher.ex index 212903526d..7a1eeea620 100644 --- a/apps/indexer/lib/indexer/block/catchup/fetcher.ex +++ b/apps/indexer/lib/indexer/block/catchup/fetcher.ex @@ -11,18 +11,19 @@ defmodule Indexer.Block.Catchup.Fetcher do only: [ async_import_block_rewards: 1, async_import_coin_balances: 2, + async_import_created_contract_codes: 1, + async_import_internal_transactions: 2, + async_import_replaced_transactions: 1, async_import_tokens: 1, + async_import_token_balances: 1, async_import_uncles: 1, - fetch_and_import_range: 2, - async_import_replaced_transactions: 1 + fetch_and_import_range: 2 ] alias Ecto.Changeset alias Explorer.Chain - alias Explorer.Chain.{Hash, Transaction} alias Indexer.{Block, Tracer} alias Indexer.Block.Catchup.Sequence - alias Indexer.Fetcher.{ContractCode, InternalTransaction, TokenBalance} alias Indexer.Memory.Shrinkable @behaviour Block.Fetcher @@ -33,7 +34,6 @@ defmodule Indexer.Block.Catchup.Fetcher do @blocks_batch_size 10 @blocks_concurrency 10 @sequence_name :block_catchup_sequencer - @geth_block_limit 128 defstruct blocks_batch_size: @blocks_batch_size, blocks_concurrency: @blocks_concurrency, @@ -159,60 +159,6 @@ defmodule Indexer.Block.Catchup.Fetcher do async_import_replaced_transactions(imported) end - defp async_import_created_contract_codes(%{transactions: transactions}) do - transactions - |> Enum.flat_map(fn - %Transaction{ - block_number: block_number, - hash: hash, - created_contract_address_hash: %Hash{} = created_contract_address_hash, - created_contract_code_indexed_at: nil, - internal_transactions_indexed_at: nil - } -> - [%{block_number: block_number, hash: hash, created_contract_address_hash: created_contract_address_hash}] - - %Transaction{internal_transactions_indexed_at: %DateTime{}} -> - [] - - %Transaction{created_contract_address_hash: nil} -> - [] - end) - |> ContractCode.async_fetch(10_000) - end - - defp async_import_created_contract_codes(_), do: :ok - - defp async_import_internal_transactions(%{blocks: blocks}, EthereumJSONRPC.Parity) do - blocks - |> Enum.map(fn %Chain.Block{number: block_number} -> %{number: block_number} end) - |> InternalTransaction.async_block_fetch(10_000) - end - - defp async_import_internal_transactions(%{transactions: transactions}, EthereumJSONRPC.Geth) do - {_, max_block_number} = Chain.fetch_min_and_max_block_numbers() - - transactions - |> Enum.flat_map(fn - %Transaction{block_number: block_number, index: index, hash: hash, internal_transactions_indexed_at: nil} -> - [%{block_number: block_number, index: index, hash: hash}] - - %Transaction{internal_transactions_indexed_at: %DateTime{}} -> - [] - end) - |> Enum.filter(fn %{block_number: block_number} -> - max_block_number - block_number < @geth_block_limit - end) - |> InternalTransaction.async_fetch(10_000) - end - - defp async_import_internal_transactions(_, _), do: :ok - - defp async_import_token_balances(%{address_token_balances: token_balances}) do - TokenBalance.async_fetch(token_balances) - end - - defp async_import_token_balances(_), do: :ok - defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence) when is_pid(sequence) do sequence diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 313738de3a..241142f4f6 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -10,9 +10,21 @@ defmodule Indexer.Block.Fetcher do import EthereumJSONRPC, only: [quantity_to_integer: 1] alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries} + alias Explorer.Chain alias Explorer.Chain.{Address, Block, Hash, Import, Transaction} alias Indexer.Block.Fetcher.Receipts - alias Indexer.Fetcher.{BlockReward, CoinBalance, ReplacedTransaction, Token, UncleBlock} + + alias Indexer.Fetcher.{ + BlockReward, + CoinBalance, + ContractCode, + InternalTransaction, + ReplacedTransaction, + Token, + TokenBalance, + UncleBlock + } + alias Indexer.Tracer alias Indexer.Transform.{ @@ -55,6 +67,7 @@ defmodule Indexer.Block.Fetcher do @receipts_batch_size 250 @receipts_concurrency 10 + @geth_block_limit 128 @doc false def default_receipts_batch_size, do: @receipts_batch_size @@ -205,6 +218,54 @@ defmodule Indexer.Block.Fetcher do def async_import_coin_balances(_, _), do: :ok + def async_import_created_contract_codes(%{transactions: transactions}) do + transactions + |> Enum.flat_map(fn + %Transaction{ + block_number: block_number, + hash: hash, + created_contract_address_hash: %Hash{} = created_contract_address_hash, + created_contract_code_indexed_at: nil, + internal_transactions_indexed_at: nil + } -> + [%{block_number: block_number, hash: hash, created_contract_address_hash: created_contract_address_hash}] + + %Transaction{internal_transactions_indexed_at: %DateTime{}} -> + [] + + %Transaction{created_contract_address_hash: nil} -> + [] + end) + |> ContractCode.async_fetch(10_000) + end + + def async_import_created_contract_codes(_), do: :ok + + def async_import_internal_transactions(%{blocks: blocks}, EthereumJSONRPC.Parity) do + blocks + |> Enum.map(fn %Block{number: block_number} -> %{number: block_number} end) + |> InternalTransaction.async_block_fetch(10_000) + end + + def async_import_internal_transactions(%{transactions: transactions}, EthereumJSONRPC.Geth) do + {_, max_block_number} = Chain.fetch_min_and_max_block_numbers() + + transactions + |> Enum.flat_map(fn + %Transaction{block_number: block_number, index: index, hash: hash, internal_transactions_indexed_at: nil} -> + [%{block_number: block_number, index: index, hash: hash}] + + %Transaction{internal_transactions_indexed_at: %DateTime{}} -> + [] + end) + |> Enum.filter(fn %{block_number: block_number} -> + max_block_number - block_number < @geth_block_limit + end) + |> InternalTransaction.async_fetch(10_000) + end + + def async_import_internal_transactions(_, _), do: :ok + def async_import_tokens(%{tokens: tokens}) do tokens |> Enum.map(& &1.contract_address_hash) @@ -213,6 +274,12 @@ defmodule Indexer.Block.Fetcher do def async_import_tokens(_), do: :ok + def async_import_token_balances(%{address_token_balances: token_balances}) do + TokenBalance.async_fetch(token_balances) + end + + def async_import_token_balances(_), do: :ok + def async_import_uncles(%{block_second_degree_relations: block_second_degree_relations}) do block_second_degree_relations |> Enum.map(& &1.uncle_hash) diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index 30d425cb12..ff99545fa0 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -14,19 +14,20 @@ defmodule Indexer.Block.Realtime.Fetcher do import Indexer.Block.Fetcher, only: [ async_import_block_rewards: 1, + async_import_created_contract_codes: 1, + async_import_internal_transactions: 2, + async_import_replaced_transactions: 1, async_import_tokens: 1, + async_import_token_balances: 1, async_import_uncles: 1, - fetch_and_import_range: 2, - async_import_replaced_transactions: 1 + fetch_and_import_range: 2 ] - alias ABI.TypeDecoder alias Ecto.Changeset alias EthereumJSONRPC.{FetchedBalances, Subscription} alias Explorer.Chain - alias Explorer.Chain.TokenTransfer alias Explorer.Counters.AverageBlockTime - alias Indexer.{Block, TokenBalances, Tracer} + alias Indexer.{Block, Tracer} alias Indexer.Block.Realtime.TaskSupervisor alias Indexer.Transform.Addresses alias Timex.Duration @@ -160,42 +161,21 @@ defmodule Indexer.Block.Realtime.Fetcher do @impl Block.Fetcher def import( - block_fetcher, + %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher, %{ address_coin_balances: %{params: address_coin_balances_params}, address_hash_to_fetched_balance_block_number: address_hash_to_block_number, - address_token_balances: %{params: address_token_balances_params}, addresses: %{params: addresses_params}, - blocks: %{params: blocks_params}, - block_rewards: block_rewards, - transactions: %{params: transactions_params}, - token_transfers: %{params: token_transfers_params} + block_rewards: block_rewards } = options ) do - with {:internal_transactions, - {:ok, - %{ - addresses_params: internal_transactions_addresses_params, - internal_transactions_params: internal_transactions_params, - internal_transactions_indexed_at_blocks_params: internal_transactions_indexed_at_blocks_params - }}} <- - {:internal_transactions, - internal_transactions(block_fetcher, %{ - addresses_params: addresses_params, - blocks_params: blocks_params, - token_transfers_params: token_transfers_params, - transactions_params: transactions_params - })}, - {:balances, {:ok, %{addresses_params: balances_addresses_params, balances_params: balances_params}}} <- + with {:balances, {:ok, %{addresses_params: balances_addresses_params, balances_params: balances_params}}} <- {:balances, balances(block_fetcher, %{ address_hash_to_block_number: address_hash_to_block_number, - addresses_params: internal_transactions_addresses_params, + addresses_params: addresses_params, balances_params: address_coin_balances_params })}, - {:address_token_balances, {:ok, address_token_balances}} <- - {:address_token_balances, fetch_token_balances(address_token_balances_params)}, - address_current_token_balances = TokenBalances.to_address_current_token_balances(address_token_balances), {block_reward_errors, chain_import_block_rewards} = Map.pop(block_rewards, :errors), chain_import_options = options @@ -203,16 +183,14 @@ defmodule Indexer.Block.Realtime.Fetcher do |> put_in([:addresses, :params], balances_addresses_params) |> put_in([:blocks, :params, Access.all(), :consensus], true) |> put_in([:block_rewards], chain_import_block_rewards) - |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params) - |> put_in([Access.key(:address_current_token_balances, %{}), :params], address_current_token_balances) - |> put_in([Access.key(:address_token_balances), :params], address_token_balances) - |> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params) - |> put_in([:internal_transactions_indexed_at_blocks], %{ - params: internal_transactions_indexed_at_blocks_params, - with: :number_only_changeset - }), + |> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params), {:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do - async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}}) + async_import_remaining_block_data( + imported, + %{block_rewards: %{errors: block_reward_errors}}, + json_rpc_named_arguments + ) + ok end end @@ -355,147 +333,20 @@ defmodule Indexer.Block.Realtime.Fetcher do Enum.any?(changesets, &(Map.get(&1, :message) == "Unknown block number")) end - defp async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}}) do + defp async_import_remaining_block_data( + imported, + %{block_rewards: %{errors: block_reward_errors}}, + json_rpc_named_arguments + ) do async_import_block_rewards(block_reward_errors) + async_import_created_contract_codes(imported) + async_import_internal_transactions(imported, Keyword.get(json_rpc_named_arguments, :variant)) async_import_tokens(imported) + async_import_token_balances(imported) async_import_uncles(imported) async_import_replaced_transactions(imported) end - defp internal_transactions( - %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}, - %{ - addresses_params: addresses_params, - blocks_params: blocks_params, - token_transfers_params: token_transfers_params, - transactions_params: transactions_params - } - ) do - variant = Keyword.fetch!(json_rpc_named_arguments, :variant) - - internal_transactions_indexed_at_blocks_params = - case variant do - EthereumJSONRPC.Parity -> blocks_params - _ -> [] - end - - variant - |> case do - EthereumJSONRPC.Parity -> - blocks_params - |> Enum.map(fn %{number: block_number} -> block_number end) - |> EthereumJSONRPC.fetch_block_internal_transactions(json_rpc_named_arguments) - - _ -> - transactions_params - |> transactions_params_to_fetch_internal_transactions_params(token_transfers_params, json_rpc_named_arguments) - |> EthereumJSONRPC.fetch_internal_transactions(json_rpc_named_arguments) - end - |> case do - {:ok, internal_transactions_params} -> - merged_addresses_params = - %{internal_transactions: internal_transactions_params} - |> Addresses.extract_addresses() - |> Kernel.++(addresses_params) - |> Addresses.merge_addresses() - - {:ok, - %{ - addresses_params: merged_addresses_params, - internal_transactions_params: internal_transactions_params, - internal_transactions_indexed_at_blocks_params: internal_transactions_indexed_at_blocks_params - }} - - :ignore -> - {:ok, - %{ - addresses_params: addresses_params, - internal_transactions_params: [], - internal_transactions_indexed_at_blocks_params: [] - }} - - {:error, _reason} = error -> - error - end - end - - defp transactions_params_to_fetch_internal_transactions_params( - transactions_params, - token_transfers_params, - json_rpc_named_arguments - ) do - token_transfer_transaction_hash_set = MapSet.new(token_transfers_params, & &1.transaction_hash) - - Enum.flat_map( - transactions_params, - &transaction_params_to_fetch_internal_transaction_params_list( - &1, - token_transfer_transaction_hash_set, - json_rpc_named_arguments - ) - ) - end - - defp transaction_params_to_fetch_internal_transaction_params_list( - %{block_number: block_number, transaction_index: transaction_index, hash: hash} = transaction_params, - token_transfer_transaction_hash_set, - json_rpc_named_arguments - ) - when is_integer(block_number) and is_integer(transaction_index) and is_binary(hash) do - token_transfer? = hash in token_transfer_transaction_hash_set - - if fetch_internal_transactions?(transaction_params, token_transfer?, json_rpc_named_arguments) do - [%{block_number: block_number, transaction_index: transaction_index, hash_data: hash}] - else - [] - end - end - - # 0xa9059cbb - signature of the transfer(address,uint256) function from the ERC-20 token specification. - # Although transaction input data can be faked we use this heuristics to filter simple token transfer internal transactions from indexing because they slow down realtime fetcher - defp fetch_internal_transactions?( - %{ - status: :ok, - created_contract_address_hash: nil, - input: unquote(TokenTransfer.transfer_function_signature()) <> params, - value: 0 - }, - _, - _ - ) do - types = [:address, {:uint, 256}] - - try do - [_address, _value] = - params - |> Base.decode16!(case: :mixed) - |> TypeDecoder.decode_raw(types) - - false - rescue - _ -> true - end - end - - defp fetch_internal_transactions?( - %{ - status: :ok, - created_contract_address_hash: nil, - input: "0x", - to_address_hash: to_address_hash, - block_number: block_number - }, - _, - json_rpc_named_arguments - ) do - Chain.contract_address?(to_address_hash, block_number, json_rpc_named_arguments) - end - - # Token transfers not transferred during contract creation don't need internal transactions as the token transfers - # derive completely from the logs. - defp fetch_internal_transactions?(%{status: :ok, created_contract_address_hash: nil}, true, _), do: false - defp fetch_internal_transactions?(_, _, _), do: true - defp balances( %Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}, %{addresses_params: addresses_params} = options @@ -558,10 +409,4 @@ defmodule Indexer.Block.Realtime.Fetcher do %{hash_data: address_hash, block_quantity: integer_to_quantity(block_number)} end) end - - defp fetch_token_balances(address_token_balances_params) do - address_token_balances_params - |> MapSet.to_list() - |> TokenBalances.fetch_token_balances_from_blockchain() - end end diff --git a/apps/indexer/test/indexer/block/realtime/fetcher_test.exs b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs index fd2bbdbd4c..f222805d26 100644 --- a/apps/indexer/test/indexer/block/realtime/fetcher_test.exs +++ b/apps/indexer/test/indexer/block/realtime/fetcher_test.exs @@ -8,7 +8,7 @@ defmodule Indexer.Block.Realtime.FetcherTest do alias Explorer.Chain.{Address, Transaction} alias Indexer.Block.Catchup.Sequence alias Indexer.Block.Realtime - alias Indexer.Fetcher.{ReplacedTransaction, Token, TokenBalance, UncleBlock} + alias Indexer.Fetcher.{ContractCode, InternalTransaction, ReplacedTransaction, Token, TokenBalance, UncleBlock} @moduletag capture_log: true @@ -51,6 +51,10 @@ defmodule Indexer.Block.Realtime.FetcherTest do Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + + InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments) + UncleBlock.Supervisor.Case.start_supervised!( block_fetcher: %Indexer.Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} ) @@ -201,11 +205,18 @@ defmodule Indexer.Block.Realtime.FetcherTest do } ]} end) - |> expect(:json_rpc, fn [%{method: "trace_block"}, %{method: "trace_block"}] = requests, _options -> - responses = Enum.map(requests, fn %{id: id} -> %{id: id, result: []} end) - {:ok, responses} - end) |> expect(:json_rpc, 2, fn + [ + %{id: 0, jsonrpc: "2.0", method: "trace_block", params: ["0x3C365F"]}, + %{id: 1, jsonrpc: "2.0", method: "trace_block", params: ["0x3C3660"]} + ], + _ -> + {:ok, + [ + %{id: 0, jsonrpc: "2.0", result: []}, + %{id: 1, jsonrpc: "2.0", result: []} + ]} + [ %{ id: 0, @@ -347,28 +358,22 @@ defmodule Indexer.Block.Realtime.FetcherTest do id: 0, jsonrpc: "2.0", method: "eth_getBalance", - params: ["0x11c4469d974f8af5ba9ec99f3c42c07c848c861c", "0x3C365F"] - }, - %{ - id: 1, - jsonrpc: "2.0", - method: "eth_getBalance", params: ["0x40b18103537c0f15d5e137dd8ddd019b84949d16", "0x3C365F"] }, %{ - id: 2, + id: 1, jsonrpc: "2.0", method: "eth_getBalance", params: ["0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", "0x3C365F"] }, %{ - id: 3, + id: 2, jsonrpc: "2.0", method: "eth_getBalance", params: ["0x66c9343c7e8ca673a1fedf9dbf2cd7936dbbf7e3", "0x3C3660"] }, %{ - id: 4, + id: 3, jsonrpc: "2.0", method: "eth_getBalance", params: ["0x698bf6943bab687b2756394624aa183f434f65da", "0x3C365F"] @@ -377,11 +382,10 @@ defmodule Indexer.Block.Realtime.FetcherTest do _ -> {:ok, [ - %{id: 0, jsonrpc: "2.0", result: "0x49e3de5187cf037d127"}, - %{id: 1, jsonrpc: "2.0", result: "0x148adc763b603291685"}, - %{id: 2, jsonrpc: "2.0", result: "0x53474fa377a46000"}, - %{id: 3, jsonrpc: "2.0", result: "0x53507afe51f28000"}, - %{id: 4, jsonrpc: "2.0", result: "0x3e1a95d7517dc197108"} + %{id: 0, jsonrpc: "2.0", result: "0x148adc763b603291685"}, + %{id: 1, jsonrpc: "2.0", result: "0x53474fa377a46000"}, + %{id: 2, jsonrpc: "2.0", result: "0x53507afe51f28000"}, + %{id: 3, jsonrpc: "2.0", result: "0x3e1a95d7517dc197108"} ]} end) end @@ -392,9 +396,8 @@ defmodule Indexer.Block.Realtime.FetcherTest do addresses: [ %Address{hash: first_address_hash, fetched_coin_balance_block_number: 3_946_079}, %Address{hash: second_address_hash, fetched_coin_balance_block_number: 3_946_079}, - %Address{hash: third_address_hash, fetched_coin_balance_block_number: 3_946_079}, - %Address{hash: fourth_address_hash, fetched_coin_balance_block_number: 3_946_080}, - %Address{hash: fifth_address_hash, fetched_coin_balance_block_number: 3_946_079} + %Address{hash: third_address_hash, fetched_coin_balance_block_number: 3_946_080}, + %Address{hash: fourth_address_hash, fetched_coin_balance_block_number: 3_946_079} ], address_coin_balances: [ %{ @@ -407,26 +410,14 @@ defmodule Indexer.Block.Realtime.FetcherTest do }, %{ address_hash: third_address_hash, - block_number: 3_946_079 - }, - %{ - address_hash: fourth_address_hash, block_number: 3_946_080 }, %{ - address_hash: fifth_address_hash, + address_hash: fourth_address_hash, block_number: 3_946_079 } ], blocks: [%Chain.Block{number: 3_946_079}, %Chain.Block{number: 3_946_080}], - internal_transactions: [ - %{index: 0, transaction_hash: transaction_hash}, - %{index: 1, transaction_hash: transaction_hash}, - %{index: 2, transaction_hash: transaction_hash}, - %{index: 3, transaction_hash: transaction_hash}, - %{index: 4, transaction_hash: transaction_hash}, - %{index: 5, transaction_hash: transaction_hash} - ], transactions: [%Transaction{hash: transaction_hash}] }, errors: []