Indexer update: refactor coin balance daily fetcher

pull/4527/head
Viktor Baranov 3 years ago
parent ec6dccbd61
commit 3fba4f633c
  1. 1
      CHANGELOG.md
  2. 14
      apps/explorer/lib/explorer/chain.ex
  3. 6
      apps/indexer/lib/indexer/block/fetcher.ex
  4. 37
      apps/indexer/lib/indexer/fetcher/block_reward.ex
  5. 176
      apps/indexer/lib/indexer/transform/address_coin_balances_daily.ex
  6. 15
      apps/indexer/test/indexer/block/fetcher_test.exs
  7. 2
      apps/indexer/test/indexer/block/realtime/fetcher_test.exs
  8. 78
      apps/indexer/test/indexer/fetcher/block_reward_test.exs

@ -6,6 +6,7 @@
- [#4452](https://github.com/blockscout/blockscout/pull/4452) - Add names for smart-conrtact's function response
### Fixes
- [#4527](https://github.com/blockscout/blockscout/pull/4527) - Indexer performance update: refactor coin balance daily fetcher
- [#4525](https://github.com/blockscout/blockscout/pull/4525) - Uncataloged token transfers query performance improvement
- [#4513](https://github.com/blockscout/blockscout/pull/4513) - Fix installation with custom default path: add NETWORK_PATH variable to the current_path
- [#4500](https://github.com/blockscout/blockscout/pull/4500) - `/tokens/{addressHash}/instance/{id}/token-transfers`: fix incorrect next page url

@ -683,6 +683,20 @@ defmodule Explorer.Chain do
|> Enum.into(%{})
end
def timestamp_by_block_hash(block_hashes) when is_list(block_hashes) do
query =
from(
block in Block,
where: block.hash in ^block_hashes and block.consensus == true,
group_by: block.hash,
select: {block.hash, block.timestamp}
)
query
|> Repo.all()
|> Enum.into(%{})
end
@doc """
Finds all `t:Explorer.Chain.Transaction.t/0`s in the `t:Explorer.Chain.Block.t/0`.

@ -156,10 +156,8 @@ defmodule Indexer.Block.Fetcher do
|> AddressCoinBalances.params_set(),
coin_balances_params_daily_set =
%{
beneficiary_params: MapSet.to_list(beneficiary_params_set),
blocks_params: blocks,
logs_params: logs,
transactions_params: transactions_with_receipts
coin_balances_params: coin_balances_params_set,
blocks: blocks
}
|> AddressCoinBalancesDaily.params_set(),
beneficiaries_with_gas_payment <-

@ -129,6 +129,7 @@ defmodule Indexer.Fetcher.BlockReward do
beneficiaries_params ->
beneficiaries_params
|> add_gas_payments()
|> add_timestamp()
|> import_block_reward_params()
|> case do
{:ok, %{address_coin_balances: address_coin_balances, addresses: addresses}} ->
@ -186,6 +187,25 @@ defmodule Indexer.Fetcher.BlockReward do
|> reduce_uncle_rewards()
end
def add_timestamp(beneficiaries_params) do
timestamp_by_block_hash =
beneficiaries_params
|> Enum.map(& &1.block_hash)
|> Chain.timestamp_by_block_hash()
Enum.map(beneficiaries_params, fn %{block_hash: block_hash_str} = beneficiary ->
{:ok, block_hash} = Chain.string_to_block_hash(block_hash_str)
case timestamp_by_block_hash do
%{^block_hash => block_timestamp} ->
Map.put(beneficiary, :block_timestamp, block_timestamp)
_ ->
beneficiary
end
end)
end
defp add_validator_rewards(beneficiaries_params) do
gas_payment_by_block_hash =
beneficiaries_params
@ -245,8 +265,23 @@ defmodule Indexer.Fetcher.BlockReward do
addresses_params = Addresses.extract_addresses(%{block_reward_contract_beneficiaries: block_rewards_params})
address_coin_balances_params_set = AddressCoinBalances.params_set(%{beneficiary_params: block_rewards_params})
address_coin_balances_params_with_block_timestamp =
block_rewards_params
|> Enum.map(fn block_rewards_param ->
%{
address_hash: block_rewards_param.address_hash,
block_number: block_rewards_param.block_number,
block_timestamp: block_rewards_param.block_timestamp
}
end)
|> Enum.into(MapSet.new())
address_coin_balances_params_with_block_timestamp_set = %{
address_coin_balances_params_with_block_timestamp: address_coin_balances_params_with_block_timestamp
}
address_coin_balances_daily_params_set =
AddressCoinBalancesDaily.params_set(%{beneficiary_params: block_rewards_params})
AddressCoinBalancesDaily.params_set(address_coin_balances_params_with_block_timestamp_set)
Chain.import(%{
addresses: %{params: addresses_params},

@ -3,163 +3,55 @@ defmodule Indexer.Transform.AddressCoinBalancesDaily do
Extracts `Explorer.Chain.Address.CoinBalanceDaily` params from other schema's params.
"""
alias EthereumJSONRPC.Blocks
def params_set(%{coin_balances_params: coin_balances_params_set, blocks: blocks}) do
coin_balances_params =
coin_balances_params_set
|> MapSet.to_list()
def params_set(%{} = import_options) do
Enum.reduce(import_options, MapSet.new(), &reducer/2)
end
defp reducer({:beneficiary_params, beneficiary_params}, acc) when is_list(beneficiary_params) do
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments)
coin_balances_daily_params_list =
Enum.reduce(coin_balances_params, [], fn coin_balances_param, acc ->
address_hash = Map.get(coin_balances_param, :address_hash)
block_number = Map.get(coin_balances_param, :block_number)
block_numbers =
beneficiary_params
|> Enum.map(&Map.get(&1, :block_number))
|> Enum.sort()
|> Enum.dedup()
block =
blocks
|> Enum.find(fn block ->
block.number == block_number
end)
block_timestamp_map =
Enum.reduce(block_numbers, %{}, fn block_number, map ->
{:ok, %Blocks{blocks_params: [%{timestamp: timestamp}]}} =
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments)
day = DateTime.to_date(block.timestamp)
day = DateTime.to_date(timestamp)
Map.put(map, "#{block_number}", day)
[%{address_hash: address_hash, day: day} | acc]
end)
Enum.into(beneficiary_params, acc, fn %{
address_hash: address_hash,
block_number: block_number
}
when is_binary(address_hash) and is_integer(block_number) ->
day = Map.get(block_timestamp_map, "#{block_number}")
%{address_hash: address_hash, day: day}
end)
end
defp reducer({:blocks_params, blocks_params}, acc) when is_list(blocks_params) do
# a block MUST have a miner_hash and number
Enum.into(blocks_params, acc, fn %{miner_hash: address_hash, number: block_number, timestamp: block_timestamp}
when is_binary(address_hash) and is_integer(block_number) ->
day = DateTime.to_date(block_timestamp)
%{address_hash: address_hash, day: day}
end)
end
defp reducer({:internal_transactions_params, internal_transactions_params}, initial)
when is_list(internal_transactions_params) do
Enum.reduce(internal_transactions_params, initial, &internal_transactions_params_reducer/2)
end
defp reducer({:logs_params, logs_params}, acc) when is_list(logs_params) do
# a log MUST have address_hash and block_number
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments)
block_numbers =
logs_params
|> Enum.map(&Map.get(&1, :block_number))
|> Enum.sort()
coin_balances_daily_params_set =
coin_balances_daily_params_list
|> Enum.dedup()
|> Enum.into(MapSet.new())
block_timestamp_map =
Enum.reduce(block_numbers, %{}, fn block_number, map ->
case EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) do
{:ok, %Blocks{blocks_params: [%{timestamp: timestamp}]}} ->
day = DateTime.to_date(timestamp)
Map.put(map, "#{block_number}", day)
_ ->
map
end
end)
logs_params
|> Enum.into(acc, fn
%{address_hash: address_hash, block_number: block_number}
when is_binary(address_hash) and is_integer(block_number) ->
if Map.has_key?(block_timestamp_map, "#{block_number}") do
day = Map.get(block_timestamp_map, "#{block_number}")
%{address_hash: address_hash, day: day}
else
nil
end
%{type: "pending"} ->
nil
end)
|> Enum.reject(fn val -> is_nil(val) end)
|> MapSet.new()
coin_balances_daily_params_set
end
defp reducer({:transactions_params, transactions_params}, initial) when is_list(transactions_params) do
Enum.reduce(transactions_params, initial, &transactions_params_reducer/2)
end
def params_set(%{address_coin_balances_params_with_block_timestamp: address_coin_balances_params_with_block_timestamp}) do
coin_balances_params =
address_coin_balances_params_with_block_timestamp
|> MapSet.to_list()
defp reducer({:block_second_degree_relations_params, block_second_degree_relations_params}, initial)
when is_list(block_second_degree_relations_params),
do: initial
defp internal_transactions_params_reducer(
%{block_number: block_number} = internal_transaction_params,
acc
)
when is_integer(block_number) do
case internal_transaction_params do
%{type: "call"} ->
acc
%{type: "create", error: _} ->
acc
%{type: "create", created_contract_address_hash: address_hash} when is_binary(address_hash) ->
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments)
{:ok, %Blocks{blocks_params: [%{timestamp: block_timestamp}]}} =
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments)
day = DateTime.to_date(block_timestamp)
MapSet.put(acc, %{address_hash: address_hash, day: day})
%{type: "selfdestruct", from_address_hash: from_address_hash, to_address_hash: to_address_hash}
when is_binary(from_address_hash) and is_binary(to_address_hash) ->
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments)
{:ok, %Blocks{blocks_params: [%{timestamp: block_timestamp}]}} =
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments)
coin_balances_daily_params_list =
Enum.reduce(coin_balances_params, [], fn coin_balances_param, acc ->
address_hash = Map.get(coin_balances_param, :address_hash)
block_timestamp = Map.get(coin_balances_param, :block_timestamp)
day = DateTime.to_date(block_timestamp)
acc
|> MapSet.put(%{address_hash: from_address_hash, day: day})
|> MapSet.put(%{address_hash: to_address_hash, day: day})
end
end
defp transactions_params_reducer(
%{block_number: block_number, from_address_hash: from_address_hash} = transaction_params,
initial
)
when is_binary(from_address_hash) do
# a transaction MUST have a `from_address_hash`
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments)
case EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) do
{:ok, %Blocks{blocks_params: [%{timestamp: block_timestamp}]}} ->
day = DateTime.to_date(block_timestamp)
acc = MapSet.put(initial, %{address_hash: from_address_hash, day: day})
# `to_address_hash` is optional
case transaction_params do
%{to_address_hash: to_address_hash} when is_binary(to_address_hash) ->
MapSet.put(acc, %{address_hash: to_address_hash, day: day})
[%{address_hash: address_hash, day: day} | acc]
end)
_ ->
acc
end
coin_balances_daily_params_set =
coin_balances_daily_params_list
|> Enum.dedup()
|> Enum.into(MapSet.new())
_ ->
initial
end
coin_balances_daily_params_set
end
end

