Add BlockScoutWeb.RealtimeEventHandler and Explorer.Chain.Events.Publisher

* Separate EventHandler according to broadcast_type event, so the frontend realtime does not need to ignore the catchup events.
pull/1267/head
Amanda Sposito 6 years ago
parent 8a926c1b05
commit bf46d3ee51
  1. 6
      apps/block_scout_web/lib/block_scout_web/application.ex
  2. 34
      apps/block_scout_web/lib/block_scout_web/event_handler.ex
  3. 18
      apps/block_scout_web/lib/block_scout_web/notifier.ex
  4. 33
      apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex
  5. 29
      apps/explorer/lib/explorer/chain.ex
  6. 41
      apps/explorer/lib/explorer/chain/events/publisher.ex
  7. 48
      apps/explorer/lib/explorer/chain/events/subscriber.ex
  8. 22
      apps/explorer/lib/explorer/chain/import.ex
  9. 7
      apps/explorer/lib/explorer/exchange_rates/exchange_rates.ex
  10. 65
      apps/explorer/test/explorer/chain/events/publisher_test.exs
  11. 33
      apps/explorer/test/explorer/chain/events/subscriber_test.exs
  12. 28
      apps/explorer/test/explorer/chain/import_test.exs
  13. 6
      apps/explorer/test/explorer/chain_test.exs

@ -5,7 +5,9 @@ defmodule BlockScoutWeb.Application do
use Application use Application
alias BlockScoutWeb.{Endpoint, EventHandler, Prometheus} alias BlockScoutWeb.Counters.BlocksIndexedCounter
alias BlockScoutWeb.{Endpoint, Prometheus}
alias BlockScoutWeb.RealtimeEventHandler
def start(_type, _args) do def start(_type, _args) do
import Supervisor.Spec import Supervisor.Spec
@ -18,7 +20,7 @@ defmodule BlockScoutWeb.Application do
# Start the endpoint when the application starts # Start the endpoint when the application starts
supervisor(Endpoint, []), supervisor(Endpoint, []),
supervisor(Absinthe.Subscription, [Endpoint]), supervisor(Absinthe.Subscription, [Endpoint]),
{EventHandler, name: EventHandler} {RealtimeEventHandler, name: RealtimeEventHandler}
] ]
opts = [strategy: :one_for_one, name: BlockScoutWeb.Supervisor] opts = [strategy: :one_for_one, name: BlockScoutWeb.Supervisor]

@ -1,34 +0,0 @@
defmodule BlockScoutWeb.EventHandler do
@moduledoc """
Subscribing process for broadcast events from Chain context.
"""
use GenServer
alias BlockScoutWeb.Notifier
alias Explorer.Chain
# Client
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
# Server
def init([]) do
Chain.subscribe_to_events(:addresses)
Chain.subscribe_to_events(:address_coin_balances)
Chain.subscribe_to_events(:blocks)
Chain.subscribe_to_events(:exchange_rate)
Chain.subscribe_to_events(:internal_transactions)
Chain.subscribe_to_events(:transactions)
Chain.subscribe_to_events(:token_transfers)
{:ok, []}
end
def handle_info(event, state) do
Notifier.handle_event(event)
{:noreply, state}
end
end

@ -1,6 +1,6 @@
defmodule BlockScoutWeb.Notifier do defmodule BlockScoutWeb.Notifier do
@moduledoc """ @moduledoc """
Responds to events from EventHandler by sending appropriate channel updates to front-end. Responds to events by sending appropriate channel updates to front-end.
""" """
alias Absinthe.Subscription alias Absinthe.Subscription
@ -21,22 +21,6 @@ defmodule BlockScoutWeb.Notifier do
Enum.each(address_coin_balances, &broadcast_address_coin_balance/1) Enum.each(address_coin_balances, &broadcast_address_coin_balance/1)
end end
def handle_event({:chain_event, :blocks, :catchup, _blocks}) do
ratio = Chain.indexed_ratio()
finished? =
if ratio < 1 do
false
else
Chain.finished_indexing?()
end
Endpoint.broadcast("blocks:indexing", "index_status", %{
ratio: ratio,
finished: finished?
})
end
def handle_event({:chain_event, :blocks, :realtime, blocks}) do def handle_event({:chain_event, :blocks, :realtime, blocks}) do
Enum.each(blocks, &broadcast_block/1) Enum.each(blocks, &broadcast_block/1)
end end

