Send events through postgres notify

pull/2449/head
saneery 5 years ago
parent 185d8bed52
commit e6df337ab7
  1. 3
      apps/explorer/lib/explorer/application.ex
  2. 54
      apps/explorer/lib/explorer/chain/events/listener.ex
  3. 19
      apps/explorer/lib/explorer/chain/events/publisher.ex

@ -16,6 +16,8 @@ defmodule Explorer.Application do
Transactions Transactions
} }
alias Explorer.Chain.Events.Listener
alias Explorer.Chain.Supply.RSK alias Explorer.Chain.Supply.RSK
alias Explorer.Market.MarketHistoryCache alias Explorer.Market.MarketHistoryCache
@ -43,6 +45,7 @@ defmodule Explorer.Application do
{Admin.Recovery, [[], [name: Admin.Recovery]]}, {Admin.Recovery, [[], [name: Admin.Recovery]]},
{TransactionCount, [[], []]}, {TransactionCount, [[], []]},
{BlockCount, []}, {BlockCount, []},
{Listener, []},
con_cache_child_spec(Blocks.cache_name()), con_cache_child_spec(Blocks.cache_name()),
con_cache_child_spec(NetVersion.cache_name()), con_cache_child_spec(NetVersion.cache_name()),
con_cache_child_spec(MarketHistoryCache.cache_name()), con_cache_child_spec(MarketHistoryCache.cache_name()),

@ -0,0 +1,54 @@
defmodule Explorer.Chain.Events.Listener do
@moduledoc """
Listens and publishes events from PG
"""
use GenServer
alias Postgrex.Notifications
def start_link(_) do
GenServer.start_link(__MODULE__, "chain_event")
end
def init(channel) do
{:ok, pid} =
:explorer
|> Application.get_env(Explorer.Repo)
|> Notifications.start_link()
ref = Notifications.listen!(pid, channel)
{:ok, {pid, ref, channel}}
end
def handle_info({:notification, _pid, _ref, _topic, payload}, state) do
payload
|> decode_payload!()
|> broadcast()
{:noreply, state}
end
defp decode_payload!(payload) do
payload
|> Base.decode64!()
|> :erlang.binary_to_term()
end
defp broadcast({:chain_event, event_type} = event) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, event)
end
end)
end
defp broadcast({:chain_event, event_type, broadcast_type, _data} = event) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, event)
end
end)
end
end

@ -2,6 +2,7 @@ defmodule Explorer.Chain.Events.Publisher do
@moduledoc """ @moduledoc """
Publishes events related to the Chain context. Publishes events related to the Chain context.
""" """
alias Explorer.Repo
@allowed_events ~w(addresses address_coin_balances blocks block_rewards internal_transactions token_transfers transactions contract_verification_result)a @allowed_events ~w(addresses address_coin_balances blocks block_rewards internal_transactions token_transfers transactions contract_verification_result)a
@ -19,11 +20,8 @@ defmodule Explorer.Chain.Events.Publisher do
end end
defp send_data(event_type) do defp send_data(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries -> payload = encode_payload({:chain_event, event_type})
for {pid, _registered_val} <- entries do Repo.query("NOTIFY chain_event, '#{payload}';")
send(pid, {:chain_event, event_type})
end
end)
end end
# The :catchup type of event is not being consumed right now. # The :catchup type of event is not being consumed right now.
@ -32,10 +30,13 @@ defmodule Explorer.Chain.Events.Publisher do
defp send_data(_event_type, :catchup, _event_data), do: :ok defp send_data(_event_type, :catchup, _event_data), do: :ok
defp send_data(event_type, broadcast_type, event_data) do defp send_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries -> payload = encode_payload({:chain_event, event_type, broadcast_type, event_data})
for {pid, _registered_val} <- entries do Repo.query("NOTIFY chain_event, '#{payload}';")
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end end
end)
defp encode_payload(payload) do
payload
|> :erlang.term_to_binary()
|> Base.encode64()
end end
end end

Loading…
Cancel
Save