diff --git a/apps/explorer/lib/explorer/chain/events/listener.ex b/apps/explorer/lib/explorer/chain/events/listener.ex index 8ff076a014..40a4131e84 100644 --- a/apps/explorer/lib/explorer/chain/events/listener.ex +++ b/apps/explorer/lib/explorer/chain/events/listener.ex @@ -30,6 +30,7 @@ defmodule Explorer.Chain.Events.Listener do {:noreply, state} end + # sobelow_skip ["Misc.BinToTerm"] defp decode_payload!(payload) do payload |> Base.decode64!() diff --git a/apps/explorer/lib/explorer/chain/events/publisher.ex b/apps/explorer/lib/explorer/chain/events/publisher.ex index 8bb59fac65..31374e19f0 100644 --- a/apps/explorer/lib/explorer/chain/events/publisher.ex +++ b/apps/explorer/lib/explorer/chain/events/publisher.ex @@ -2,6 +2,7 @@ defmodule Explorer.Chain.Events.Publisher do @moduledoc """ Publishes events related to the Chain context. """ + alias Ecto.Adapters.SQL.Sandbox alias Explorer.Repo @allowed_events ~w(addresses address_coin_balances blocks block_rewards internal_transactions token_transfers transactions contract_verification_result)a @@ -17,11 +18,12 @@ defmodule Explorer.Chain.Events.Publisher do @spec broadcast(atom()) :: :ok def broadcast(event_type) do send_data(event_type) + :ok end defp send_data(event_type) do payload = encode_payload({:chain_event, event_type}) - Repo.query("NOTIFY chain_event, '#{payload}';") + send_notify(payload) end # The :catchup type of event is not being consumed right now. @@ -31,12 +33,24 @@ defmodule Explorer.Chain.Events.Publisher do defp send_data(event_type, broadcast_type, event_data) do payload = encode_payload({:chain_event, event_type, broadcast_type, event_data}) - Repo.query("NOTIFY chain_event, '#{payload}';") + send_notify(payload) end defp encode_payload(payload) do payload - |> :erlang.term_to_binary() + |> :erlang.term_to_binary([:compressed]) |> Base.encode64() end + + defp send_notify(payload) do + fun = fn -> + Repo.query("select pg_notify('chain_event', $1::text);", [payload]) + end + + if Mix.env() == :test do + Sandbox.unboxed_run(Repo, fun) + else + fun.() + end + end end diff --git a/apps/explorer/test/explorer/chain/events/publisher_test.exs b/apps/explorer/test/explorer/chain/events/publisher_test.exs index ef76126086..e0934a53dd 100644 --- a/apps/explorer/test/explorer/chain/events/publisher_test.exs +++ b/apps/explorer/test/explorer/chain/events/publisher_test.exs @@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.PublisherTest do Publisher.broadcast([{event_type, event_data}], broadcast_type) - assert_received {:chain_event, ^event_type, ^broadcast_type, []} + assert_receive {:chain_event, ^event_type, ^broadcast_type, []} end test "won't send chain_event of catchup type" do @@ -27,7 +27,7 @@ defmodule Explorer.Chain.Events.PublisherTest do Publisher.broadcast([{event_type, event_data}], broadcast_type) - refute_received {:chain_event, ^event_type, ^broadcast_type, []} + refute_receive {:chain_event, ^event_type, ^broadcast_type, []} end test "won't send event that is not allowed" do @@ -59,7 +59,7 @@ defmodule Explorer.Chain.Events.PublisherTest do Publisher.broadcast(event_type) - assert_received {:chain_event, ^event_type} + assert_receive {:chain_event, ^event_type} end end end diff --git a/apps/explorer/test/explorer/chain/events/subscriber_test.exs b/apps/explorer/test/explorer/chain/events/subscriber_test.exs index 531efc24b2..b26841d0ef 100644 --- a/apps/explorer/test/explorer/chain/events/subscriber_test.exs +++ b/apps/explorer/test/explorer/chain/events/subscriber_test.exs @@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do Publisher.broadcast([{event_type, event_data}], broadcast_type) - assert_received {:chain_event, :blocks, :realtime, []} + assert_receive {:chain_event, :blocks, :realtime, []} end end @@ -27,7 +27,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do Publisher.broadcast(event_type) - assert_received {:chain_event, :exchange_rate} + assert_receive {:chain_event, :exchange_rate} end end end diff --git a/apps/explorer/test/explorer/chain/import_test.exs b/apps/explorer/test/explorer/chain/import_test.exs index dd3bbf1aec..0c559bdfec 100644 --- a/apps/explorer/test/explorer/chain/import_test.exs +++ b/apps/explorer/test/explorer/chain/import_test.exs @@ -470,27 +470,27 @@ defmodule Explorer.Chain.ImportTest do test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do Subscriber.to(:addresses, :realtime) Import.all(@import_data) - assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]} + assert_receive {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]} end test "publishes block data to subscribers on insert" do Subscriber.to(:blocks, :realtime) Import.all(@import_data) - assert_received {:chain_event, :blocks, :realtime, [%Block{}]} + assert_receive {:chain_event, :blocks, :realtime, [%Block{}]} end test "publishes internal_transaction data to subscribers on insert" do Subscriber.to(:internal_transactions, :realtime) Import.all(@import_data) - assert_received {:chain_event, :internal_transactions, :realtime, - [%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]} + assert_receive {:chain_event, :internal_transactions, :realtime, + [%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]} end test "publishes transactions data to subscribers on insert" do Subscriber.to(:transactions, :realtime) Import.all(@import_data) - assert_received {:chain_event, :transactions, :realtime, [%Transaction{}]} + assert_receive {:chain_event, :transactions, :realtime, [%Transaction{}]} end test "publishes token_transfers data to subscribers on insert" do @@ -498,7 +498,7 @@ defmodule Explorer.Chain.ImportTest do Import.all(@import_data) - assert_received {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]} + assert_receive {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]} end test "does not broadcast if broadcast option is false" do