@ -0,0 +1,33 @@
defmodule BlockScoutWeb.RealtimeEventHandler do
@moduledoc """
Subscribing process for broadcast events from realtime.
"""
use GenServer
alias BlockScoutWeb.Notifier
alias Explorer.Chain.Events.Subscriber
def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init([]) do
Subscriber.to(:addresses, :realtime)
Subscriber.to(:address_coin_balances, :realtime)
Subscriber.to(:blocks, :realtime)
Subscriber.to(:internal_transactions, :realtime)
Subscriber.to(:transactions, :realtime)
Subscriber.to(:token_transfers, :realtime)
# Does not come from the indexer
Subscriber.to(:exchange_rate)
{:ok, []}
end
@impl true
def handle_info(event, state) do
Notifier.handle_event(event)
{:noreply, state}
end
end

@ -1520,35 +1520,6 @@ defmodule Explorer.Chain do
Hash.Full.cast(string) Hash.Full.cast(string)
end end
@doc """
Subscribes the caller process to a specified subset of chain-related events.
## Handling An Event
A subscribed process should handle an event message. The message is in the
format of a three-element tuple.
* Element 0 - `:chain_event`
* Element 1 - event subscribed to
* Element 2 - event data in list form
# A new block event in a GenServer
def handle_info({:chain_event, :blocks, blocks}, state) do
# Do something with the blocks
end
## Example
iex> Explorer.Chain.subscribe_to_events(:blocks)
:ok
"""
@spec subscribe_to_events(chain_event()) :: :ok
def subscribe_to_events(event_type)
when event_type in ~w(addresses address_coin_balances blocks exchange_rate internal_transactions logs token_transfers transactions)a do
Registry.register(Registry.ChainEvents, event_type, [])
:ok
end
@doc """ @doc """
Estimated count of `t:Explorer.Chain.Transaction.t/0`. Estimated count of `t:Explorer.Chain.Transaction.t/0`.

@ -0,0 +1,41 @@
defmodule Explorer.Chain.Events.Publisher do
@moduledoc """
Publishes events related to the Chain context.
"""
@allowed_events ~w(addresses address_coin_balances blocks internal_transactions token_transfers transactions)a
def broadcast(_data, false), do: :ok
def broadcast(data, broadcast_type) do
for {event_type, event_data} <- data, event_type in @allowed_events do
send_data(event_type, broadcast_type, event_data)
end
end
@spec broadcast(atom()) :: :ok
def broadcast(event_type) do
send_data(event_type)
end
defp send_data(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
end
# The :catchup type of event is not being consumed right now.
# To avoid a large number of unread messages in the `mailbox` the dispatch of
# these type of events is disabled for now.
defp send_data(_event_type, :catchup, _event_data), do: :ok
defp send_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
end
end

@ -0,0 +1,48 @@
defmodule Explorer.Chain.Events.Subscriber do
@moduledoc """
Subscribes to events related to the Chain context.
"""
@allowed_broadcast_events ~w(addresses address_coin_balances blocks internal_transactions token_transfers transactions)a
@allowed_broadcast_types ~w(catchup realtime)a
@allowed_events ~w(exchange_rate)a
@type broadcast_type :: :realtime | :catchup
@doc """
Subscribes the caller process to a specified subset of chain-related events.
## Handling An Event
A subscribed process should handle an event message. The message is in the
format of a three-element tuple.
* Element 0 - `:chain_event`
* Element 1 - event subscribed to
* Element 2 - event data in list form
# A new block event in a GenServer
def handle_info({:chain_event, :blocks, blocks}, state) do
# Do something with the blocks
end
## Example
iex> Explorer.Chain.Events.Subscriber.to(:blocks, :realtime)
:ok
"""
@spec to(atom(), broadcast_type()) :: :ok
def to(event_type, broadcast_type)
when event_type in @allowed_broadcast_events and broadcast_type in @allowed_broadcast_types do
Registry.register(Registry.ChainEvents, {event_type, broadcast_type}, [])
:ok
end
@spec to(atom()) :: :ok
def to(event_type) when event_type in @allowed_events do
Registry.register(Registry.ChainEvents, event_type, [])
:ok
end
end

@ -4,6 +4,7 @@ defmodule Explorer.Chain.Import do
""" """
alias Ecto.Changeset alias Ecto.Changeset
alias Explorer.Chain.Events.Publisher
alias Explorer.Chain.Import alias Explorer.Chain.Import
alias Explorer.Repo alias Explorer.Repo
@ -107,7 +108,7 @@ defmodule Explorer.Chain.Import do
## Data Notifications ## Data Notifications
On successful inserts, processes interested in certain domains of data will be notified On successful inserts, processes interested in certain domains of data will be notified
that new data has been inserted. See `Explorer.Chain.subscribe_to_events/1` for more information. that new data has been inserted. See `Explorer.Chain.Events.Subscriber.to_events/2` for more information.
## Options ## Options
@ -122,28 +123,11 @@ defmodule Explorer.Chain.Import do
{:ok, valid_runner_option_pairs} <- validate_runner_options_pairs(runner_options_pairs), {:ok, valid_runner_option_pairs} <- validate_runner_options_pairs(runner_options_pairs),
{:ok, runner_to_changes_list} <- runner_to_changes_list(valid_runner_option_pairs), {:ok, runner_to_changes_list} <- runner_to_changes_list(valid_runner_option_pairs),
{:ok, data} <- insert_runner_to_changes_list(runner_to_changes_list, options) do {:ok, data} <- insert_runner_to_changes_list(runner_to_changes_list, options) do
broadcast_events(data, Map.get(options, :broadcast, false)) Publisher.broadcast(data, Map.get(options, :broadcast, false))
{:ok, data} {:ok, data}
end end
end end
defp broadcast_events(_data, false), do: nil
defp broadcast_events(data, broadcast_type) do
for {event_type, event_data} <- data,
event_type in ~w(addresses address_coin_balances blocks internal_transactions logs token_transfers transactions)a do
broadcast_event_data(event_type, broadcast_type, event_data)
end
end
defp broadcast_event_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
end
defp runner_to_changes_list(runner_options_pairs) when is_list(runner_options_pairs) do defp runner_to_changes_list(runner_options_pairs) when is_list(runner_options_pairs) do
runner_options_pairs runner_options_pairs
|> Stream.map(fn {runner, options} -> runner_changes_list(runner, options) end) |> Stream.map(fn {runner, options} -> runner_changes_list(runner, options) end)

@ -9,6 +9,7 @@ defmodule Explorer.ExchangeRates do
require Logger require Logger
alias Explorer.Chain.Events.Publisher
alias Explorer.ExchangeRates.{Source, Token} alias Explorer.ExchangeRates.{Source, Token}
@interval :timer.minutes(5) @interval :timer.minutes(5)
@ -105,11 +106,7 @@ defmodule Explorer.ExchangeRates do
@spec broadcast_event(atom()) :: :ok @spec broadcast_event(atom()) :: :ok
defp broadcast_event(event_type) do defp broadcast_event(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries -> Publisher.broadcast(event_type)
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
end end
@spec config(atom()) :: term @spec config(atom()) :: term

@ -0,0 +1,65 @@
defmodule Explorer.Chain.Events.PublisherTest do
use ExUnit.Case, async: true
doctest Explorer.Chain.Events.Publisher
alias Explorer.Chain.Events.{Publisher, Subscriber}
describe "broadcast/2" do
test "sends chain_event of realtime type" do
event_type = :blocks
broadcast_type = :realtime
event_data = []
Subscriber.to(event_type, broadcast_type)
Publisher.broadcast([{event_type, event_data}], broadcast_type)
assert_received {:chain_event, ^event_type, ^broadcast_type, []}
end
test "won't send chain_event of catchup type" do
event_type = :blocks
broadcast_type = :catchup
event_data = []
Subscriber.to(event_type, broadcast_type)
Publisher.broadcast([{event_type, event_data}], broadcast_type)
refute_received {:chain_event, ^event_type, ^broadcast_type, []}
end
test "won't send event that is not allowed" do
event_type = :not_allowed
broadcast_type = :catchup
event_data = []
Publisher.broadcast([{event_type, event_data}], broadcast_type)
refute_received {:chain_event, ^event_type, ^broadcast_type, []}
end
test "won't send event of broadcast type not allowed" do
event_type = :blocks
broadcast_type = :something
event_data = []
Publisher.broadcast([{event_type, event_data}], broadcast_type)
refute_received {:chain_event, ^event_type, ^broadcast_type, []}
end
end
describe "broadcast/1" do
test "sends event whithout type of broadcast" do
event_type = :exchange_rate
Subscriber.to(event_type)
Publisher.broadcast(event_type)
assert_received {:chain_event, ^event_type}
end
end
end

@ -0,0 +1,33 @@
defmodule Explorer.Chain.Events.SubscriberTest do
use ExUnit.Case, async: true
doctest Explorer.Chain.Events.Subscriber
alias Explorer.Chain.Events.{Publisher, Subscriber}
describe "to/2" do
test "receives event when there is a type of broadcast" do
event_type = :blocks
broadcast_type = :realtime
event_data = []
Subscriber.to(event_type, broadcast_type)
Publisher.broadcast([{event_type, event_data}], broadcast_type)
assert_received {:chain_event, :blocks, :realtime, []}
end
end
describe "to/1" do
test "receives event when there is not a type of broadcast" do
event_type = :exchange_rate
Subscriber.to(event_type)
Publisher.broadcast(event_type)
assert_received {:chain_event, :exchange_rate}
end
end
end

@ -17,6 +17,8 @@ defmodule Explorer.Chain.ImportTest do
Transaction Transaction
} }
alias Explorer.Chain.Events.Subscriber
@moduletag :capturelog @moduletag :capturelog
doctest Import doctest Import
@ -451,12 +453,6 @@ defmodule Explorer.Chain.ImportTest do
assert {:ok, %{}} == Import.all(%{}) assert {:ok, %{}} == Import.all(%{})
end end
test "publishes data to subscribers on insert" do
Chain.subscribe_to_events(:logs)
Import.all(@import_data)
assert_received {:chain_event, :logs, :realtime, [%Log{}]}
end
test "with invalid data" do test "with invalid data" do
invalid_import_data = invalid_import_data =
update_in(@import_data, [:internal_transactions, :params, Access.at(0)], &Map.delete(&1, :call_type)) update_in(@import_data, [:internal_transactions, :params, Access.at(0)], &Map.delete(&1, :call_type))
@ -466,39 +462,33 @@ defmodule Explorer.Chain.ImportTest do
end end
test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do
Chain.subscribe_to_events(:addresses) Subscriber.to(:addresses, :realtime)
Import.all(@import_data) Import.all(@import_data)
assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]} assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
end end
test "publishes block data to subscribers on insert" do test "publishes block data to subscribers on insert" do
Chain.subscribe_to_events(:blocks) Subscriber.to(:blocks, :realtime)
Import.all(@import_data) Import.all(@import_data)
assert_received {:chain_event, :blocks, :realtime, [%Block{}]} assert_received {:chain_event, :blocks, :realtime, [%Block{}]}
end end
test "publishes internal_transaction data to subscribers on insert" do test "publishes internal_transaction data to subscribers on insert" do
Chain.subscribe_to_events(:internal_transactions) Subscriber.to(:internal_transactions, :realtime)
Import.all(@import_data) Import.all(@import_data)
assert_received {:chain_event, :internal_transactions, :realtime, assert_received {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]} [%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
end end
test "publishes log data to subscribers on insert" do
Chain.subscribe_to_events(:logs)
Import.all(@import_data)
assert_received {:chain_event, :logs, :realtime, [%Log{}]}
end
test "publishes transaction hashes data to subscribers on insert" do test "publishes transaction hashes data to subscribers on insert" do
Chain.subscribe_to_events(:transactions) Subscriber.to(:transactions, :realtime)
Import.all(@import_data) Import.all(@import_data)
assert_received {:chain_event, :transactions, :realtime, [%Hash{}]} assert_received {:chain_event, :transactions, :realtime, [%Hash{}]}
end end
test "publishes token_transfers data to subscribers on insert" do test "publishes token_transfers data to subscribers on insert" do
Chain.subscribe_to_events(:token_transfers) Subscriber.to(:token_transfers, :realtime)
Import.all(@import_data) Import.all(@import_data)
@ -508,9 +498,9 @@ defmodule Explorer.Chain.ImportTest do
test "does not broadcast if broadcast option is false" do test "does not broadcast if broadcast option is false" do
non_broadcast_data = Map.merge(@import_data, %{broadcast: false}) non_broadcast_data = Map.merge(@import_data, %{broadcast: false})
Chain.subscribe_to_events(:logs) Subscriber.to(:addresses, :realtime)
Import.all(non_broadcast_data) Import.all(non_broadcast_data)
refute_received {:chain_event, :logs, :realtime, [%Log{}]} refute_received {:chain_event, :addresses, :realtime, [%Address{}]}
end end
test "updates address with contract code" do test "updates address with contract code" do

@ -2906,12 +2906,6 @@ defmodule Explorer.ChainTest do
end end
end end
test "subscribe_to_events/1" do
assert :ok == Chain.subscribe_to_events(:logs)
current_pid = self()
assert [{^current_pid, _}] = Registry.lookup(Registry.ChainEvents, :logs)
end
describe "token_from_address_hash/1" do describe "token_from_address_hash/1" do
test "with valid hash" do test "with valid hash" do
token = insert(:token) token = insert(:token)

Loading…
Cancel
Save