diff --git a/apps/explorer/config/config.exs b/apps/explorer/config/config.exs index fcff7aeea5..97f7391241 100644 --- a/apps/explorer/config/config.exs +++ b/apps/explorer/config/config.exs @@ -16,7 +16,7 @@ config :explorer, include_uncles_in_average_block_time: if(System.get_env("UNCLES_IN_AVERAGE_BLOCK_TIME") == "true", do: true, else: false), healthy_blocks_period: System.get_env("HEALTHY_BLOCKS_PERIOD") || :timer.minutes(5), - realtime_events_sender: Explorer.Chain.Events.Sender + realtime_events_sender: Explorer.Chain.Events.DBSender average_block_period = case Integer.parse(System.get_env("AVERAGE_BLOCK_CACHE_PERIOD", "")) do diff --git a/apps/explorer/config/test.exs b/apps/explorer/config/test.exs index c7061cd198..58050cba2c 100644 --- a/apps/explorer/config/test.exs +++ b/apps/explorer/config/test.exs @@ -44,7 +44,7 @@ config :explorer, Explorer.ExchangeRates.Source.TransactionAndLog, secondary_source: Explorer.ExchangeRates.Source.OneCoinSource config :explorer, - realtime_events_sender: Explorer.Chain.Events.SenderMock + realtime_events_sender: Explorer.Chain.Events.SimpleSender variant = if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do diff --git a/apps/explorer/lib/explorer/chain/events/db_sender.ex b/apps/explorer/lib/explorer/chain/events/db_sender.ex new file mode 100644 index 0000000000..0ea1c8eecf --- /dev/null +++ b/apps/explorer/lib/explorer/chain/events/db_sender.ex @@ -0,0 +1,28 @@ +defmodule Explorer.Chain.Events.DBSender do + @moduledoc """ + Sends events to Postgres. + """ + alias Explorer.Repo + + def send_data(event_type) do + payload = encode_payload({:chain_event, event_type}) + send_notify(payload) + end + + def send_data(_event_type, :catchup, _event_data), do: :ok + + def send_data(event_type, broadcast_type, event_data) do + payload = encode_payload({:chain_event, event_type, broadcast_type, event_data}) + send_notify(payload) + end + + defp encode_payload(payload) do + payload + |> :erlang.term_to_binary([:compressed]) + |> Base.encode64() + end + + defp send_notify(payload) do + Repo.query!("select pg_notify('chain_event', $1::text);", [payload]) + end +end diff --git a/apps/explorer/lib/explorer/chain/events/publisher.ex b/apps/explorer/lib/explorer/chain/events/publisher.ex index 11a7dc1518..142baee117 100644 --- a/apps/explorer/lib/explorer/chain/events/publisher.ex +++ b/apps/explorer/lib/explorer/chain/events/publisher.ex @@ -21,8 +21,7 @@ defmodule Explorer.Chain.Events.Publisher do end defp send_data(event_type) do - payload = encode_payload({:chain_event, event_type}) - @sender.send_notify(payload) + @sender.send_data(event_type) end # The :catchup type of event is not being consumed right now. @@ -31,13 +30,6 @@ defmodule Explorer.Chain.Events.Publisher do defp send_data(_event_type, :catchup, _event_data), do: :ok defp send_data(event_type, broadcast_type, event_data) do - payload = encode_payload({:chain_event, event_type, broadcast_type, event_data}) - @sender.send_notify(payload) - end - - defp encode_payload(payload) do - payload - |> :erlang.term_to_binary([:compressed]) - |> Base.encode64() + @sender.send_data(event_type, broadcast_type, event_data) end end diff --git a/apps/explorer/lib/explorer/chain/events/sender.ex b/apps/explorer/lib/explorer/chain/events/sender.ex deleted file mode 100644 index f64728791f..0000000000 --- a/apps/explorer/lib/explorer/chain/events/sender.ex +++ /dev/null @@ -1,12 +0,0 @@ -defmodule Explorer.Chain.Events.Sender do - @moduledoc """ - Sends events to Postgres. - """ - alias Explorer.Repo - - @callback send_notify(String.t()) :: {:ok, any} - - def send_notify(payload) do - Repo.query!("select pg_notify('chain_event', $1::text);", [payload]) - end -end diff --git a/apps/explorer/lib/explorer/chain/events/sender_mock.ex b/apps/explorer/lib/explorer/chain/events/sender_mock.ex deleted file mode 100644 index 44d55ab70d..0000000000 --- a/apps/explorer/lib/explorer/chain/events/sender_mock.ex +++ /dev/null @@ -1,10 +0,0 @@ -defmodule Explorer.Chain.Events.SenderMock do - @moduledoc """ - Sends events directly to Listener. - """ - alias Explorer.Chain.Events.Listener - - def send_notify(payload) do - send(Listener, {:notification, nil, nil, nil, payload}) - end -end diff --git a/apps/explorer/lib/explorer/chain/events/simple_sender.ex b/apps/explorer/lib/explorer/chain/events/simple_sender.ex new file mode 100644 index 0000000000..d72fe9b895 --- /dev/null +++ b/apps/explorer/lib/explorer/chain/events/simple_sender.ex @@ -0,0 +1,23 @@ +defmodule Explorer.Chain.Events.SimpleSender do + @moduledoc """ + Publishes events through Registry without intermediate levels. + """ + + def send_data(event_type, broadcast_type, event_data) do + Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries -> + for {pid, _registered_val} <- entries do + send(pid, {:chain_event, event_type, broadcast_type, event_data}) + end + end) + end + + def send_data(_event_type, :catchup, _event_data), do: :ok + + def send_data(event_type) do + Registry.dispatch(Registry.ChainEvents, event_type, fn entries -> + for {pid, _registered_val} <- entries do + send(pid, {:chain_event, event_type}) + end + end) + end +end