Merge pull request #1267 from poanetwork/ams-event-handler-messages

Decrease BlockScoutWeb.EventHandler message queue
pull/1280/head
Andrew Cravenho 6 years ago committed by GitHub
commit 37121c1d25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      apps/block_scout_web/config/config.exs
  2. 2
      apps/block_scout_web/config/test.exs
  3. 7
      apps/block_scout_web/lib/block_scout_web/application.ex
  4. 63
      apps/block_scout_web/lib/block_scout_web/counters/blocks_indexed_counter.ex
  5. 34
      apps/block_scout_web/lib/block_scout_web/event_handler.ex
  6. 28
      apps/block_scout_web/lib/block_scout_web/notifier.ex
  7. 33
      apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex
  8. 15
      apps/block_scout_web/test/block_scout_web/channels/block_channel_test.exs
  9. 39
      apps/block_scout_web/test/block_scout_web/features/viewing_app_test.exs
  10. 29
      apps/explorer/lib/explorer/chain.ex
  11. 41
      apps/explorer/lib/explorer/chain/events/publisher.ex
  12. 48
      apps/explorer/lib/explorer/chain/events/subscriber.ex
  13. 22
      apps/explorer/lib/explorer/chain/import.ex
  14. 7
      apps/explorer/lib/explorer/exchange_rates/exchange_rates.ex
  15. 65
      apps/explorer/test/explorer/chain/events/publisher_test.exs
  16. 33
      apps/explorer/test/explorer/chain/events/subscriber_test.exs
  17. 28
      apps/explorer/test/explorer/chain/import_test.exs
  18. 6
      apps/explorer/test/explorer/chain_test.exs

@ -60,6 +60,8 @@ config :wobserver,
discovery: :none,
mode: :plug
config :block_scout_web, BlockScoutWeb.Counters.BlocksIndexedCounter, enabled: true
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"

@ -19,3 +19,5 @@ config :logger, :block_scout_web,
config :wallaby, screenshot_on_failure: true
config :explorer, Explorer.ExchangeRates, enabled: false, store: :none
config :block_scout_web, BlockScoutWeb.Counters.BlocksIndexedCounter, enabled: false

@ -5,7 +5,9 @@ defmodule BlockScoutWeb.Application do
use Application
alias BlockScoutWeb.{Endpoint, EventHandler, Prometheus}
alias BlockScoutWeb.Counters.BlocksIndexedCounter
alias BlockScoutWeb.{Endpoint, Prometheus}
alias BlockScoutWeb.RealtimeEventHandler
def start(_type, _args) do
import Supervisor.Spec
@ -18,7 +20,8 @@ defmodule BlockScoutWeb.Application do
# Start the endpoint when the application starts
supervisor(Endpoint, []),
supervisor(Absinthe.Subscription, [Endpoint]),
{EventHandler, name: EventHandler}
{RealtimeEventHandler, name: RealtimeEventHandler},
{BlocksIndexedCounter, name: BlocksIndexedCounter}
]
opts = [strategy: :one_for_one, name: BlockScoutWeb.Supervisor]

@ -0,0 +1,63 @@
defmodule BlockScoutWeb.Counters.BlocksIndexedCounter do
@moduledoc """
Module responsible for fetching and consolidating the number blocks indexed.
It loads the count asynchronously in a time interval.
"""
use GenServer
alias BlockScoutWeb.Notifier
alias Explorer.Chain
# It is undesirable to automatically start the counter in all environments.
# Consider the test environment: if it initiates but does not finish before a
# test ends, that test will fail.
config = Application.get_env(:block_scout_web, BlockScoutWeb.Counters.BlocksIndexedCounter)
@enabled Keyword.get(config, :enabled)
@doc """
Starts a process to periodically update the % of blocks indexed.
"""
@spec start_link(term()) :: GenServer.on_start()
def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
def init(args) do
if @enabled do
Task.start_link(&calculate_blocks_indexed/0)
schedule_next_consolidation()
end
{:ok, args}
end
def calculate_blocks_indexed do
ratio = Chain.indexed_ratio()
finished? =
if ratio < 1 do
false
else
Chain.finished_indexing?()
end
Notifier.broadcast_blocks_indexed_ratio(ratio, finished?)
end
defp schedule_next_consolidation do
Process.send_after(self(), :calculate_blocks_indexed, :timer.minutes(5))
end
@impl true
def handle_info(:calculate_blocks_indexed, state) do
calculate_blocks_indexed()
schedule_next_consolidation()
{:noreply, state}
end
end

