Add secondary coin and transaction stats (#9483)
* Add volume_24h * Add secondary coin and transactions stats * Process review comments * Allow different source for secondary coin * Fix exchange_rates_secondary_coin_price_source --------- Co-authored-by: Nikita Pozdniakov <nikitosing4@mail.ru>pull/9264/merge
parent
da6c71d911
commit
dea361d56f
@ -0,0 +1,95 @@ |
||||
defmodule Explorer.Counters.FreshPendingTransactionsCounter do |
||||
@moduledoc """ |
||||
Caches number of pending transactions for last 30 minutes. |
||||
|
||||
It loads the sum asynchronously and in a time interval of :cache_period (default to 5 minutes). |
||||
""" |
||||
|
||||
use GenServer |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.{Chain, Repo} |
||||
alias Explorer.Chain.Transaction |
||||
|
||||
@counter_type "pending_transaction_count_30min" |
||||
|
||||
@doc """ |
||||
Starts a process to periodically update the counter. |
||||
""" |
||||
@spec start_link(term()) :: GenServer.on_start() |
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
@impl true |
||||
def init(_args) do |
||||
{:ok, %{consolidate?: enable_consolidation?()}, {:continue, :ok}} |
||||
end |
||||
|
||||
defp schedule_next_consolidation do |
||||
Process.send_after(self(), :consolidate, cache_interval()) |
||||
end |
||||
|
||||
@impl true |
||||
def handle_continue(:ok, %{consolidate?: true} = state) do |
||||
consolidate() |
||||
schedule_next_consolidation() |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
@impl true |
||||
def handle_continue(:ok, state) do |
||||
{:noreply, state} |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:consolidate, state) do |
||||
consolidate() |
||||
schedule_next_consolidation() |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
@doc """ |
||||
Fetches the value for a `#{@counter_type}` counter type from the `last_fetched_counters` table. |
||||
""" |
||||
def fetch(options) do |
||||
Chain.get_last_fetched_counter(@counter_type, options) |
||||
end |
||||
|
||||
@doc """ |
||||
Consolidates the info by populating the `last_fetched_counters` table with the current database information. |
||||
""" |
||||
def consolidate do |
||||
query = |
||||
from(transaction in Transaction, |
||||
where: is_nil(transaction.block_hash) and transaction.inserted_at >= ago(30, "minute"), |
||||
select: count(transaction.hash) |
||||
) |
||||
|
||||
count = Repo.one!(query, timeout: :infinity) |
||||
|
||||
Chain.upsert_last_fetched_counter(%{ |
||||
counter_type: @counter_type, |
||||
value: count |
||||
}) |
||||
end |
||||
|
||||
@doc """ |
||||
Returns a boolean that indicates whether consolidation is enabled |
||||
|
||||
In order to choose whether or not to enable the scheduler and the initial |
||||
consolidation, change the following Explorer config: |
||||
|
||||
`config :explorer, #{__MODULE__}, enable_consolidation: true` |
||||
|
||||
to: |
||||
|
||||
`config :explorer, #{__MODULE__}, enable_consolidation: false` |
||||
""" |
||||
def enable_consolidation?, do: Application.get_env(:explorer, __MODULE__)[:enable_consolidation] |
||||
|
||||
defp cache_interval, do: Application.get_env(:explorer, __MODULE__)[:cache_period] |
||||
end |
@ -0,0 +1,143 @@ |
||||
defmodule Explorer.Counters.Transactions24hStats do |
||||
@moduledoc """ |
||||
Caches number of transactions for last 24 hours, sum of transaction fees for last 24 hours and average transaction fee for last 24 hours counters. |
||||
|
||||
It loads the counters asynchronously and in a time interval of :cache_period (default to 1 hour). |
||||
""" |
||||
|
||||
use GenServer |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Explorer.{Chain, Repo} |
||||
alias Explorer.Chain.Transaction |
||||
|
||||
@tx_count_name "transaction_count_24h" |
||||
@tx_fee_sum_name "transaction_fee_sum_24h" |
||||
@tx_fee_average_name "transaction_fee_average_24h" |
||||
|
||||
@doc """ |
||||
Starts a process to periodically update the counters. |
||||
""" |
||||
@spec start_link(term()) :: GenServer.on_start() |
||||
def start_link(_) do |
||||
GenServer.start_link(__MODULE__, :ok, name: __MODULE__) |
||||
end |
||||
|
||||
@impl true |
||||
def init(_args) do |
||||
{:ok, %{consolidate?: enable_consolidation?()}, {:continue, :ok}} |
||||
end |
||||
|
||||
defp schedule_next_consolidation do |
||||
Process.send_after(self(), :consolidate, cache_interval()) |
||||
end |
||||
|
||||
@impl true |
||||
def handle_continue(:ok, %{consolidate?: true} = state) do |
||||
consolidate() |
||||
schedule_next_consolidation() |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
@impl true |
||||
def handle_continue(:ok, state) do |
||||
{:noreply, state} |
||||
end |
||||
|
||||
@impl true |
||||
def handle_info(:consolidate, state) do |
||||
consolidate() |
||||
schedule_next_consolidation() |
||||
|
||||
{:noreply, state} |
||||
end |
||||
|
||||
@doc """ |
||||
Fetches the value for a `#{@tx_count_name}` counter type from the `last_fetched_counters` table. |
||||
""" |
||||
def fetch_count(options) do |
||||
Chain.get_last_fetched_counter(@tx_count_name, options) |
||||
end |
||||
|
||||
@doc """ |
||||
Fetches the value for a `#{@tx_fee_sum_name}` counter type from the `last_fetched_counters` table. |
||||
""" |
||||
def fetch_fee_sum(options) do |
||||
Chain.get_last_fetched_counter(@tx_fee_sum_name, options) |
||||
end |
||||
|
||||
@doc """ |
||||
Fetches the value for a `#{@tx_fee_average_name}` counter type from the `last_fetched_counters` table. |
||||
""" |
||||
def fetch_fee_average(options) do |
||||
Chain.get_last_fetched_counter(@tx_fee_average_name, options) |
||||
end |
||||
|
||||
@doc """ |
||||
Consolidates the info by populating the `last_fetched_counters` table with the current database information. |
||||
""" |
||||
def consolidate do |
||||
fee_query = |
||||
dynamic( |
||||
[transaction, block], |
||||
fragment( |
||||
"COALESCE(?, ? + LEAST(?, ?))", |
||||
transaction.gas_price, |
||||
block.base_fee_per_gas, |
||||
transaction.max_priority_fee_per_gas, |
||||
transaction.max_fee_per_gas - block.base_fee_per_gas |
||||
) * transaction.gas_used |
||||
) |
||||
|
||||
sum_query = dynamic([_, _], sum(^fee_query)) |
||||
avg_query = dynamic([_, _], avg(^fee_query)) |
||||
|
||||
query = |
||||
from(transaction in Transaction, |
||||
join: block in assoc(transaction, :block), |
||||
where: block.timestamp >= ago(24, "hour"), |
||||
select: %{count: count(transaction.hash)}, |
||||
select_merge: ^%{fee_sum: sum_query}, |
||||
select_merge: ^%{fee_average: avg_query} |
||||
) |
||||
|
||||
%{ |
||||
count: count, |
||||
fee_sum: fee_sum, |
||||
fee_average: fee_average |
||||
} = Repo.one!(query, timeout: :infinity) |
||||
|
||||
Chain.upsert_last_fetched_counter(%{ |
||||
counter_type: @tx_count_name, |
||||
value: count |
||||
}) |
||||
|
||||
Chain.upsert_last_fetched_counter(%{ |
||||
counter_type: @tx_fee_sum_name, |
||||
value: fee_sum |
||||
}) |
||||
|
||||
Chain.upsert_last_fetched_counter(%{ |
||||
counter_type: @tx_fee_average_name, |
||||
value: fee_average |
||||
}) |
||||
end |
||||
|
||||
@doc """ |
||||
Returns a boolean that indicates whether consolidation is enabled |
||||
|
||||
In order to choose whether or not to enable the scheduler and the initial |
||||
consolidation, change the following Explorer config: |
||||
|
||||
`config :explorer, #{__MODULE__}, enable_consolidation: true` |
||||
|
||||
to: |
||||
|
||||
`config :explorer, #{__MODULE__}, enable_consolidation: false` |
||||
""" |
||||
def enable_consolidation?, do: Application.get_env(:explorer, __MODULE__)[:enable_consolidation] |
||||
|
||||
defp cache_interval, do: Application.get_env(:explorer, __MODULE__)[:cache_period] |
||||
end |
@ -0,0 +1,9 @@ |
||||
defmodule Explorer.Repo.Migrations.AddVolume24hToTokens do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
alter table(:tokens) do |
||||
add(:volume_24h, :decimal) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,12 @@ |
||||
defmodule Explorer.Repo.Migrations.AddSecondaryCoinMarketHistory do |
||||
use Ecto.Migration |
||||
|
||||
def change do |
||||
alter table(:market_history) do |
||||
add(:secondary_coin, :boolean, default: false) |
||||
end |
||||
|
||||
drop_if_exists(unique_index(:market_history, [:date])) |
||||
create(unique_index(:market_history, [:date, :secondary_coin])) |
||||
end |
||||
end |
@ -0,0 +1,27 @@ |
||||
defmodule Explorer.Counters.FreshPendingTransactionsCounterTest do |
||||
use Explorer.DataCase |
||||
|
||||
alias Explorer.Counters.FreshPendingTransactionsCounter |
||||
|
||||
test "populates the cache with the number of pending transactions addresses" do |
||||
insert(:transaction) |
||||
insert(:transaction) |
||||
insert(:transaction) |
||||
|
||||
start_supervised!(FreshPendingTransactionsCounter) |
||||
FreshPendingTransactionsCounter.consolidate() |
||||
|
||||
assert FreshPendingTransactionsCounter.fetch([]) == Decimal.new("3") |
||||
end |
||||
|
||||
test "count only fresh transactions" do |
||||
insert(:transaction, inserted_at: Timex.shift(Timex.now(), hours: -2)) |
||||
insert(:transaction) |
||||
insert(:transaction) |
||||
|
||||
start_supervised!(FreshPendingTransactionsCounter) |
||||
FreshPendingTransactionsCounter.consolidate() |
||||
|
||||
assert FreshPendingTransactionsCounter.fetch([]) == Decimal.new("2") |
||||
end |
||||
end |
@ -0,0 +1,61 @@ |
||||
defmodule Explorer.Counters.Transactions24hStatsTest do |
||||
use Explorer.DataCase |
||||
|
||||
alias Explorer.Counters.Transactions24hStats |
||||
|
||||
test "populates the cache with transaction counters" do |
||||
block = insert(:block, base_fee_per_gas: 50) |
||||
address = insert(:address) |
||||
|
||||
# fee = 10000 |
||||
|
||||
insert(:transaction, |
||||
from_address: address, |
||||
block: block, |
||||
block_number: block.number, |
||||
cumulative_gas_used: 0, |
||||
index: 0, |
||||
gas_price: 100, |
||||
gas_used: 100 |
||||
) |
||||
|
||||
# fee = 15000 |
||||
|
||||
insert(:transaction, |
||||
from_address: address, |
||||
block: block, |
||||
block_number: block.number, |
||||
cumulative_gas_used: 100, |
||||
index: 1, |
||||
gas_price: 150, |
||||
gas_used: 100, |
||||
max_priority_fee_per_gas: 100, |
||||
max_fee_per_gas: 200 |
||||
) |
||||
|
||||
# fee = 10000 |
||||
|
||||
insert(:transaction, |
||||
from_address: address, |
||||
block: block, |
||||
block_number: block.number, |
||||
cumulative_gas_used: 200, |
||||
index: 2, |
||||
gas_price: 100, |
||||
gas_used: 100, |
||||
max_priority_fee_per_gas: 70, |
||||
max_fee_per_gas: 100 |
||||
) |
||||
|
||||
start_supervised!(Transactions24hStats) |
||||
Transactions24hStats.consolidate() |
||||
|
||||
transaction_count = Transactions24hStats.fetch_count([]) |
||||
transaction_fee_sum = Transactions24hStats.fetch_fee_sum([]) |
||||
transaction_fee_average = Transactions24hStats.fetch_fee_average([]) |
||||
|
||||
assert transaction_count == Decimal.new("3") |
||||
assert transaction_fee_sum == Decimal.new("35000") |
||||
assert transaction_fee_average == Decimal.new("11667") |
||||
end |
||||
end |
Loading…
Reference in new issue