Merge branch 'master' into ab-fix-empty-block-time

pull/1740/head
Ayrat Badykov 6 years ago committed by GitHub
commit 3b372d5ac0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 64
      apps/indexer/lib/indexer/block/catchup/fetcher.ex
  3. 69
      apps/indexer/lib/indexer/block/fetcher.ex
  4. 205
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  5. 61
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs

@ -4,6 +4,7 @@
### Fixes
- [#1724](https://github.com/poanetwork/blockscout/pull/1724) - Remove internal tx and token balance fetching from realtime fetcher
- [#1727](https://github.com/poanetwork/blockscout/pull/1727) - add logs pagination in rpc api
- [#1740](https://github.com/poanetwork/blockscout/pull/1740) - fix empty block time

@ -11,18 +11,19 @@ defmodule Indexer.Block.Catchup.Fetcher do
only: [
async_import_block_rewards: 1,
async_import_coin_balances: 2,
async_import_created_contract_codes: 1,
async_import_internal_transactions: 2,
async_import_replaced_transactions: 1,
async_import_tokens: 1,
async_import_token_balances: 1,
async_import_uncles: 1,
fetch_and_import_range: 2,
async_import_replaced_transactions: 1
fetch_and_import_range: 2
]
alias Ecto.Changeset
alias Explorer.Chain
alias Explorer.Chain.{Hash, Transaction}
alias Indexer.{Block, Tracer}
alias Indexer.Block.Catchup.Sequence
alias Indexer.Fetcher.{ContractCode, InternalTransaction, TokenBalance}
alias Indexer.Memory.Shrinkable
@behaviour Block.Fetcher
@ -33,7 +34,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
@blocks_batch_size 10
@blocks_concurrency 10
@sequence_name :block_catchup_sequencer
@geth_block_limit 128
defstruct blocks_batch_size: @blocks_batch_size,
blocks_concurrency: @blocks_concurrency,
@ -159,60 +159,6 @@ defmodule Indexer.Block.Catchup.Fetcher do
async_import_replaced_transactions(imported)
end
defp async_import_created_contract_codes(%{transactions: transactions}) do
transactions
|> Enum.flat_map(fn
%Transaction{
block_number: block_number,
hash: hash,
created_contract_address_hash: %Hash{} = created_contract_address_hash,
created_contract_code_indexed_at: nil,
internal_transactions_indexed_at: nil
} ->
[%{block_number: block_number, hash: hash, created_contract_address_hash: created_contract_address_hash}]
%Transaction{internal_transactions_indexed_at: %DateTime{}} ->
[]
%Transaction{created_contract_address_hash: nil} ->
[]
end)
|> ContractCode.async_fetch(10_000)
end
defp async_import_created_contract_codes(_), do: :ok
defp async_import_internal_transactions(%{blocks: blocks}, EthereumJSONRPC.Parity) do
blocks
|> Enum.map(fn %Chain.Block{number: block_number} -> %{number: block_number} end)
|> InternalTransaction.async_block_fetch(10_000)
end
defp async_import_internal_transactions(%{transactions: transactions}, EthereumJSONRPC.Geth) do
{_, max_block_number} = Chain.fetch_min_and_max_block_numbers()
transactions
|> Enum.flat_map(fn
%Transaction{block_number: block_number, index: index, hash: hash, internal_transactions_indexed_at: nil} ->
[%{block_number: block_number, index: index, hash: hash}]
%Transaction{internal_transactions_indexed_at: %DateTime{}} ->
[]
end)
|> Enum.filter(fn %{block_number: block_number} ->
max_block_number - block_number < @geth_block_limit
end)
|> InternalTransaction.async_fetch(10_000)
end
defp async_import_internal_transactions(_, _), do: :ok
defp async_import_token_balances(%{address_token_balances: token_balances}) do
TokenBalance.async_fetch(token_balances)
end
defp async_import_token_balances(_), do: :ok
defp stream_fetch_and_import(%__MODULE__{blocks_concurrency: blocks_concurrency} = state, sequence)
when is_pid(sequence) do
sequence

@ -10,9 +10,21 @@ defmodule Indexer.Block.Fetcher do
import EthereumJSONRPC, only: [quantity_to_integer: 1]
alias EthereumJSONRPC.{Blocks, FetchedBeneficiaries}
alias Explorer.Chain
alias Explorer.Chain.{Address, Block, Hash, Import, Transaction}
alias Indexer.Block.Fetcher.Receipts
alias Indexer.Fetcher.{BlockReward, CoinBalance, ReplacedTransaction, Token, UncleBlock}
alias Indexer.Fetcher.{
BlockReward,
CoinBalance,
ContractCode,
InternalTransaction,
ReplacedTransaction,
Token,
TokenBalance,
UncleBlock
}
alias Indexer.Tracer
alias Indexer.Transform.{
@ -55,6 +67,7 @@ defmodule Indexer.Block.Fetcher do
@receipts_batch_size 250
@receipts_concurrency 10
@geth_block_limit 128
@doc false
def default_receipts_batch_size, do: @receipts_batch_size
@ -205,6 +218,54 @@ defmodule Indexer.Block.Fetcher do
def async_import_coin_balances(_, _), do: :ok
def async_import_created_contract_codes(%{transactions: transactions}) do
transactions
|> Enum.flat_map(fn
%Transaction{
block_number: block_number,
hash: hash,
created_contract_address_hash: %Hash{} = created_contract_address_hash,
created_contract_code_indexed_at: nil,
internal_transactions_indexed_at: nil
} ->
[%{block_number: block_number, hash: hash, created_contract_address_hash: created_contract_address_hash}]
%Transaction{internal_transactions_indexed_at: %DateTime{}} ->
[]
%Transaction{created_contract_address_hash: nil} ->
[]
end)
|> ContractCode.async_fetch(10_000)
end
def async_import_created_contract_codes(_), do: :ok
def async_import_internal_transactions(%{blocks: blocks}, EthereumJSONRPC.Parity) do
blocks
|> Enum.map(fn %Block{number: block_number} -> %{number: block_number} end)
|> InternalTransaction.async_block_fetch(10_000)
end
def async_import_internal_transactions(%{transactions: transactions}, EthereumJSONRPC.Geth) do
{_, max_block_number} = Chain.fetch_min_and_max_block_numbers()
transactions
|> Enum.flat_map(fn
%Transaction{block_number: block_number, index: index, hash: hash, internal_transactions_indexed_at: nil} ->
[%{block_number: block_number, index: index, hash: hash}]
%Transaction{internal_transactions_indexed_at: %DateTime{}} ->
[]
end)
|> Enum.filter(fn %{block_number: block_number} ->
max_block_number - block_number < @geth_block_limit
end)
|> InternalTransaction.async_fetch(10_000)
end
def async_import_internal_transactions(_, _), do: :ok
def async_import_tokens(%{tokens: tokens}) do
tokens
|> Enum.map(& &1.contract_address_hash)
@ -213,6 +274,12 @@ defmodule Indexer.Block.Fetcher do
def async_import_tokens(_), do: :ok
def async_import_token_balances(%{address_token_balances: token_balances}) do
TokenBalance.async_fetch(token_balances)
end
def async_import_token_balances(_), do: :ok
def async_import_uncles(%{block_second_degree_relations: block_second_degree_relations}) do
block_second_degree_relations
|> Enum.map(& &1.uncle_hash)

@ -14,19 +14,20 @@ defmodule Indexer.Block.Realtime.Fetcher do
import Indexer.Block.Fetcher,
only: [
async_import_block_rewards: 1,
async_import_created_contract_codes: 1,
async_import_internal_transactions: 2,
async_import_replaced_transactions: 1,
async_import_tokens: 1,
async_import_token_balances: 1,
async_import_uncles: 1,
fetch_and_import_range: 2,
async_import_replaced_transactions: 1
fetch_and_import_range: 2
]
alias ABI.TypeDecoder
alias Ecto.Changeset
alias EthereumJSONRPC.{FetchedBalances, Subscription}
alias Explorer.Chain
alias Explorer.Chain.TokenTransfer
alias Explorer.Counters.AverageBlockTime
alias Indexer.{Block, TokenBalances, Tracer}
alias Indexer.{Block, Tracer}
alias Indexer.Block.Realtime.TaskSupervisor
alias Indexer.Transform.Addresses
alias Timex.Duration
@ -160,42 +161,21 @@ defmodule Indexer.Block.Realtime.Fetcher do
@impl Block.Fetcher
def import(
block_fetcher,
%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments} = block_fetcher,
%{
address_coin_balances: %{params: address_coin_balances_params},
address_hash_to_fetched_balance_block_number: address_hash_to_block_number,
address_token_balances: %{params: address_token_balances_params},
addresses: %{params: addresses_params},
blocks: %{params: blocks_params},
block_rewards: block_rewards,
transactions: %{params: transactions_params},
token_transfers: %{params: token_transfers_params}
block_rewards: block_rewards
} = options
) do
with {:internal_transactions,
{:ok,
%{
addresses_params: internal_transactions_addresses_params,
internal_transactions_params: internal_transactions_params,
internal_transactions_indexed_at_blocks_params: internal_transactions_indexed_at_blocks_params
}}} <-
{:internal_transactions,
internal_transactions(block_fetcher, %{
addresses_params: addresses_params,
blocks_params: blocks_params,
token_transfers_params: token_transfers_params,
transactions_params: transactions_params
})},
{:balances, {:ok, %{addresses_params: balances_addresses_params, balances_params: balances_params}}} <-
with {:balances, {:ok, %{addresses_params: balances_addresses_params, balances_params: balances_params}}} <-
{:balances,
balances(block_fetcher, %{
address_hash_to_block_number: address_hash_to_block_number,
addresses_params: internal_transactions_addresses_params,
addresses_params: addresses_params,
balances_params: address_coin_balances_params
})},
{:address_token_balances, {:ok, address_token_balances}} <-
{:address_token_balances, fetch_token_balances(address_token_balances_params)},
address_current_token_balances = TokenBalances.to_address_current_token_balances(address_token_balances),
{block_reward_errors, chain_import_block_rewards} = Map.pop(block_rewards, :errors),
chain_import_options =
options
@ -203,16 +183,14 @@ defmodule Indexer.Block.Realtime.Fetcher do
|> put_in([:addresses, :params], balances_addresses_params)
|> put_in([:blocks, :params, Access.all(), :consensus], true)
|> put_in([:block_rewards], chain_import_block_rewards)
|> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params)
|> put_in([Access.key(:address_current_token_balances, %{}), :params], address_current_token_balances)
|> put_in([Access.key(:address_token_balances), :params], address_token_balances)
|> put_in([Access.key(:internal_transactions, %{}), :params], internal_transactions_params)
|> put_in([:internal_transactions_indexed_at_blocks], %{
params: internal_transactions_indexed_at_blocks_params,
with: :number_only_changeset
}),
|> put_in([Access.key(:address_coin_balances, %{}), :params], balances_params),
{:import, {:ok, imported} = ok} <- {:import, Chain.import(chain_import_options)} do
async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}})
async_import_remaining_block_data(
imported,
%{block_rewards: %{errors: block_reward_errors}},
json_rpc_named_arguments
)
ok
end
end
@ -355,147 +333,20 @@ defmodule Indexer.Block.Realtime.Fetcher do
Enum.any?(changesets, &(Map.get(&1, :message) == "Unknown block number"))
end
defp async_import_remaining_block_data(imported, %{block_rewards: %{errors: block_reward_errors}}) do
defp async_import_remaining_block_data(
imported,
%{block_rewards: %{errors: block_reward_errors}},
json_rpc_named_arguments
) do
async_import_block_rewards(block_reward_errors)
async_import_created_contract_codes(imported)
async_import_internal_transactions(imported, Keyword.get(json_rpc_named_arguments, :variant))
async_import_tokens(imported)
async_import_token_balances(imported)
async_import_uncles(imported)
async_import_replaced_transactions(imported)
end
defp internal_transactions(
%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%{
addresses_params: addresses_params,
blocks_params: blocks_params,
token_transfers_params: token_transfers_params,
transactions_params: transactions_params
}
) do
variant = Keyword.fetch!(json_rpc_named_arguments, :variant)
internal_transactions_indexed_at_blocks_params =
case variant do
EthereumJSONRPC.Parity -> blocks_params
_ -> []
end
variant
|> case do
EthereumJSONRPC.Parity ->
blocks_params
|> Enum.map(fn %{number: block_number} -> block_number end)
|> EthereumJSONRPC.fetch_block_internal_transactions(json_rpc_named_arguments)
_ ->
transactions_params
|> transactions_params_to_fetch_internal_transactions_params(token_transfers_params, json_rpc_named_arguments)
|> EthereumJSONRPC.fetch_internal_transactions(json_rpc_named_arguments)
end
|> case do
{:ok, internal_transactions_params} ->
merged_addresses_params =
%{internal_transactions: internal_transactions_params}
|> Addresses.extract_addresses()
|> Kernel.++(addresses_params)
|> Addresses.merge_addresses()
{:ok,
%{
addresses_params: merged_addresses_params,
internal_transactions_params: internal_transactions_params,
internal_transactions_indexed_at_blocks_params: internal_transactions_indexed_at_blocks_params
}}
:ignore ->
{:ok,
%{
addresses_params: addresses_params,
internal_transactions_params: [],
internal_transactions_indexed_at_blocks_params: []
}}
{:error, _reason} = error ->
error
end
end
defp transactions_params_to_fetch_internal_transactions_params(
transactions_params,
token_transfers_params,
json_rpc_named_arguments
) do
token_transfer_transaction_hash_set = MapSet.new(token_transfers_params, & &1.transaction_hash)
Enum.flat_map(
transactions_params,
&transaction_params_to_fetch_internal_transaction_params_list(
&1,
token_transfer_transaction_hash_set,
json_rpc_named_arguments
)
)
end
defp transaction_params_to_fetch_internal_transaction_params_list(
%{block_number: block_number, transaction_index: transaction_index, hash: hash} = transaction_params,
token_transfer_transaction_hash_set,
json_rpc_named_arguments
)
when is_integer(block_number) and is_integer(transaction_index) and is_binary(hash) do
token_transfer? = hash in token_transfer_transaction_hash_set
if fetch_internal_transactions?(transaction_params, token_transfer?, json_rpc_named_arguments) do
[%{block_number: block_number, transaction_index: transaction_index, hash_data: hash}]
else
[]
end
end
# 0xa9059cbb - signature of the transfer(address,uint256) function from the ERC-20 token specification.
# Although transaction input data can be faked we use this heuristics to filter simple token transfer internal transactions from indexing because they slow down realtime fetcher
defp fetch_internal_transactions?(
%{
status: :ok,
created_contract_address_hash: nil,
input: unquote(TokenTransfer.transfer_function_signature()) <> params,
value: 0
},
_,
_
) do
types = [:address, {:uint, 256}]
try do
[_address, _value] =
params
|> Base.decode16!(case: :mixed)
|> TypeDecoder.decode_raw(types)
false
rescue
_ -> true
end
end
defp fetch_internal_transactions?(
%{
status: :ok,
created_contract_address_hash: nil,
input: "0x",
to_address_hash: to_address_hash,
block_number: block_number
},
_,
json_rpc_named_arguments
) do
Chain.contract_address?(to_address_hash, block_number, json_rpc_named_arguments)
end
# Token transfers not transferred during contract creation don't need internal transactions as the token transfers
# derive completely from the logs.
defp fetch_internal_transactions?(%{status: :ok, created_contract_address_hash: nil}, true, _), do: false
defp fetch_internal_transactions?(_, _, _), do: true
defp balances(
%Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments},
%{addresses_params: addresses_params} = options
@ -558,10 +409,4 @@ defmodule Indexer.Block.Realtime.Fetcher do
%{hash_data: address_hash, block_quantity: integer_to_quantity(block_number)}
end)
end
defp fetch_token_balances(address_token_balances_params) do
address_token_balances_params
|> MapSet.to_list()
|> TokenBalances.fetch_token_balances_from_blockchain()
end
end

