Merge pull request #3125 from poanetwork/vb-fix-performance-coin-balance-history-nnew-wave
Fix performance of coin balance history chartpull/3142/head
commit
70197692a4
@ -0,0 +1,74 @@ |
||||
defmodule Explorer.Chain.Address.CoinBalanceDaily do |
||||
@moduledoc """ |
||||
Maximum `t:Explorer.Chain.Wei.t/0` `value` of `t:Explorer.Chain.Address.t/0` at the day. |
||||
This table is used to display coinn balance history chart. |
||||
""" |
||||
|
||||
use Explorer.Schema |
||||
|
||||
alias Explorer.Chain.{Address, Hash, Wei} |
||||
alias Explorer.Chain.Address.CoinBalanceDaily |
||||
|
||||
@optional_fields ~w(value)a |
||||
@required_fields ~w(address_hash day)a |
||||
@allowed_fields @optional_fields ++ @required_fields |
||||
|
||||
@typedoc """ |
||||
* `address` - the `t:Explorer.Chain.Address.t/0`. |
||||
* `address_hash` - foreign key for `address`. |
||||
* `day` - the `t:Date.t/0`. |
||||
* `inserted_at` - When the balance was first inserted into the database. |
||||
* `updated_at` - When the balance was last updated. |
||||
* `value` - the max balance (`value`) of `address` during the `day`. |
||||
""" |
||||
@type t :: %__MODULE__{ |
||||
address: %Ecto.Association.NotLoaded{} | Address.t(), |
||||
address_hash: Hash.Address.t(), |
||||
day: Date.t(), |
||||
inserted_at: DateTime.t(), |
||||
updated_at: DateTime.t(), |
||||
value: Wei.t() | nil |
||||
} |
||||
|
||||
@primary_key false |
||||
schema "address_coin_balances_daily" do |
||||
field(:day, :date) |
||||
field(:value, Wei) |
||||
|
||||
timestamps() |
||||
|
||||
belongs_to(:address, Address, foreign_key: :address_hash, references: :hash, type: Hash.Address) |
||||
end |
||||
|
||||
@doc """ |
||||
Builds an `Ecto.Query` to fetch a series of balances by day for the given account. Each element in the series |
||||
corresponds to the maximum balance in that day. Only the last `n` days of data are used. |
||||
`n` is configurable via COIN_BALANCE_HISTORY_DAYS ENV var. |
||||
""" |
||||
def balances_by_day(address_hash) do |
||||
{days_to_consider, _} = |
||||
Application.get_env(:block_scout_web, BlockScoutWeb.Chain.Address.CoinBalance)[:coin_balance_history_days] |
||||
|> Integer.parse() |
||||
|
||||
CoinBalanceDaily |
||||
|> where([cbd], cbd.address_hash == ^address_hash) |
||||
|> limit_time_interval(days_to_consider) |
||||
|> select([cbd], %{date: cbd.day, value: cbd.value}) |
||||
end |
||||
|
||||
def limit_time_interval(query, days_to_consider) do |
||||
query |
||||
|> where( |
||||
[cbd], |
||||
cbd.day >= fragment("date_trunc('day', now() - CAST(? AS INTERVAL))", ^%Postgrex.Interval{days: days_to_consider}) |
||||
) |
||||
end |
||||
|
||||
def changeset(%__MODULE__{} = balance, params) do |
||||
balance |
||||
|> cast(params, @allowed_fields) |
||||
|> validate_required(@required_fields) |
||||
|> foreign_key_constraint(:address_hash) |
||||
|> unique_constraint(:day, name: :address_coin_balances_daily_address_hash_day_index) |
||||
end |
||||
end |
@ -0,0 +1,139 @@ |
||||
defmodule Explorer.Chain.Import.Runner.Address.CoinBalancesDaily do |
||||
@moduledoc """ |
||||
Bulk imports `t:Explorer.Chain.Address.CoinBalancesDaily.t/0`. |
||||
""" |
||||
|
||||
require Ecto.Query |
||||
|
||||
import Ecto.Query, only: [from: 2] |
||||
|
||||
alias Ecto.{Changeset, Multi, Repo} |
||||
alias Explorer.Chain.Address.CoinBalanceDaily |
||||
alias Explorer.Chain.{Hash, Import, Wei} |
||||
|
||||
@behaviour Import.Runner |
||||
|
||||
# milliseconds |
||||
@timeout 60_000 |
||||
|
||||
@type imported :: [ |
||||
%{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()} |
||||
] |
||||
|
||||
@impl Import.Runner |
||||
def ecto_schema_module, do: CoinBalanceDaily |
||||
|
||||
@impl Import.Runner |
||||
def option_key, do: :address_coin_balances_daily |
||||
|
||||
@impl Import.Runner |
||||
def imported_table_row do |
||||
%{ |
||||
value_type: "[%{address_hash: Explorer.Chain.Hash.t(), day: Date.t()}]", |
||||
value_description: "List of maps of the `t:#{ecto_schema_module()}.t/0` `address_hash` and `day`" |
||||
} |
||||
end |
||||
|
||||
@impl Import.Runner |
||||
def run(multi, changes_list, %{timestamps: timestamps} = options) do |
||||
insert_options = |
||||
options |
||||
|> Map.get(option_key(), %{}) |
||||
|> Map.take(~w(on_conflict timeout)a) |
||||
|> Map.put_new(:timeout, @timeout) |
||||
|> Map.put(:timestamps, timestamps) |
||||
|
||||
Multi.run(multi, :address_coin_balances_daily, fn repo, _ -> |
||||
insert(repo, changes_list, insert_options) |
||||
end) |
||||
end |
||||
|
||||
@impl Import.Runner |
||||
def timeout, do: @timeout |
||||
|
||||
@spec insert( |
||||
Repo.t(), |
||||
[ |
||||
%{ |
||||
required(:address_hash) => Hash.Address.t(), |
||||
required(:day) => Date.t(), |
||||
required(:value) => Wei.t() |
||||
} |
||||
], |
||||
%{ |
||||
optional(:on_conflict) => Import.Runner.on_conflict(), |
||||
required(:timeout) => timeout, |
||||
required(:timestamps) => Import.timestamps() |
||||
} |
||||
) :: |
||||
{:ok, [%{required(:address_hash) => Hash.Address.t(), required(:day) => Date.t()}]} |
||||
| {:error, [Changeset.t()]} |
||||
defp insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do |
||||
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0) |
||||
|
||||
combined_changes_list = |
||||
changes_list |
||||
|> Enum.reduce([], fn change, acc -> |
||||
if Enum.empty?(acc) do |
||||
[change | acc] |
||||
else |
||||
target_item = |
||||
Enum.find(acc, fn item -> |
||||
item.day == change.day && item.address_hash == change.address_hash |
||||
end) |
||||
|
||||
if target_item do |
||||
if change.value > target_item.value do |
||||
acc_updated = List.delete(acc, target_item) |
||||
[change | acc_updated] |
||||
else |
||||
acc |
||||
end |
||||
else |
||||
[change | acc] |
||||
end |
||||
end |
||||
end) |
||||
|
||||
# Enforce CoinBalanceDaily ShareLocks order (see docs: sharelocks.md) |
||||
ordered_changes_list = Enum.sort_by(combined_changes_list, &{&1.address_hash, &1.day}) |
||||
|
||||
{:ok, _} = |
||||
Import.insert_changes_list( |
||||
repo, |
||||
ordered_changes_list, |
||||
conflict_target: [:address_hash, :day], |
||||
on_conflict: on_conflict, |
||||
for: CoinBalanceDaily, |
||||
timeout: timeout, |
||||
timestamps: timestamps |
||||
) |
||||
|
||||
{:ok, Enum.map(ordered_changes_list, &Map.take(&1, ~w(address_hash day)a))} |
||||
end |
||||
|
||||
def default_on_conflict do |
||||
from( |
||||
balance in CoinBalanceDaily, |
||||
update: [ |
||||
set: [ |
||||
value: |
||||
fragment( |
||||
""" |
||||
CASE WHEN EXCLUDED.value IS NOT NULL AND EXCLUDED.value > ? THEN |
||||
EXCLUDED.value |
||||
ELSE |
||||
? |
||||
END |
||||
""", |
||||
balance.value, |
||||
balance.value |
||||
), |
||||
inserted_at: fragment("LEAST(EXCLUDED.inserted_at, ?)", balance.inserted_at), |
||||
updated_at: fragment("GREATEST(EXCLUDED.updated_at, ?)", balance.updated_at) |
||||
] |
||||
], |
||||
where: fragment("EXCLUDED.value IS NOT NULL") |
||||
) |
||||
end |
||||
end |
@ -0,0 +1,17 @@ |
||||
defmodule Explorer.Repo.Migrations.AddressCoinBalancesDaily do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
create table(:address_coin_balances_daily, primary_key: false) do |
||||
add(:address_hash, references(:addresses, column: :hash, type: :bytea), null: false) |
||||
add(:day, :date, null: false) |
||||
|
||||
# null until fetched |
||||
add(:value, :numeric, precision: 100, default: fragment("NULL"), null: true) |
||||
|
||||
timestamps(null: false, type: :utc_datetime_usec) |
||||
end |
||||
|
||||
create(unique_index(:address_coin_balances_daily, [:address_hash, :day])) |
||||
end |
||||
end |
File diff suppressed because one or more lines are too long
@ -0,0 +1,155 @@ |
||||
defmodule Indexer.Transform.AddressCoinBalancesDaily do |
||||
@moduledoc """ |
||||
Extracts `Explorer.Chain.Address.CoinBalanceDaily` params from other schema's params. |
||||
""" |
||||
|
||||
alias EthereumJSONRPC.Blocks |
||||
|
||||
def params_set(%{} = import_options) do |
||||
Enum.reduce(import_options, MapSet.new(), &reducer/2) |
||||
end |
||||
|
||||
defp reducer({:beneficiary_params, beneficiary_params}, acc) when is_list(beneficiary_params) do |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
|
||||
block_numbers = |
||||
beneficiary_params |
||||
|> Enum.map(&Map.get(&1, :block_number)) |
||||
|> Enum.sort() |
||||
|> Enum.dedup() |
||||
|
||||
block_timestamp_map = |
||||
Enum.reduce(block_numbers, %{}, fn block_number, map -> |
||||
{:ok, %Blocks{blocks_params: [%{timestamp: timestamp}]}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
day = DateTime.to_date(timestamp) |
||||
Map.put(map, "#{block_number}", day) |
||||
end) |
||||
|
||||
Enum.into(beneficiary_params, acc, fn %{ |
||||
address_hash: address_hash, |
||||
block_number: block_number |
||||
} |
||||
when is_binary(address_hash) and is_integer(block_number) -> |
||||
day = Map.get(block_timestamp_map, "#{block_number}") |
||||
|
||||
%{address_hash: address_hash, day: day} |
||||
end) |
||||
end |
||||
|
||||
defp reducer({:blocks_params, blocks_params}, acc) when is_list(blocks_params) do |
||||
# a block MUST have a miner_hash and number |
||||
Enum.into(blocks_params, acc, fn %{miner_hash: address_hash, number: block_number, timestamp: block_timestamp} |
||||
when is_binary(address_hash) and is_integer(block_number) -> |
||||
day = DateTime.to_date(block_timestamp) |
||||
%{address_hash: address_hash, day: day} |
||||
end) |
||||
end |
||||
|
||||
defp reducer({:internal_transactions_params, internal_transactions_params}, initial) |
||||
when is_list(internal_transactions_params) do |
||||
Enum.reduce(internal_transactions_params, initial, &internal_transactions_params_reducer/2) |
||||
end |
||||
|
||||
defp reducer({:logs_params, logs_params}, acc) when is_list(logs_params) do |
||||
# a log MUST have address_hash and block_number |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
|
||||
block_numbers = |
||||
logs_params |
||||
|> Enum.map(&Map.get(&1, :block_number)) |
||||
|> Enum.sort() |
||||
|> Enum.dedup() |
||||
|
||||
block_timestamp_map = |
||||
Enum.reduce(block_numbers, %{}, fn block_number, map -> |
||||
{:ok, %Blocks{blocks_params: [%{timestamp: timestamp}]}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
day = DateTime.to_date(timestamp) |
||||
Map.put(map, "#{block_number}", day) |
||||
end) |
||||
|
||||
logs_params |
||||
|> Enum.into(acc, fn |
||||
%{address_hash: address_hash, block_number: block_number} |
||||
when is_binary(address_hash) and is_integer(block_number) -> |
||||
day = Map.get(block_timestamp_map, "#{block_number}") |
||||
%{address_hash: address_hash, day: day} |
||||
|
||||
%{type: "pending"} -> |
||||
nil |
||||
end) |
||||
|> Enum.reject(fn val -> is_nil(val) end) |
||||
|> MapSet.new() |
||||
end |
||||
|
||||
defp reducer({:transactions_params, transactions_params}, initial) when is_list(transactions_params) do |
||||
Enum.reduce(transactions_params, initial, &transactions_params_reducer/2) |
||||
end |
||||
|
||||
defp reducer({:block_second_degree_relations_params, block_second_degree_relations_params}, initial) |
||||
when is_list(block_second_degree_relations_params), |
||||
do: initial |
||||
|
||||
defp internal_transactions_params_reducer( |
||||
%{block_number: block_number} = internal_transaction_params, |
||||
acc |
||||
) |
||||
when is_integer(block_number) do |
||||
case internal_transaction_params do |
||||
%{type: "call"} -> |
||||
acc |
||||
|
||||
%{type: "create", error: _} -> |
||||
acc |
||||
|
||||
%{type: "create", created_contract_address_hash: address_hash} when is_binary(address_hash) -> |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
|
||||
{:ok, %Blocks{blocks_params: [%{timestamp: block_timestamp}]}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
day = DateTime.to_date(block_timestamp) |
||||
MapSet.put(acc, %{address_hash: address_hash, day: day}) |
||||
|
||||
%{type: "selfdestruct", from_address_hash: from_address_hash, to_address_hash: to_address_hash} |
||||
when is_binary(from_address_hash) and is_binary(to_address_hash) -> |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
|
||||
{:ok, %Blocks{blocks_params: [%{timestamp: block_timestamp}]}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
day = DateTime.to_date(block_timestamp) |
||||
|
||||
acc |
||||
|> MapSet.put(%{address_hash: from_address_hash, day: day}) |
||||
|> MapSet.put(%{address_hash: to_address_hash, day: day}) |
||||
end |
||||
end |
||||
|
||||
defp transactions_params_reducer( |
||||
%{block_number: block_number, from_address_hash: from_address_hash} = transaction_params, |
||||
initial |
||||
) |
||||
when is_binary(from_address_hash) do |
||||
# a transaction MUST have a `from_address_hash` |
||||
json_rpc_named_arguments = Application.get_env(:explorer, :json_rpc_named_arguments) |
||||
|
||||
{:ok, %Blocks{blocks_params: [%{timestamp: block_timestamp}]}} = |
||||
EthereumJSONRPC.fetch_blocks_by_range(block_number..block_number, json_rpc_named_arguments) |
||||
|
||||
day = DateTime.to_date(block_timestamp) |
||||
acc = MapSet.put(initial, %{address_hash: from_address_hash, day: day}) |
||||
|
||||
# `to_address_hash` is optional |
||||
case transaction_params do |
||||
%{to_address_hash: to_address_hash} when is_binary(to_address_hash) -> |
||||
MapSet.put(acc, %{address_hash: to_address_hash, day: day}) |
||||
|
||||
_ -> |
||||
acc |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue