pull/511/head
parent
88174b3617
commit
faa53be558
@ -1,104 +0,0 @@ |
||||
defmodule Explorer.Chain.Statistics do |
||||
@moduledoc """ |
||||
Represents statistics about the chain. |
||||
""" |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Ecto.Adapters.SQL |
||||
alias Explorer.{Chain, PagingOptions, Repo} |
||||
alias Explorer.Chain.{Block, Transaction} |
||||
alias Timex.Duration |
||||
|
||||
@average_time_query """ |
||||
SELECT coalesce(avg(difference), interval '0 seconds') |
||||
FROM ( |
||||
SELECT b.timestamp - lag(b.timestamp) over (order by b.timestamp) as difference |
||||
FROM (SELECT * FROM blocks ORDER BY number DESC LIMIT 101) b |
||||
LIMIT 100 OFFSET 1 |
||||
) t |
||||
""" |
||||
|
||||
@typedoc """ |
||||
The number of `t:Explorer.Chain.Block.t/0` mined/validated per minute. |
||||
""" |
||||
@type blocks_per_minute :: non_neg_integer() |
||||
|
||||
@typedoc """ |
||||
* `average_time` - the average time it took to mine/validate the last <= 100 `t:Explorer.Chain.Block.t/0` |
||||
* `blocks` - the last <= 5 `t:Explorer.Chain.Block.t/0` |
||||
(`t:Explorer.Chain.Block.t/0` `timestamp`) and when it was inserted into the databasse |
||||
(`t:Explorer.Chain.Block.t/0` `inserted_at`) |
||||
* `number` - the latest `t:Explorer.Chain.Block.t/0` `number` |
||||
`t:Explorer.Chain.Block.t/0` |
||||
* `timestamp` - when the last `t:Explorer.Chain.Block.t/0` was mined/validated |
||||
* `transaction_velocity` - the number of `t:Explorer.Chain.Block.t/0` mined/validated in the last minute |
||||
* `transactions` - the last <= 5 `t:Explorer.Chain.Transaction.t/0` |
||||
""" |
||||
@type t :: %__MODULE__{ |
||||
average_time: Duration.t(), |
||||
blocks: [Block.t()], |
||||
number: Block.block_number(), |
||||
timestamp: :calendar.datetime(), |
||||
transactions: [Transaction.t()] |
||||
} |
||||
|
||||
defstruct average_time: %Duration{seconds: 0, megaseconds: 0, microseconds: 0}, |
||||
blocks: [], |
||||
number: -1, |
||||
timestamp: nil, |
||||
transactions: [] |
||||
|
||||
def fetch do |
||||
blocks = |
||||
from( |
||||
block in Block, |
||||
order_by: [desc: block.number], |
||||
preload: [:miner, :transactions], |
||||
limit: 4 |
||||
) |
||||
|
||||
transactions = |
||||
Chain.recent_collated_transactions( |
||||
necessity_by_association: %{ |
||||
block: :required, |
||||
from_address: :required, |
||||
to_address: :optional |
||||
}, |
||||
paging_options: %PagingOptions{page_size: 5} |
||||
) |
||||
|
||||
%__MODULE__{ |
||||
average_time: query_duration(@average_time_query), |
||||
blocks: Repo.all(blocks), |
||||
transactions: transactions |
||||
} |
||||
|> put_max_numbered_block() |
||||
end |
||||
|
||||
defp put_max_numbered_block(state) do |
||||
case Chain.max_numbered_block() do |
||||
{:ok, %Block{number: number, timestamp: timestamp}} -> |
||||
%__MODULE__{ |
||||
state |
||||
| number: number, |
||||
timestamp: timestamp |
||||
} |
||||
|
||||
{:error, :not_found} -> |
||||
state |
||||
end |
||||
end |
||||
|
||||
defp query_duration(query) do |
||||
results = SQL.query!(Repo, query, []) |
||||
|
||||
{:ok, value} = |
||||
results.rows |
||||
|> List.first() |
||||
|> List.first() |
||||
|> Timex.Ecto.Time.load() |
||||
|
||||
value |
||||
end |
||||
end |
@ -1,77 +0,0 @@ |
||||
defmodule Explorer.Chain.Statistics.Server do |
||||
@moduledoc "Stores the latest chain statistics." |
||||
|
||||
use GenServer |
||||
|
||||
require Logger |
||||
|
||||
alias Explorer.Chain.Statistics |
||||
|
||||
@interval 1_000 |
||||
|
||||
defstruct statistics: %Statistics{}, |
||||
task: nil |
||||
|
||||
def child_spec(_) do |
||||
Supervisor.Spec.worker(__MODULE__, [[refresh: true]]) |
||||
end |
||||
|
||||
@spec fetch() :: Statistics.t() |
||||
def fetch do |
||||
GenServer.call(__MODULE__, :fetch) |
||||
end |
||||
|
||||
def start_link(opts \\ []) do |
||||
GenServer.start_link(__MODULE__, opts, name: __MODULE__) |
||||
end |
||||
|
||||
@impl GenServer |
||||
def init(options) when is_list(options) do |
||||
if Keyword.get(options, :refresh, true) do |
||||
send(self(), :refresh) |
||||
end |
||||
|
||||
{:ok, %__MODULE__{}} |
||||
end |
||||
|
||||
@impl GenServer |
||||
|
||||
def handle_info(:refresh, %__MODULE__{task: task} = state) do |
||||
new_state = |
||||
case task do |
||||
nil -> |
||||
%__MODULE__{state | task: Task.Supervisor.async_nolink(Explorer.TaskSupervisor, Statistics, :fetch, [])} |
||||
|
||||
_ -> |
||||
state |
||||
end |
||||
|
||||
{:noreply, new_state} |
||||
end |
||||
|
||||
def handle_info({ref, %Statistics{} = statistics}, %__MODULE__{task: %Task{ref: ref}} = state) do |
||||
Process.demonitor(ref, [:flush]) |
||||
Process.send_after(self(), :refresh, @interval) |
||||
|
||||
{:noreply, %__MODULE__{state | statistics: statistics, task: nil}} |
||||
end |
||||
|
||||
def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{task: %Task{pid: pid, ref: ref}} = state) do |
||||
Logger.error(fn -> "#{inspect(Statistics)}.fetch failed and could not be cached: #{inspect(reason)}" end) |
||||
|
||||
Process.send_after(self(), :refresh, @interval) |
||||
|
||||
{:noreply, %__MODULE__{state | task: nil}} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_call(:fetch, _, %__MODULE__{statistics: %Statistics{} = statistics} = state), |
||||
do: {:reply, statistics, state} |
||||
|
||||
@impl GenServer |
||||
def terminate(_, %__MODULE__{task: nil}), do: :ok |
||||
|
||||
def terminate(_, %__MODULE__{task: task}) do |
||||
Task.shutdown(task) |
||||
end |
||||
end |
@ -1,120 +0,0 @@ |
||||
defmodule Explorer.Chain.Statistics.ServerTest do |
||||
use Explorer.DataCase, async: false |
||||
|
||||
import ExUnit.CaptureLog |
||||
|
||||
alias Explorer.Chain.Statistics |
||||
alias Explorer.Chain.Statistics.Server |
||||
|
||||
# shutdown: "owner exited with: shutdown" error from polluting logs when tests are successful |
||||
@moduletag :capture_log |
||||
|
||||
describe "child_spec/1" do |
||||
test "it defines a child_spec/1 that works with supervisors" do |
||||
insert(:block) |
||||
|
||||
assert {:ok, pid} = start_supervised(Server) |
||||
|
||||
%Server{task: %Task{pid: pid}} = :sys.get_state(pid) |
||||
ref = Process.monitor(pid) |
||||
|
||||
assert_receive {:DOWN, ^ref, :process, ^pid, _} |
||||
end |
||||
end |
||||
|
||||
describe "init/1" do |
||||
test "returns a new chain when not told to refresh" do |
||||
empty_statistics = %Statistics{} |
||||
|
||||
assert {:ok, %Server{statistics: ^empty_statistics}} = Server.init(refresh: false) |
||||
end |
||||
|
||||
test "returns a new Statistics when told to refresh" do |
||||
empty_statistics = %Statistics{} |
||||
|
||||
assert {:ok, %Server{statistics: ^empty_statistics}} = Server.init(refresh: true) |
||||
end |
||||
|
||||
test "refreshes when told to refresh" do |
||||
{:ok, _} = Server.init([]) |
||||
|
||||
assert_receive :refresh, 2_000 |
||||
end |
||||
end |
||||
|
||||
describe "handle_info/2" do |
||||
setup :state |
||||
|
||||
test "returns the original statistics when sent a :refresh message", %{ |
||||
state: %Server{statistics: statistics} = state |
||||
} do |
||||
assert {:noreply, %Server{statistics: ^statistics, task: task}} = Server.handle_info(:refresh, state) |
||||
|
||||
Task.await(task) |
||||
end |
||||
|
||||
test "launches a Statistics.fetch Task update when sent a :refresh message", %{state: state} do |
||||
assert {:noreply, %Server{task: %Task{} = task}} = Server.handle_info(:refresh, state) |
||||
|
||||
assert %Statistics{} = Task.await(task) |
||||
end |
||||
|
||||
test "stores successful Task in state", %{state: state} do |
||||
# so that `statistics` from Task will be different |
||||
insert(:block) |
||||
|
||||
assert {:noreply, %Server{task: %Task{ref: ref}} = refresh_state} = Server.handle_info(:refresh, state) |
||||
|
||||
assert_receive {^ref, %Statistics{} = message_statistics} = message |
||||
|
||||
assert {:noreply, %Server{statistics: ^message_statistics, task: nil}} = |
||||
Server.handle_info(message, refresh_state) |
||||
|
||||
refute message_statistics == state.statistics |
||||
end |
||||
|
||||
test "logs crashed Task", %{state: state} do |
||||
assert {:noreply, %Server{task: %Task{pid: pid, ref: ref}} = refresh_state} = Server.handle_info(:refresh, state) |
||||
|
||||
Process.exit(pid, :boom) |
||||
|
||||
assert_receive {:DOWN, ^ref, :process, ^pid, :boom} = message |
||||
|
||||
captured_log = |
||||
capture_log(fn -> |
||||
assert {:noreply, %Server{task: nil}} = Server.handle_info(message, refresh_state) |
||||
end) |
||||
|
||||
assert captured_log =~ "Explorer.Chain.Statistics.fetch failed and could not be cached: :boom" |
||||
end |
||||
end |
||||
|
||||
describe "handle_call/3" do |
||||
test "replies with statistics when sent a :fetch message" do |
||||
original = Statistics.fetch() |
||||
state = %Server{statistics: original} |
||||
|
||||
assert {:reply, ^original, ^state} = Server.handle_call(:fetch, self(), state) |
||||
end |
||||
end |
||||
|
||||
describe "terminate/2" do |
||||
setup :state |
||||
|
||||
test "cleans up in-progress tasks when terminated", %{state: state} do |
||||
assert {:noreply, %Server{task: %Task{pid: pid}} = refresh_state} = Server.handle_info(:refresh, state) |
||||
|
||||
second_ref = Process.monitor(pid) |
||||
|
||||
Server.terminate(:boom, refresh_state) |
||||
|
||||
assert_receive {:DOWN, ^second_ref, :process, ^pid, :shutdown} |
||||
end |
||||
end |
||||
|
||||
defp state(_) do |
||||
{:ok, state} = Server.init([]) |
||||
|
||||
%{state: state} |
||||
end |
||||
end |
@ -1,68 +0,0 @@ |
||||
defmodule Explorer.Chain.StatisticsTest do |
||||
use Explorer.DataCase |
||||
|
||||
alias Explorer.Chain.Statistics |
||||
alias Timex.Duration |
||||
|
||||
describe "fetch/0" do |
||||
test "returns -1 for the number when there are no blocks" do |
||||
assert %Statistics{number: -1} = Statistics.fetch() |
||||
end |
||||
|
||||
test "returns the highest block number when there is a block" do |
||||
insert(:block, number: 1) |
||||
|
||||
max_number = 100 |
||||
insert(:block, number: max_number) |
||||
|
||||
assert %Statistics{number: ^max_number} = Statistics.fetch() |
||||
end |
||||
|
||||
test "returns the latest block timestamp" do |
||||
time = DateTime.utc_now() |
||||
insert(:block, timestamp: time) |
||||
|
||||
statistics = Statistics.fetch() |
||||
|
||||
assert Timex.diff(statistics.timestamp, time, :seconds) == 0 |
||||
end |
||||
|
||||
test "returns the average time between blocks for the last 100 blocks" do |
||||
time = DateTime.utc_now() |
||||
|
||||
insert(:block, timestamp: Timex.shift(time, seconds: -1000)) |
||||
|
||||
for x <- 100..0 do |
||||
insert(:block, timestamp: Timex.shift(time, seconds: -5 * x)) |
||||
end |
||||
|
||||
assert %Statistics{ |
||||
average_time: %Duration{ |
||||
seconds: 5, |
||||
megaseconds: 0, |
||||
microseconds: 0 |
||||
} |
||||
} = Statistics.fetch() |
||||
end |
||||
|
||||
test "returns the last five blocks" do |
||||
insert_list(5, :block) |
||||
|
||||
statistics = Statistics.fetch() |
||||
|
||||
assert statistics.blocks |> Enum.count() == 4 |
||||
end |
||||
|
||||
test "returns the last five transactions with blocks" do |
||||
Enum.map(0..5, fn _ -> |
||||
:transaction |
||||
|> insert() |
||||
|> with_block() |
||||
end) |
||||
|
||||
statistics = Statistics.fetch() |
||||
|
||||
assert statistics.transactions |> Enum.count() == 5 |
||||
end |
||||
end |
||||
end |
Loading…
Reference in new issue