Update tests to receive realtime events

pull/2449/head
saneery 5 years ago
parent 51f6a68886
commit 19d84abd40
  1. 1
      apps/explorer/lib/explorer/chain/events/listener.ex
  2. 20
      apps/explorer/lib/explorer/chain/events/publisher.ex
  3. 6
      apps/explorer/test/explorer/chain/events/publisher_test.exs
  4. 4
      apps/explorer/test/explorer/chain/events/subscriber_test.exs
  5. 10
      apps/explorer/test/explorer/chain/import_test.exs

@ -30,6 +30,7 @@ defmodule Explorer.Chain.Events.Listener do
{:noreply, state} {:noreply, state}
end end
# sobelow_skip ["Misc.BinToTerm"]
defp decode_payload!(payload) do defp decode_payload!(payload) do
payload payload
|> Base.decode64!() |> Base.decode64!()

@ -2,6 +2,7 @@ defmodule Explorer.Chain.Events.Publisher do
@moduledoc """ @moduledoc """
Publishes events related to the Chain context. Publishes events related to the Chain context.
""" """
alias Ecto.Adapters.SQL.Sandbox
alias Explorer.Repo alias Explorer.Repo
@allowed_events ~w(addresses address_coin_balances blocks block_rewards internal_transactions token_transfers transactions contract_verification_result)a @allowed_events ~w(addresses address_coin_balances blocks block_rewards internal_transactions token_transfers transactions contract_verification_result)a
@ -17,11 +18,12 @@ defmodule Explorer.Chain.Events.Publisher do
@spec broadcast(atom()) :: :ok @spec broadcast(atom()) :: :ok
def broadcast(event_type) do def broadcast(event_type) do
send_data(event_type) send_data(event_type)
:ok
end end
defp send_data(event_type) do defp send_data(event_type) do
payload = encode_payload({:chain_event, event_type}) payload = encode_payload({:chain_event, event_type})
Repo.query("NOTIFY chain_event, '#{payload}';") send_notify(payload)
end end
# The :catchup type of event is not being consumed right now. # The :catchup type of event is not being consumed right now.
@ -31,12 +33,24 @@ defmodule Explorer.Chain.Events.Publisher do
defp send_data(event_type, broadcast_type, event_data) do defp send_data(event_type, broadcast_type, event_data) do
payload = encode_payload({:chain_event, event_type, broadcast_type, event_data}) payload = encode_payload({:chain_event, event_type, broadcast_type, event_data})
Repo.query("NOTIFY chain_event, '#{payload}';") send_notify(payload)
end end
defp encode_payload(payload) do defp encode_payload(payload) do
payload payload
|> :erlang.term_to_binary() |> :erlang.term_to_binary([:compressed])
|> Base.encode64() |> Base.encode64()
end end
defp send_notify(payload) do
fun = fn ->
Repo.query("select pg_notify('chain_event', $1::text);", [payload])
end
if Mix.env() == :test do
Sandbox.unboxed_run(Repo, fun)
else
fun.()
end
end
end end

@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.PublisherTest do
Publisher.broadcast([{event_type, event_data}], broadcast_type) Publisher.broadcast([{event_type, event_data}], broadcast_type)
assert_received {:chain_event, ^event_type, ^broadcast_type, []} assert_receive {:chain_event, ^event_type, ^broadcast_type, []}
end end
test "won't send chain_event of catchup type" do test "won't send chain_event of catchup type" do
@ -27,7 +27,7 @@ defmodule Explorer.Chain.Events.PublisherTest do
Publisher.broadcast([{event_type, event_data}], broadcast_type) Publisher.broadcast([{event_type, event_data}], broadcast_type)
refute_received {:chain_event, ^event_type, ^broadcast_type, []} refute_receive {:chain_event, ^event_type, ^broadcast_type, []}
end end
test "won't send event that is not allowed" do test "won't send event that is not allowed" do
@ -59,7 +59,7 @@ defmodule Explorer.Chain.Events.PublisherTest do
Publisher.broadcast(event_type) Publisher.broadcast(event_type)
assert_received {:chain_event, ^event_type} assert_receive {:chain_event, ^event_type}
end end
end end
end end

@ -15,7 +15,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do
Publisher.broadcast([{event_type, event_data}], broadcast_type) Publisher.broadcast([{event_type, event_data}], broadcast_type)
assert_received {:chain_event, :blocks, :realtime, []} assert_receive {:chain_event, :blocks, :realtime, []}
end end
end end
@ -27,7 +27,7 @@ defmodule Explorer.Chain.Events.SubscriberTest do
Publisher.broadcast(event_type) Publisher.broadcast(event_type)
assert_received {:chain_event, :exchange_rate} assert_receive {:chain_event, :exchange_rate}
end end
end end
end end

@ -470,27 +470,27 @@ defmodule Explorer.Chain.ImportTest do
test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do
Subscriber.to(:addresses, :realtime) Subscriber.to(:addresses, :realtime)
Import.all(@import_data) Import.all(@import_data)
assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]} assert_receive {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
end end
test "publishes block data to subscribers on insert" do test "publishes block data to subscribers on insert" do
Subscriber.to(:blocks, :realtime) Subscriber.to(:blocks, :realtime)
Import.all(@import_data) Import.all(@import_data)
assert_received {:chain_event, :blocks, :realtime, [%Block{}]} assert_receive {:chain_event, :blocks, :realtime, [%Block{}]}
end end
test "publishes internal_transaction data to subscribers on insert" do test "publishes internal_transaction data to subscribers on insert" do
Subscriber.to(:internal_transactions, :realtime) Subscriber.to(:internal_transactions, :realtime)
Import.all(@import_data) Import.all(@import_data)
assert_received {:chain_event, :internal_transactions, :realtime, assert_receive {:chain_event, :internal_transactions, :realtime,
[%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]} [%{transaction_hash: _, index: _}, %{transaction_hash: _, index: _}]}
end end
test "publishes transactions data to subscribers on insert" do test "publishes transactions data to subscribers on insert" do
Subscriber.to(:transactions, :realtime) Subscriber.to(:transactions, :realtime)
Import.all(@import_data) Import.all(@import_data)
assert_received {:chain_event, :transactions, :realtime, [%Transaction{}]} assert_receive {:chain_event, :transactions, :realtime, [%Transaction{}]}
end end
test "publishes token_transfers data to subscribers on insert" do test "publishes token_transfers data to subscribers on insert" do
@ -498,7 +498,7 @@ defmodule Explorer.Chain.ImportTest do
Import.all(@import_data) Import.all(@import_data)
assert_received {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]} assert_receive {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]}
end end
test "does not broadcast if broadcast option is false" do test "does not broadcast if broadcast option is false" do

Loading…
Cancel
Save