@ -8,7 +8,7 @@ defmodule Indexer.Block.Realtime.FetcherTest do
alias Explorer.Chain.{Address, Transaction}
alias Indexer.Block.Catchup.Sequence
alias Indexer.Block.Realtime
alias Indexer.Fetcher.{ReplacedTransaction, Token, TokenBalance, UncleBlock}
alias Indexer.Fetcher.{ContractCode, InternalTransaction, ReplacedTransaction, Token, TokenBalance, UncleBlock}
@moduletag capture_log: true
@ -51,6 +51,10 @@ defmodule Indexer.Block.Realtime.FetcherTest do
Token.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
ContractCode.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
InternalTransaction.Supervisor.Case.start_supervised!(json_rpc_named_arguments: json_rpc_named_arguments)
UncleBlock.Supervisor.Case.start_supervised!(
block_fetcher: %Indexer.Block.Fetcher{json_rpc_named_arguments: json_rpc_named_arguments}
)
@ -201,11 +205,18 @@ defmodule Indexer.Block.Realtime.FetcherTest do
}
]}
end)
|> expect(:json_rpc, fn [%{method: "trace_block"}, %{method: "trace_block"}] = requests, _options ->
responses = Enum.map(requests, fn %{id: id} -> %{id: id, result: []} end)
{:ok, responses}
end)
|> expect(:json_rpc, 2, fn
[
%{id: 0, jsonrpc: "2.0", method: "trace_block", params: ["0x3C365F"]},
%{id: 1, jsonrpc: "2.0", method: "trace_block", params: ["0x3C3660"]}
],
_ ->
{:ok,
[
%{id: 0, jsonrpc: "2.0", result: []},
%{id: 1, jsonrpc: "2.0", result: []}
]}
[
%{
id: 0,
@ -347,28 +358,22 @@ defmodule Indexer.Block.Realtime.FetcherTest do
id: 0,
jsonrpc: "2.0",
method: "eth_getBalance",
params: ["0x11c4469d974f8af5ba9ec99f3c42c07c848c861c", "0x3C365F"]
},
%{
id: 1,
jsonrpc: "2.0",
method: "eth_getBalance",
params: ["0x40b18103537c0f15d5e137dd8ddd019b84949d16", "0x3C365F"]
},
%{
id: 2,
id: 1,
jsonrpc: "2.0",
method: "eth_getBalance",
params: ["0x5ee341ac44d344ade1ca3a771c59b98eb2a77df2", "0x3C365F"]
},
%{
id: 3,
id: 2,
jsonrpc: "2.0",
method: "eth_getBalance",
params: ["0x66c9343c7e8ca673a1fedf9dbf2cd7936dbbf7e3", "0x3C3660"]
},
%{
id: 4,
id: 3,
jsonrpc: "2.0",
method: "eth_getBalance",
params: ["0x698bf6943bab687b2756394624aa183f434f65da", "0x3C365F"]
@ -377,11 +382,10 @@ defmodule Indexer.Block.Realtime.FetcherTest do
_ ->
{:ok,
[
%{id: 0, jsonrpc: "2.0", result: "0x49e3de5187cf037d127"},
%{id: 1, jsonrpc: "2.0", result: "0x148adc763b603291685"},
%{id: 2, jsonrpc: "2.0", result: "0x53474fa377a46000"},
%{id: 3, jsonrpc: "2.0", result: "0x53507afe51f28000"},
%{id: 4, jsonrpc: "2.0", result: "0x3e1a95d7517dc197108"}
%{id: 0, jsonrpc: "2.0", result: "0x148adc763b603291685"},
%{id: 1, jsonrpc: "2.0", result: "0x53474fa377a46000"},
%{id: 2, jsonrpc: "2.0", result: "0x53507afe51f28000"},
%{id: 3, jsonrpc: "2.0", result: "0x3e1a95d7517dc197108"}
]}
end)
end
@ -392,9 +396,8 @@ defmodule Indexer.Block.Realtime.FetcherTest do
addresses: [
%Address{hash: first_address_hash, fetched_coin_balance_block_number: 3_946_079},
%Address{hash: second_address_hash, fetched_coin_balance_block_number: 3_946_079},
%Address{hash: third_address_hash, fetched_coin_balance_block_number: 3_946_079},
%Address{hash: fourth_address_hash, fetched_coin_balance_block_number: 3_946_080},
%Address{hash: fifth_address_hash, fetched_coin_balance_block_number: 3_946_079}
%Address{hash: third_address_hash, fetched_coin_balance_block_number: 3_946_080},
%Address{hash: fourth_address_hash, fetched_coin_balance_block_number: 3_946_079}
],
address_coin_balances: [
%{
@ -407,26 +410,14 @@ defmodule Indexer.Block.Realtime.FetcherTest do
},
%{
address_hash: third_address_hash,
block_number: 3_946_079
},
%{
address_hash: fourth_address_hash,
block_number: 3_946_080
},
%{
address_hash: fifth_address_hash,
address_hash: fourth_address_hash,
block_number: 3_946_079
}
],
blocks: [%Chain.Block{number: 3_946_079}, %Chain.Block{number: 3_946_080}],
internal_transactions: [
%{index: 0, transaction_hash: transaction_hash},
%{index: 1, transaction_hash: transaction_hash},
%{index: 2, transaction_hash: transaction_hash},
%{index: 3, transaction_hash: transaction_hash},
%{index: 4, transaction_hash: transaction_hash},
%{index: 5, transaction_hash: transaction_hash}
],
transactions: [%Transaction{hash: transaction_hash}]
},
errors: []

Loading…
Cancel
Save