Merge pull request #511 from poanetwork/live-update-chain-statistics-#33
Live update chain statistics #33pull/526/head
commit
da3419ddbb
@ -1,123 +0,0 @@ |
||||
defmodule Explorer.Chain.Statistics do |
||||
@moduledoc """ |
||||
Represents statistics about the chain. |
||||
""" |
||||
|
||||
import Ecto.Query |
||||
|
||||
alias Ecto.Adapters.SQL |
||||
alias Explorer.{Chain, PagingOptions, Repo} |
||||
alias Explorer.Chain.{Block, Transaction} |
||||
alias Timex.Duration |
||||
|
||||
@average_time_query """ |
||||
SELECT coalesce(avg(difference), interval '0 seconds') |
||||
FROM ( |
||||
SELECT b.timestamp - lag(b.timestamp) over (order by b.timestamp) as difference |
||||
FROM (SELECT * FROM blocks ORDER BY number DESC LIMIT 101) b |
||||
LIMIT 100 OFFSET 1 |
||||
) t |
||||
""" |
||||
|
||||
@transaction_velocity_query """ |
||||
SELECT count(transactions.inserted_at) |
||||
FROM transactions |
||||
WHERE transactions.inserted_at > NOW() - interval '1 minute' |
||||
""" |
||||
|
||||
@typedoc """ |
||||
The number of `t:Explorer.Chain.Block.t/0` mined/validated per minute. |
||||
""" |
||||
@type blocks_per_minute :: non_neg_integer() |
||||
|
||||
@typedoc """ |
||||
The number of `t:Explorer.Chain.Transaction.t/0` mined/validated per minute. |
||||
""" |
||||
@type transactions_per_minute :: non_neg_integer() |
||||
|
||||
@typedoc """ |
||||
* `average_time` - the average time it took to mine/validate the last <= 100 `t:Explorer.Chain.Block.t/0` |
||||
* `blocks` - the last <= 5 `t:Explorer.Chain.Block.t/0` |
||||
(`t:Explorer.Chain.Block.t/0` `timestamp`) and when it was inserted into the databasse |
||||
(`t:Explorer.Chain.Block.t/0` `inserted_at`) |
||||
* `number` - the latest `t:Explorer.Chain.Block.t/0` `number` |
||||
`t:Explorer.Chain.Block.t/0` |
||||
* `timestamp` - when the last `t:Explorer.Chain.Block.t/0` was mined/validated |
||||
* `transaction_velocity` - the number of `t:Explorer.Chain.Block.t/0` mined/validated in the last minute |
||||
* `transactions` - the last <= 5 `t:Explorer.Chain.Transaction.t/0` |
||||
""" |
||||
@type t :: %__MODULE__{ |
||||
average_time: Duration.t(), |
||||
blocks: [Block.t()], |
||||
number: Block.block_number(), |
||||
timestamp: :calendar.datetime(), |
||||
transaction_velocity: transactions_per_minute(), |
||||
transactions: [Transaction.t()] |
||||
} |
||||
|
||||
defstruct average_time: %Duration{seconds: 0, megaseconds: 0, microseconds: 0}, |
||||
blocks: [], |
||||
number: -1, |
||||
timestamp: nil, |
||||
transaction_velocity: 0, |
||||
transactions: [] |
||||
|
||||
def fetch do |
||||
blocks = |
||||
from( |
||||
block in Block, |
||||
order_by: [desc: block.number], |
||||
preload: [:miner, :transactions], |
||||
limit: 4 |
||||
) |
||||
|
||||
transactions = |
||||
Chain.recent_collated_transactions( |
||||
necessity_by_association: %{ |
||||
block: :required, |
||||
from_address: :required, |
||||
to_address: :optional |
||||
}, |
||||
paging_options: %PagingOptions{page_size: 5} |
||||
) |
||||
|
||||
%__MODULE__{ |
||||
average_time: query_duration(@average_time_query), |
||||
blocks: Repo.all(blocks), |
||||
transaction_velocity: query_value(@transaction_velocity_query), |
||||
transactions: transactions |
||||
} |
||||
|> put_max_numbered_block() |
||||
end |
||||
|
||||
defp put_max_numbered_block(state) do |
||||
case Chain.max_numbered_block() do |
||||
{:ok, %Block{number: number, timestamp: timestamp}} -> |
||||
%__MODULE__{ |
||||
state |
||||
| number: number, |
||||
timestamp: timestamp |
||||
} |
||||
|
||||
{:error, :not_found} -> |
||||
state |
||||
end |
||||
end |
||||
|
||||
defp query_value(query, args \\ []) do |
||||
results = SQL.query!(Repo, query, args) |
||||
results.rows |> List.first() |> List.first() |
||||
end |
||||
|
||||
defp query_duration(query) do |
||||
results = SQL.query!(Repo, query, []) |
||||
|
||||
{:ok, value} = |
||||
results.rows |
||||
|> List.first() |
||||
|> List.first() |
||||
|> Timex.Ecto.Time.load() |
||||
|
||||
value |
||||
end |
||||
end |
@ -1,77 +0,0 @@ |
||||
defmodule Explorer.Chain.Statistics.Server do |
||||
@moduledoc "Stores the latest chain statistics." |
||||
|
||||
use GenServer |
||||
|
||||
require Logger |
||||
|
||||
alias Explorer.Chain.Statistics |
||||
|
||||
@interval 1_000 |
||||
|
||||
defstruct statistics: %Statistics{}, |
||||
task: nil |
||||
|
||||
def child_spec(_) do |
||||
Supervisor.Spec.worker(__MODULE__, [[refresh: true]]) |
||||
end |
||||
|
||||
@spec fetch() :: Statistics.t() |
||||
def fetch do |
||||
GenServer.call(__MODULE__, :fetch) |
||||
end |
||||
|
||||
def start_link(opts \\ []) do |
||||
GenServer.start_link(__MODULE__, opts, name: __MODULE__) |
||||
end |
||||
|
||||
@impl GenServer |
||||
def init(options) when is_list(options) do |
||||
if Keyword.get(options, :refresh, true) do |
||||
send(self(), :refresh) |
||||
end |
||||
|
||||
{:ok, %__MODULE__{}} |
||||
end |
||||
|
||||
@impl GenServer |
||||
|
||||
def handle_info(:refresh, %__MODULE__{task: task} = state) do |
||||
new_state = |
||||
case task do |
||||
nil -> |
||||
%__MODULE__{state | task: Task.Supervisor.async_nolink(Explorer.TaskSupervisor, Statistics, :fetch, [])} |
||||
|
||||
_ -> |
||||
state |
||||
end |
||||
|
||||
{:noreply, new_state} |
||||
end |
||||
|
||||
def handle_info({ref, %Statistics{} = statistics}, %__MODULE__{task: %Task{ref: ref}} = state) do |
||||
Process.demonitor(ref, [:flush]) |
||||
Process.send_after(self(), :refresh, @interval) |
||||
|
||||
{:noreply, %__MODULE__{state | statistics: statistics, task: nil}} |
||||
end |
||||
|
||||
def handle_info({:DOWN, ref, :process, pid, reason}, %__MODULE__{task: %Task{pid: pid, ref: ref}} = state) do |
||||
Logger.error(fn -> "#{inspect(Statistics)}.fetch failed and could not be cached: #{inspect(reason)}" end) |
||||
|
||||
Process.send_after(self(), :refresh, @interval) |
||||
|
||||
{:noreply, %__MODULE__{state | task: nil}} |
||||
end |
||||
|
||||
@impl GenServer |
||||
def handle_call(:fetch, _, %__MODULE__{statistics: %Statistics{} = statistics} = state), |
||||
do: {:reply, statistics, state} |
||||
|
||||
@impl GenServer |
||||
def terminate(_, %__MODULE__{task: nil}), do: :ok |
||||
|
||||
def terminate(_, %__MODULE__{task: task}) do |
||||
Task.shutdown(task) |
||||
end |
||||
end |
@ -1,120 +0,0 @@ |
||||
defmodule Explorer.Chain.Statistics.ServerTest do |
||||
use Explorer.DataCase, async: false |
||||
|
||||
import ExUnit.CaptureLog |
||||
|
||||
alias Explorer.Chain.Statistics |
||||
alias Explorer.Chain.Statistics.Server |
||||
|
||||
# shutdown: "owner exited with: shutdown" error from polluting logs when tests are successful |
||||
@moduletag :capture_log |
||||
|
||||
describe "child_spec/1" do |
||||
test "it defines a child_spec/1 that works with supervisors" do |
||||
insert(:block) |
||||
|
||||
assert {:ok, pid} = start_supervised(Server) |
||||
|
||||
%Server{task: %Task{pid: pid}} = :sys.get_state(pid) |
||||
ref = Process.monitor(pid) |
||||
|
||||
assert_receive {:DOWN, ^ref, :process, ^pid, _} |
||||
end |
||||
end |
||||
|
||||
describe "init/1" do |
||||
test "returns a new chain when not told to refresh" do |
||||
empty_statistics = %Statistics{} |
||||
|
||||
assert {:ok, %Server{statistics: ^empty_statistics}} = Server.init(refresh: false) |
||||
end |
||||
|
||||
test "returns a new Statistics when told to refresh" do |
||||
empty_statistics = %Statistics{} |
||||
|
||||
assert {:ok, %Server{statistics: ^empty_statistics}} = Server.init(refresh: true) |
||||
end |
||||
|
||||
test "refreshes when told to refresh" do |
||||
{:ok, _} = Server.init([]) |
||||
|
||||
assert_receive :refresh, 2_000 |
||||
end |
||||
end |
||||
|
||||
describe "handle_info/2" do |
||||
setup :state |
||||
|
||||
test "returns the original statistics when sent a :refresh message", %{ |
||||
state: %Server{statistics: statistics} = state |
||||
} do |
||||
assert {:noreply, %Server{statistics: ^statistics, task: task}} = Server.handle_info(:refresh, state) |
||||
|
||||
Task.await(task) |
||||
end |
||||
|
||||
test "launches a Statistics.fetch Task update when sent a :refresh message", %{state: state} do |
||||
assert {:noreply, %Server{task: %Task{} = task}} = Server.handle_info(:refresh, state) |
||||
|
||||
assert %Statistics{} = Task.await(task) |
||||
end |
||||
|
||||
test "stores successful Task in state", %{state: state} do |
||||
# so that `statistics` from Task will be different |
||||
insert(:block) |
||||
|
||||
assert {:noreply, %Server{task: %Task{ref: ref}} = refresh_state} = Server.handle_info(:refresh, state) |
||||
|
||||
assert_receive {^ref, %Statistics{} = message_statistics} = message |
||||
|
||||
assert {:noreply, %Server{statistics: ^message_statistics, task: nil}} = |
||||
Server.handle_info(message, refresh_state) |
||||
|
||||
refute message_statistics == state.statistics |
||||
end |
||||
|
||||
test "logs crashed Task", %{state: state} do |
||||
assert {:noreply, %Server{task: %Task{pid: pid, ref: ref}} = refresh_state} = Server.handle_info(:refresh, state) |
||||
|
||||
Process.exit(pid, :boom) |
||||
|
||||
assert_receive {:DOWN, ^ref, :process, ^pid, :boom} = message |
||||
|
||||
captured_log = |
||||
capture_log(fn -> |
||||
assert {:noreply, %Server{task: nil}} = Server.handle_info(message, refresh_state) |
||||
end) |
||||
|
||||
assert captured_log =~ "Explorer.Chain.Statistics.fetch failed and could not be cached: :boom" |
||||
end |
||||
end |
||||
|
||||
describe "handle_call/3" do |
||||
test "replies with statistics when sent a :fetch message" do |
||||
original = Statistics.fetch() |
||||
state = %Server{statistics: original} |
||||
|
||||
assert {:reply, ^original, ^state} = Server.handle_call(:fetch, self(), state) |
||||
end |
||||
end |
||||
|
||||
describe "terminate/2" do |
||||
setup :state |
||||
|
||||
test "cleans up in-progress tasks when terminated", %{state: state} do |
||||
assert {:noreply, %Server{task: %Task{pid: pid}} = refresh_state} = Server.handle_info(:refresh, state) |
||||
|
||||
second_ref = Process.monitor(pid) |
||||
|
||||
Server.terminate(:boom, refresh_state) |
||||
|
||||
assert_receive {:DOWN, ^second_ref, :process, ^pid, :shutdown} |
||||
end |
||||
end |
||||
|
||||
defp state(_) do |
||||
{:ok, state} = Server.init([]) |
||||
|
||||
%{state: state} |
||||
end |
||||
end |
@ -1,76 +0,0 @@ |
||||
defmodule Explorer.Chain.StatisticsTest do |
||||
use Explorer.DataCase |
||||
|
||||
alias Explorer.Chain.Statistics |
||||
alias Timex.Duration |
||||
|
||||
describe "fetch/0" do |
||||
test "returns -1 for the number when there are no blocks" do |
||||
assert %Statistics{number: -1} = Statistics.fetch() |
||||
end |
||||
|
||||
test "returns the highest block number when there is a block" do |
||||
insert(:block, number: 1) |
||||
|
||||
max_number = 100 |
||||
insert(:block, number: max_number) |
||||
|
||||
assert %Statistics{number: ^max_number} = Statistics.fetch() |
||||
end |
||||
|
||||
test "returns the latest block timestamp" do |
||||
time = DateTime.utc_now() |
||||
insert(:block, timestamp: time) |
||||
|
||||
statistics = Statistics.fetch() |
||||
|
||||
assert Timex.diff(statistics.timestamp, time, :seconds) == 0 |
||||
end |
||||
|
||||
test "returns the average time between blocks for the last 100 blocks" do |
||||
time = DateTime.utc_now() |
||||
|
||||
insert(:block, timestamp: Timex.shift(time, seconds: -1000)) |
||||
|
||||
for x <- 100..0 do |
||||
insert(:block, timestamp: Timex.shift(time, seconds: -5 * x)) |
||||
end |
||||
|
||||
assert %Statistics{ |
||||
average_time: %Duration{ |
||||
seconds: 5, |
||||
megaseconds: 0, |
||||
microseconds: 0 |
||||
} |
||||
} = Statistics.fetch() |
||||
end |
||||
|
||||
test "returns the number of transactions inserted in the last minute" do |
||||
old_inserted_at = Timex.shift(DateTime.utc_now(), days: -1) |
||||
insert(:transaction, inserted_at: old_inserted_at) |
||||
insert(:transaction) |
||||
|
||||
assert %Statistics{transaction_velocity: 1} = Statistics.fetch() |
||||
end |
||||
|
||||
test "returns the last five blocks" do |
||||
insert_list(5, :block) |
||||
|
||||
statistics = Statistics.fetch() |
||||
|
||||
assert statistics.blocks |> Enum.count() == 4 |
||||
end |
||||
|
||||
test "returns the last five transactions with blocks" do |
||||
Enum.map(0..5, fn _ -> |
||||
:transaction |
||||
|> insert() |
||||
|> with_block() |
||||
end) |
||||
|
||||
statistics = Statistics.fetch() |
||||
|
||||
assert statistics.transactions |> Enum.count() == 5 |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,184 @@ |
||||
defmodule ExplorerWeb.ViewingChainTest do |
||||
@moduledoc false |
||||
|
||||
use ExplorerWeb.FeatureCase, async: true |
||||
|
||||
alias ExplorerWeb.{AddressPage, BlockPage, ChainPage, Notifier, TransactionPage} |
||||
|
||||
setup do |
||||
[oldest_block | _] = Enum.map(401..404, &insert(:block, number: &1)) |
||||
|
||||
block = insert(:block, number: 405) |
||||
|
||||
[oldest_transaction | _] = |
||||
4 |
||||
|> insert_list(:transaction) |
||||
|> with_block(block) |
||||
|
||||
:transaction |
||||
|> insert() |
||||
|> with_block(block) |
||||
|
||||
{:ok, |
||||
%{ |
||||
block: block, |
||||
last_shown_block: oldest_block, |
||||
last_shown_transaction: oldest_transaction |
||||
}} |
||||
end |
||||
|
||||
describe "statistics" do |
||||
test "average block time live updates", %{session: session} do |
||||
time = DateTime.utc_now() |
||||
|
||||
for x <- 100..0 do |
||||
insert(:block, timestamp: Timex.shift(time, seconds: -5 * x), number: x + 500) |
||||
end |
||||
|
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> assert_has(ChainPage.average_block_time("5 seconds")) |
||||
|
||||
block = |
||||
100..0 |
||||
|> Enum.map(fn index -> |
||||
insert(:block, timestamp: Timex.shift(time, seconds: -10 * index), number: index + 800) |
||||
end) |
||||
|> hd() |
||||
|
||||
Notifier.handle_event({:chain_event, :blocks, [block]}) |
||||
|
||||
assert_has(session, ChainPage.average_block_time("10 seconds")) |
||||
end |
||||
|
||||
test "address count live updates", %{session: session} do |
||||
count = ExplorerWeb.FakeAdapter.address_estimated_count() |
||||
|
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> assert_has(ChainPage.address_count(count)) |
||||
|
||||
address = insert(:address) |
||||
Notifier.handle_event({:chain_event, :addresses, [address]}) |
||||
|
||||
assert_has(session, ChainPage.address_count(count + 1)) |
||||
end |
||||
end |
||||
|
||||
describe "viewing addresses" do |
||||
test "search for address", %{session: session} do |
||||
address = insert(:address) |
||||
|
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> ChainPage.search(to_string(address.hash)) |
||||
|> assert_has(AddressPage.detail_hash(address)) |
||||
end |
||||
end |
||||
|
||||
describe "viewing blocks" do |
||||
test "search for blocks from chain page", %{session: session} do |
||||
block = insert(:block, number: 6) |
||||
|
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> ChainPage.search(to_string(block.number)) |
||||
|> assert_has(BlockPage.detail_number(block)) |
||||
end |
||||
|
||||
test "blocks list", %{session: session} do |
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> assert_has(ChainPage.blocks(count: 4)) |
||||
end |
||||
|
||||
test "viewing new blocks via live update", %{session: session, last_shown_block: last_shown_block} do |
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> assert_has(ChainPage.blocks(count: 4)) |
||||
|
||||
block = insert(:block, number: 6) |
||||
|
||||
Notifier.handle_event({:chain_event, :blocks, [block]}) |
||||
|
||||
session |
||||
|> assert_has(ChainPage.blocks(count: 4)) |
||||
|> assert_has(ChainPage.block(block)) |
||||
|> refute_has(ChainPage.block(last_shown_block)) |
||||
end |
||||
end |
||||
|
||||
describe "viewing transactions" do |
||||
test "search for transactions", %{session: session} do |
||||
transaction = insert(:transaction) |
||||
|
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> ChainPage.search(to_string(transaction.hash)) |
||||
|> assert_has(TransactionPage.detail_hash(transaction)) |
||||
end |
||||
|
||||
test "transactions list", %{session: session} do |
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> assert_has(ChainPage.transactions(count: 5)) |
||||
end |
||||
|
||||
test "viewing new transactions via live update", %{ |
||||
session: session, |
||||
block: block, |
||||
last_shown_transaction: last_shown_transaction |
||||
} do |
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> assert_has(ChainPage.transactions(count: 5)) |
||||
|
||||
transaction = |
||||
:transaction |
||||
|> insert() |
||||
|> with_block(block) |
||||
|
||||
Notifier.handle_event({:chain_event, :transactions, [transaction.hash]}) |
||||
|
||||
session |
||||
|> assert_has(ChainPage.transactions(count: 5)) |
||||
|> assert_has(ChainPage.transaction(transaction)) |
||||
|> refute_has(ChainPage.transaction(last_shown_transaction)) |
||||
end |
||||
|
||||
test "count of non-loaded transactions live update when batch overflow", %{session: session, block: block} do |
||||
transaction_hashes = |
||||
30 |
||||
|> insert_list(:transaction) |
||||
|> with_block(block) |
||||
|> Enum.map(& &1.hash) |
||||
|
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> assert_has(ChainPage.transactions(count: 5)) |
||||
|
||||
Notifier.handle_event({:chain_event, :transactions, transaction_hashes}) |
||||
|
||||
assert_has(session, ChainPage.non_loaded_transaction_count("30")) |
||||
end |
||||
|
||||
test "contract creation is shown for to_address", %{session: session, block: block} do |
||||
contract_address = insert(:contract_address) |
||||
|
||||
transaction = |
||||
:transaction |
||||
|> insert(to_address: nil) |
||||
|> with_contract_creation(contract_address) |
||||
|> with_block(block) |
||||
|
||||
# internal_transaction = |
||||
# :internal_transaction_create |
||||
# |> insert(transaction: transaction, index: 0) |
||||
# |> with_contract_creation(contract_address) |
||||
|
||||
session |
||||
|> ChainPage.visit_page() |
||||
|> assert_has(ChainPage.contract_creation(transaction)) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,8 @@ |
||||
defmodule ExplorerWeb.FakeAdapter do |
||||
alias Explorer.Chain.Address |
||||
alias Explorer.Repo |
||||
|
||||
def address_estimated_count do |
||||
Repo.aggregate(Address, :count, :hash) |
||||
end |
||||
end |
Loading…
Reference in new issue