@ -1,34 +0,0 @@
defmodule BlockScoutWeb.EventHandler do
@moduledoc """
Subscribing process for broadcast events from Chain context.
"""
use GenServer
alias BlockScoutWeb.Notifier
alias Explorer.Chain
# Client
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
# Server
def init([]) do
Chain.subscribe_to_events(:addresses)
Chain.subscribe_to_events(:address_coin_balances)
Chain.subscribe_to_events(:blocks)
Chain.subscribe_to_events(:exchange_rate)
Chain.subscribe_to_events(:internal_transactions)
Chain.subscribe_to_events(:transactions)
Chain.subscribe_to_events(:token_transfers)
{:ok, []}
end
def handle_info(event, state) do
Notifier.handle_event(event)
{:noreply, state}
end
end

@ -1,6 +1,6 @@
defmodule BlockScoutWeb.Notifier do
@moduledoc """
Responds to events from EventHandler by sending appropriate channel updates to front-end.
Responds to events by sending appropriate channel updates to front-end.
"""
alias Absinthe.Subscription
@ -21,22 +21,6 @@ defmodule BlockScoutWeb.Notifier do
Enum.each(address_coin_balances, &broadcast_address_coin_balance/1)
end
def handle_event({:chain_event, :blocks, :catchup, _blocks}) do
ratio = Chain.indexed_ratio()
finished? =
if ratio < 1 do
false
else
Chain.finished_indexing?()
end
Endpoint.broadcast("blocks:indexing", "index_status", %{
ratio: ratio,
finished: finished?
})
end
def handle_event({:chain_event, :blocks, :realtime, blocks}) do
Enum.each(blocks, &broadcast_block/1)
end
@ -94,6 +78,16 @@ defmodule BlockScoutWeb.Notifier do
def handle_event(_), do: nil
@doc """
Broadcast the percentage of blocks indexed so far.
"""
def broadcast_blocks_indexed_ratio(ratio, finished?) do
Endpoint.broadcast("blocks:indexing", "index_status", %{
ratio: ratio,
finished: finished?
})
end
defp broadcast_address_coin_balance(%{address_hash: address_hash, block_number: block_number}) do
Endpoint.broadcast("addresses:#{address_hash}", "coin_balance", %{
block_number: block_number

@ -0,0 +1,33 @@
defmodule BlockScoutWeb.RealtimeEventHandler do
@moduledoc """
Subscribing process for broadcast events from realtime.
"""
use GenServer
alias BlockScoutWeb.Notifier
alias Explorer.Chain.Events.Subscriber
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init([]) do
Subscriber.to(:addresses, :realtime)
Subscriber.to(:address_coin_balances, :realtime)
Subscriber.to(:blocks, :realtime)
Subscriber.to(:internal_transactions, :realtime)
Subscriber.to(:transactions, :realtime)
Subscriber.to(:token_transfers, :realtime)
# Does not come from the indexer
Subscriber.to(:exchange_rate)
{:ok, []}
end
@impl true
def handle_info(event, state) do
Notifier.handle_event(event)
{:noreply, state}
end
end

@ -19,19 +19,4 @@ defmodule BlockScoutWeb.BlockChannelTest do
assert false, "Expected message received nothing."
end
end
test "subscribed user is notified of new_block event for catchup" do
topic = "blocks:indexing"
@endpoint.subscribe(topic)
Notifier.handle_event({:chain_event, :blocks, :catchup, []})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "index_status", payload: %{}} ->
assert true
after
:timer.seconds(5) ->
assert false, "Expected message received nothing."
end
end
end

@ -3,18 +3,23 @@ defmodule BlockScoutWeb.ViewingAppTest do
use BlockScoutWeb.FeatureCase, async: true
alias BlockScoutWeb.{AppPage, Notifier}
alias BlockScoutWeb.AppPage
alias BlockScoutWeb.Counters.BlocksIndexedCounter
alias Explorer.Counters.AddressesWithBalanceCounter
setup do
start_supervised!(AddressesWithBalanceCounter)
AddressesWithBalanceCounter.consolidate()
:ok
end
describe "loading bar when indexing" do
test "shows blocks indexed percentage", %{session: session} do
for index <- 5..9 do
insert(:block, number: index)
end
start_supervised!(AddressesWithBalanceCounter)
AddressesWithBalanceCounter.consolidate()
assert Explorer.Chain.indexed_ratio() == 0.5
session
@ -27,9 +32,6 @@ defmodule BlockScoutWeb.ViewingAppTest do
insert(:block, number: index)
end
start_supervised!(AddressesWithBalanceCounter)
AddressesWithBalanceCounter.consolidate()
assert Explorer.Chain.indexed_ratio() == 1.0
session
@ -37,13 +39,12 @@ defmodule BlockScoutWeb.ViewingAppTest do
|> assert_has(AppPage.indexed_status("Indexing Tokens"))
end
test "live updates blocks indexed percentage", %{session: session} do
test "updates blocks indexed percentage", %{session: session} do
for index <- 5..9 do
insert(:block, number: index)
end
start_supervised!(AddressesWithBalanceCounter)
AddressesWithBalanceCounter.consolidate()
BlocksIndexedCounter.calculate_blocks_indexed()
assert Explorer.Chain.indexed_ratio() == 0.5
@ -52,18 +53,18 @@ defmodule BlockScoutWeb.ViewingAppTest do
|> assert_has(AppPage.indexed_status("50% Blocks Indexed"))
insert(:block, number: 4)
Notifier.handle_event({:chain_event, :blocks, :catchup, []})
BlocksIndexedCounter.calculate_blocks_indexed()
assert_has(session, AppPage.indexed_status("60% Blocks Indexed"))
end
test "live updates when blocks are fully indexed", %{session: session} do
test "updates when blocks are fully indexed", %{session: session} do
for index <- 1..9 do
insert(:block, number: index)
end
start_supervised!(AddressesWithBalanceCounter)
AddressesWithBalanceCounter.consolidate()
BlocksIndexedCounter.calculate_blocks_indexed()
assert Explorer.Chain.indexed_ratio() == 0.9
@ -72,19 +73,19 @@ defmodule BlockScoutWeb.ViewingAppTest do
|> assert_has(AppPage.indexed_status("90% Blocks Indexed"))
insert(:block, number: 0)
Notifier.handle_event({:chain_event, :blocks, :catchup, []})
BlocksIndexedCounter.calculate_blocks_indexed()
assert_has(session, AppPage.indexed_status("Indexing Tokens"))
end
test "live removes message when chain is indexed", %{session: session} do
test "removes message when chain is indexed", %{session: session} do
[block | _] =
for index <- 0..9 do
insert(:block, number: index)
end
start_supervised!(AddressesWithBalanceCounter)
AddressesWithBalanceCounter.consolidate()
BlocksIndexedCounter.calculate_blocks_indexed()
assert Explorer.Chain.indexed_ratio() == 1.0
@ -96,7 +97,7 @@ defmodule BlockScoutWeb.ViewingAppTest do
|> insert()
|> with_block(block, internal_transactions_indexed_at: DateTime.utc_now())
Notifier.handle_event({:chain_event, :blocks, :catchup, []})
BlocksIndexedCounter.calculate_blocks_indexed()
refute_has(session, AppPage.still_indexing?())
end

@ -1520,35 +1520,6 @@ defmodule Explorer.Chain do
Hash.Full.cast(string)
end
@doc """
Subscribes the caller process to a specified subset of chain-related events.
## Handling An Event
A subscribed process should handle an event message. The message is in the
format of a three-element tuple.
* Element 0 - `:chain_event`
* Element 1 - event subscribed to
* Element 2 - event data in list form
# A new block event in a GenServer
def handle_info({:chain_event, :blocks, blocks}, state) do
# Do something with the blocks
end
## Example
iex> Explorer.Chain.subscribe_to_events(:blocks)
:ok
"""
@spec subscribe_to_events(chain_event()) :: :ok
def subscribe_to_events(event_type)
when event_type in ~w(addresses address_coin_balances blocks exchange_rate internal_transactions logs token_transfers transactions)a do
Registry.register(Registry.ChainEvents, event_type, [])
:ok
end
@doc """
Estimated count of `t:Explorer.Chain.Transaction.t/0`.

@ -0,0 +1,41 @@
defmodule Explorer.Chain.Events.Publisher do
@moduledoc """
Publishes events related to the Chain context.
"""
@allowed_events ~w(addresses address_coin_balances blocks internal_transactions token_transfers transactions)a
def broadcast(_data, false), do: :ok
def broadcast(data, broadcast_type) do
for {event_type, event_data} <- data, event_type in @allowed_events do
send_data(event_type, broadcast_type, event_data)
end
end
@spec broadcast(atom()) :: :ok
def broadcast(event_type) do
send_data(event_type)
end
defp send_data(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
end
# The :catchup type of event is not being consumed right now.
# To avoid a large number of unread messages in the `mailbox` the dispatch of
# these type of events is disabled for now.
defp send_data(_event_type, :catchup, _event_data), do: :ok
defp send_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
end
end

@ -0,0 +1,48 @@
defmodule Explorer.Chain.Events.Subscriber do
@moduledoc """
Subscribes to events related to the Chain context.
"""
@allowed_broadcast_events ~w(addresses address_coin_balances blocks internal_transactions token_transfers transactions)a
@allowed_broadcast_types ~w(catchup realtime)a
@allowed_events ~w(exchange_rate)a
@type broadcast_type :: :realtime | :catchup
@doc """
Subscribes the caller process to a specified subset of chain-related events.
## Handling An Event
A subscribed process should handle an event message. The message is in the
format of a three-element tuple.
* Element 0 - `:chain_event`
* Element 1 - event subscribed to
* Element 2 - event data in list form
# A new block event in a GenServer
def handle_info({:chain_event, :blocks, blocks}, state) do
# Do something with the blocks
end
## Example
iex> Explorer.Chain.Events.Subscriber.to(:blocks, :realtime)
:ok
"""
@spec to(atom(), broadcast_type()) :: :ok
def to(event_type, broadcast_type)
when event_type in @allowed_broadcast_events and broadcast_type in @allowed_broadcast_types do
Registry.register(Registry.ChainEvents, {event_type, broadcast_type}, [])
:ok
end
@spec to(atom()) :: :ok
def to(event_type) when event_type in @allowed_events do
Registry.register(Registry.ChainEvents, event_type, [])
:ok
end
end

@ -4,6 +4,7 @@ defmodule Explorer.Chain.Import do
"""
alias Ecto.Changeset
alias Explorer.Chain.Events.Publisher
alias Explorer.Chain.Import
alias Explorer.Repo
@ -107,7 +108,7 @@ defmodule Explorer.Chain.Import do
## Data Notifications
On successful inserts, processes interested in certain domains of data will be notified
that new data has been inserted. See `Explorer.Chain.subscribe_to_events/1` for more information.
that new data has been inserted. See `Explorer.Chain.Events.Subscriber.to_events/2` for more information.
## Options
@ -122,28 +123,11 @@ defmodule Explorer.Chain.Import do
{:ok, valid_runner_option_pairs} <- validate_runner_options_pairs(runner_options_pairs),
{:ok, runner_to_changes_list} <- runner_to_changes_list(valid_runner_option_pairs),
{:ok, data} <- insert_runner_to_changes_list(runner_to_changes_list, options) do
broadcast_events(data, Map.get(options, :broadcast, false))
Publisher.broadcast(data, Map.get(options, :broadcast, false))
{:ok, data}
end
end
defp broadcast_events(_data, false), do: nil
defp broadcast_events(data, broadcast_type) do
for {event_type, event_data} <- data,
event_type in ~w(addresses address_coin_balances blocks internal_transactions logs token_transfers transactions)a do
broadcast_event_data(event_type, broadcast_type, event_data)
end
end
defp broadcast_event_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
end
defp runner_to_changes_list(runner_options_pairs) when is_list(runner_options_pairs) do
runner_options_pairs
|> Stream.map(fn {runner, options} -> runner_changes_list(runner, options) end)

@ -9,6 +9,7 @@ defmodule Explorer.ExchangeRates do
require Logger
alias Explorer.Chain.Events.Publisher
alias Explorer.ExchangeRates.{Source, Token}
@interval :timer.minutes(5)
@ -105,11 +106,7 @@ defmodule Explorer.ExchangeRates do
@spec broadcast_event(atom()) :: :ok
defp broadcast_event(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
Publisher.broadcast(event_type)
end
@spec config(atom()) :: term

@ -0,0 +1,65 @@
defmodule Explorer.Chain.Events.PublisherTest do
use ExUnit.Case, async: true
doctest Explorer.Chain.Events.Publisher
alias Explorer.Chain.Events.{Publisher, Subscriber}
describe "broadcast/2" do
test "sends chain_event of realtime type" do
event_type = :blocks
broadcast_type = :realtime
event_data = []
Subscriber.to(event_type, broadcast_type)
Publisher.broadcast([{event_type, event_data}], broadcast_type)
assert_received {:chain_event, ^event_type, ^broadcast_type, []}
end
test "won't send chain_event of catchup type" do
event_type = :blocks
broadcast_type = :catchup
event_data = []
Subscriber.to(event_type, broadcast_type)
Publisher.broadcast([{event_type, event_data}], broadcast_type)
refute_received {:chain_event, ^event_type, ^broadcast_type, []}
end
test "won't send event that is not allowed" do
event_type = :not_allowed
broadcast_type = :catchup
event_data = []
Publisher.broadcast([{event_type, event_data}], broadcast_type)
refute_received {:chain_event, ^event_type, ^broadcast_type, []}
end
test "won't send event of broadcast type not allowed" do
event_type = :blocks
broadcast_type = :something
event_data = []
Publisher.broadcast([{event_type, event_data}], broadcast_type)
refute_received {:chain_event, ^event_type, ^broadcast_type, []}
end
end
describe "broadcast/1" do
test "sends event whithout type of broadcast" do
event_type = :exchange_rate
Subscriber.to(event_type)
Publisher.broadcast(event_type)
assert_received {:chain_event, ^event_type}
end
end
end

@ -0,0 +1,33 @@
defmodule Explorer.Chain.Events.SubscriberTest do
use ExUnit.Case, async: true
doctest Explorer.Chain.Events.Subscriber
alias Explorer.Chain.Events.{Publisher, Subscriber}
describe "to/2" do
test "receives event when there is a type of broadcast" do
event_type = :blocks
broadcast_type = :realtime
event_data = []
Subscriber.to(event_type, broadcast_type)
Publisher.broadcast([{event_type, event_data}], broadcast_type)
assert_received {:chain_event, :blocks, :realtime, []}
end
end
describe "to/1" do
test "receives event when there is not a type of broadcast" do
event_type = :exchange_rate
Subscriber.to(event_type)
Publisher.broadcast(event_type)
assert_received {:chain_event, :exchange_rate}
end
end
end

@ -17,6 +17,8 @@ defmodule Explorer.Chain.ImportTest do
Transaction
}
alias Explorer.Chain.Events.Subscriber
@moduletag :capturelog
doctest Import
@ -451,12 +453,6 @@ defmodule Explorer.Chain.ImportTest do
assert {:ok, %{}} == Import.all(%{})
end
test "publishes data to subscribers on insert" do
Chain.subscribe_to_events(:logs)
Import.all(@import_data)
assert_received {:chain_event, :logs, :realtime, [%Log{}]}
end
test "with invalid data" do
invalid_import_data =
update_in(@import_data, [:internal_transactions, :params, Access.at(0)], &Map.delete(&1, :call_type))
@ -466,39 +462,33 @@ defmodule Explorer.Chain.ImportTest do
end
test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do
Chain.subscribe_to_events(:addresses)
Subscriber.to(:addresses, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
end
test "publishes block data to subscribers on insert" do
Chain.subscribe_to_events(:blocks)
Subscriber.to(:blocks, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :blocks, :realtime, [%Block{}]}
end
test "publishes internal_transaction data to subscribers on insert" do
Chain.subscribe_to_events(:internal_transactions)
Subscriber.to(:internal_transactions, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
end
test "publishes log data to subscribers on insert" do
Chain.subscribe_to_events(:logs)
Import.all(@import_data)
assert_received {:chain_event, :logs, :realtime, [%Log{}]}
end
test "publishes transaction hashes data to subscribers on insert" do
Chain.subscribe_to_events(:transactions)
Subscriber.to(:transactions, :realtime)
Import.all(@import_data)
assert_received {:chain_event, :transactions, :realtime, [%Hash{}]}
end
test "publishes token_transfers data to subscribers on insert" do
Chain.subscribe_to_events(:token_transfers)
Subscriber.to(:token_transfers, :realtime)
Import.all(@import_data)
@ -508,9 +498,9 @@ defmodule Explorer.Chain.ImportTest do
test "does not broadcast if broadcast option is false" do
non_broadcast_data = Map.merge(@import_data, %{broadcast: false})
Chain.subscribe_to_events(:logs)
Subscriber.to(:addresses, :realtime)
Import.all(non_broadcast_data)
refute_received {:chain_event, :logs, :realtime, [%Log{}]}
refute_received {:chain_event, :addresses, :realtime, [%Address{}]}
end
test "updates address with contract code" do

@ -2906,12 +2906,6 @@ defmodule Explorer.ChainTest do
end
end
test "subscribe_to_events/1" do
assert :ok == Chain.subscribe_to_events(:logs)
current_pid = self()
assert [{^current_pid, _}] = Registry.lookup(Registry.ChainEvents, :logs)
end
describe "token_from_address_hash/1" do
test "with valid hash" do
token = insert(:token)

Loading…
Cancel
Save