add simple sender

pull/2449/head
Ayrat Badykov 5 years ago
parent c010eebabf
commit ad68c42fe0
No known key found for this signature in database
GPG Key ID: B44668E265E9396F
  1. 2
      apps/explorer/config/config.exs
  2. 2
      apps/explorer/config/test.exs
  3. 28
      apps/explorer/lib/explorer/chain/events/db_sender.ex
  4. 12
      apps/explorer/lib/explorer/chain/events/publisher.ex
  5. 12
      apps/explorer/lib/explorer/chain/events/sender.ex
  6. 10
      apps/explorer/lib/explorer/chain/events/sender_mock.ex
  7. 23
      apps/explorer/lib/explorer/chain/events/simple_sender.ex

@ -16,7 +16,7 @@ config :explorer,
include_uncles_in_average_block_time: include_uncles_in_average_block_time:
if(System.get_env("UNCLES_IN_AVERAGE_BLOCK_TIME") == "true", do: true, else: false), if(System.get_env("UNCLES_IN_AVERAGE_BLOCK_TIME") == "true", do: true, else: false),
healthy_blocks_period: System.get_env("HEALTHY_BLOCKS_PERIOD") || :timer.minutes(5), healthy_blocks_period: System.get_env("HEALTHY_BLOCKS_PERIOD") || :timer.minutes(5),
realtime_events_sender: Explorer.Chain.Events.Sender realtime_events_sender: Explorer.Chain.Events.DBSender
average_block_period = average_block_period =
case Integer.parse(System.get_env("AVERAGE_BLOCK_CACHE_PERIOD", "")) do case Integer.parse(System.get_env("AVERAGE_BLOCK_CACHE_PERIOD", "")) do

@ -44,7 +44,7 @@ config :explorer, Explorer.ExchangeRates.Source.TransactionAndLog,
secondary_source: Explorer.ExchangeRates.Source.OneCoinSource secondary_source: Explorer.ExchangeRates.Source.OneCoinSource
config :explorer, config :explorer,
realtime_events_sender: Explorer.Chain.Events.SenderMock realtime_events_sender: Explorer.Chain.Events.SimpleSender
variant = variant =
if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do if is_nil(System.get_env("ETHEREUM_JSONRPC_VARIANT")) do

@ -0,0 +1,28 @@
defmodule Explorer.Chain.Events.DBSender do
@moduledoc """
Sends events to Postgres.
"""
alias Explorer.Repo
def send_data(event_type) do
payload = encode_payload({:chain_event, event_type})
send_notify(payload)
end
def send_data(_event_type, :catchup, _event_data), do: :ok
def send_data(event_type, broadcast_type, event_data) do
payload = encode_payload({:chain_event, event_type, broadcast_type, event_data})
send_notify(payload)
end
defp encode_payload(payload) do
payload
|> :erlang.term_to_binary([:compressed])
|> Base.encode64()
end
defp send_notify(payload) do
Repo.query!("select pg_notify('chain_event', $1::text);", [payload])
end
end

@ -21,8 +21,7 @@ defmodule Explorer.Chain.Events.Publisher do
end end
defp send_data(event_type) do defp send_data(event_type) do
payload = encode_payload({:chain_event, event_type}) @sender.send_data(event_type)
@sender.send_notify(payload)
end end
# The :catchup type of event is not being consumed right now. # The :catchup type of event is not being consumed right now.
@ -31,13 +30,6 @@ defmodule Explorer.Chain.Events.Publisher do
defp send_data(_event_type, :catchup, _event_data), do: :ok defp send_data(_event_type, :catchup, _event_data), do: :ok
defp send_data(event_type, broadcast_type, event_data) do defp send_data(event_type, broadcast_type, event_data) do
payload = encode_payload({:chain_event, event_type, broadcast_type, event_data}) @sender.send_data(event_type, broadcast_type, event_data)
@sender.send_notify(payload)
end
defp encode_payload(payload) do
payload
|> :erlang.term_to_binary([:compressed])
|> Base.encode64()
end end
end end

@ -1,12 +0,0 @@
defmodule Explorer.Chain.Events.Sender do
@moduledoc """
Sends events to Postgres.
"""
alias Explorer.Repo
@callback send_notify(String.t()) :: {:ok, any}
def send_notify(payload) do
Repo.query!("select pg_notify('chain_event', $1::text);", [payload])
end
end

@ -1,10 +0,0 @@
defmodule Explorer.Chain.Events.SenderMock do
@moduledoc """
Sends events directly to Listener.
"""
alias Explorer.Chain.Events.Listener
def send_notify(payload) do
send(Listener, {:notification, nil, nil, nil, payload})
end
end

@ -0,0 +1,23 @@
defmodule Explorer.Chain.Events.SimpleSender do
@moduledoc """
Publishes events through Registry without intermediate levels.
"""
def send_data(event_type, broadcast_type, event_data) do
Registry.dispatch(Registry.ChainEvents, {event_type, broadcast_type}, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type, broadcast_type, event_data})
end
end)
end
def send_data(_event_type, :catchup, _event_data), do: :ok
def send_data(event_type) do
Registry.dispatch(Registry.ChainEvents, event_type, fn entries ->
for {pid, _registered_val} <- entries do
send(pid, {:chain_event, event_type})
end
end)
end
end
Loading…
Cancel
Save