@ -395,7 +395,7 @@ defmodule Indexer.Block.FetcherTest do
end)
# async requests need to be grouped in one expect because the order is non-deterministic while multiple expect
# calls on the same name/arity are used in order
|> expect(:json_rpc, 11, fn json, _options ->
|> expect(:json_rpc, 9, fn json, _options ->
[request] = json
case request do
@ -747,19 +747,6 @@ defmodule Indexer.Block.FetcherTest do
block_quantity = integer_to_quantity(block_number)
res = eth_block_number_fake_response(block_quantity)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [
%{
id: 0,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^block_quantity, true]
}
],
_ ->
{:ok, [res]}
end)
%{
id: id,
result: [

@ -205,7 +205,7 @@ defmodule Indexer.Block.Realtime.FetcherTest do
}
]}
end)
|> expect(:json_rpc, 5, fn
|> expect(:json_rpc, 4, fn
[
%{id: 0, jsonrpc: "2.0", method: "trace_block", params: ["0x3C365F"]},
%{id: 1, jsonrpc: "2.0", method: "trace_block", params: ["0x3C3660"]}

@ -119,19 +119,6 @@ defmodule Indexer.Fetcher.BlockRewardTest do
res = eth_block_number_fake_response(block_quantity)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [
%{
id: 0,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^block_quantity, true]
}
],
_ ->
{:ok, [res]}
end)
assert count(Chain.Block.Reward) == 0
parent = self()
@ -207,19 +194,6 @@ defmodule Indexer.Fetcher.BlockRewardTest do
res = eth_block_number_fake_response(block_quantity)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [
%{
id: 0,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^block_quantity, true]
}
],
_ ->
{:ok, [res]}
end)
parent = self()
pid =
@ -352,19 +326,6 @@ defmodule Indexer.Fetcher.BlockRewardTest do
res = eth_block_number_fake_response(block_quantity)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [
%{
id: 0,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^block_quantity, true]
}
],
_ ->
{:ok, [res]}
end)
assert count(Chain.Block.Reward) == 0
assert count(Chain.Address.CoinBalance) == 0
@ -455,19 +416,6 @@ defmodule Indexer.Fetcher.BlockRewardTest do
res = eth_block_number_fake_response(block_quantity)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [
%{
id: 0,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^block_quantity, true]
}
],
_ ->
{:ok, [res]}
end)
assert count(Chain.Block.Reward) == 0
assert count(Chain.Address.CoinBalance) == 0
@ -548,19 +496,6 @@ defmodule Indexer.Fetcher.BlockRewardTest do
res = eth_block_number_fake_response(block_quantity)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [
%{
id: 0,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^block_quantity, true]
}
],
_ ->
{:ok, [res]}
end)
assert count(Chain.Block.Reward) == 1
assert count(Chain.Address.CoinBalance) == 1
@ -702,19 +637,6 @@ defmodule Indexer.Fetcher.BlockRewardTest do
res = eth_block_number_fake_response(block_quantity)
EthereumJSONRPC.Mox
|> expect(:json_rpc, fn [
%{
id: 0,
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
params: [^block_quantity, true]
}
],
_ ->
{:ok, [res]}
end)
assert count(Chain.Block.Reward) == 0
assert count(Chain.Address.CoinBalance) == 0

Loading…
Cancel
Save