From e6df337ab7c90fa4476a19cb9982c711df626b91 Mon Sep 17 00:00:00 2001 From: saneery Date: Fri, 26 Jul 2019 15:39:27 +0300 Subject: [PATCH] Send events through postgres notify --- apps/explorer/lib/explorer/application.ex | 3 ++ .../lib/explorer/chain/events/listener.ex | 54 +++++++++++++++++++ .../lib/explorer/chain/events/publisher.ex | 21 ++++---- 3 files changed, 68 insertions(+), 10 deletions(-) create mode 100644 apps/explorer/lib/explorer/chain/events/listener.ex diff --git a/apps/explorer/lib/explorer/application.ex b/apps/explorer/lib/explorer/application.ex index c1fb19f600..c509490525 100644 --- a/apps/explorer/lib/explorer/application.ex +++ b/apps/explorer/lib/explorer/application.ex @@ -16,6 +16,8 @@ defmodule Explorer.Application do Transactions } + alias Explorer.Chain.Events.Listener + alias Explorer.Chain.Supply.RSK alias Explorer.Market.MarketHistoryCache @@ -43,6 +45,7 @@ defmodule Explorer.Application do {Admin.Recovery, [[], [name: Admin.Recovery]]}, {TransactionCount, [[], []]}, {BlockCount, []}, + {Listener, []}, con_cache_child_spec(Blocks.cache_name()), con_cache_child_spec(NetVersion.cache_name()), con_cache_child_spec(MarketHistoryCache.cache_name()), diff --git a/apps/explorer/lib/explorer/chain/events/listener.ex b/apps/explorer/lib/explorer/chain/events/listener.ex new file mode 100644 index 0000000000..8ff076a014 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/events/listener.ex @@ -0,0 +1,54 @@ +defmodule Explorer.Chain.Events.Listener do + @moduledoc """ + Listens and publishes events from PG + """ + + use GenServer + + alias Postgrex.Notifications + + def start_link(_) do + GenServer.start_link(__MODULE__, "chain_event") + end + + def init(channel) do + {:ok, pid} = + :explorer + |> Application.get_env(Explorer.Repo) + |> Notifications.start_link() + + ref = Notifications.listen!(pid, channel) + + {:ok, {pid, ref, channel}} + end + + def handle_info({:notification, _pid, _ref, _topic, payload}, state) do + payload + |> decode_payload!() + |> broadcast() + + {:noreply, state} + end + + defp decode_payload!(payload) do + payload + |> Base.decode64!() + |> :erlang.binary_to_term() + end + + defp broadcast({:chain_event, event_type} = event) do + Registry.dispatch(Registry.ChainEvents, event_type, fn entries -> + for {pid, _registered_val} <- entries do + send(pid, event) + end + end) + end + + defp broadcast({:chain_event, event_type, broadcast_type, _data} = event) do + Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries -> + for {pid, _registered_val} <- entries do + send(pid, event) + end + end) + end +end diff --git a/apps/explorer/lib/explorer/chain/events/publisher.ex b/apps/explorer/lib/explorer/chain/events/publisher.ex index c7e01eef3a..8bb59fac65 100644 --- a/apps/explorer/lib/explorer/chain/events/publisher.ex +++ b/apps/explorer/lib/explorer/chain/events/publisher.ex @@ -2,6 +2,7 @@ defmodule Explorer.Chain.Events.Publisher do @moduledoc """ Publishes events related to the Chain context. """ + alias Explorer.Repo @allowed_events ~w(addresses address_coin_balances blocks block_rewards internal_transactions token_transfers transactions contract_verification_result)a @@ -19,11 +20,8 @@ defmodule Explorer.Chain.Events.Publisher do end defp send_data(event_type) do - Registry.dispatch(Registry.ChainEvents, event_type, fn entries -> - for {pid, _registered_val} <- entries do - send(pid, {:chain_event, event_type}) - end - end) + payload = encode_payload({:chain_event, event_type}) + Repo.query("NOTIFY chain_event, '#{payload}';") end # The :catchup type of event is not being consumed right now. @@ -32,10 +30,13 @@ defmodule Explorer.Chain.Events.Publisher do defp send_data(_event_type, :catchup, _event_data), do: :ok defp send_data(event_type, broadcast_type, event_data) do - Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries -> - for {pid, _registered_val} <- entries do - send(pid, {:chain_event, event_type, broadcast_type, event_data}) - end - end) + payload = encode_payload({:chain_event, event_type, broadcast_type, event_data}) + Repo.query("NOTIFY chain_event, '#{payload}';") + end + + defp encode_payload(payload) do + payload + |> :erlang.term_to_binary() + |> Base.encode64() end end