diff --git a/apps/block_scout_web/lib/block_scout_web/notifier.ex b/apps/block_scout_web/lib/block_scout_web/notifier.ex index 71106b2840..644cfb0ad9 100644 --- a/apps/block_scout_web/lib/block_scout_web/notifier.ex +++ b/apps/block_scout_web/lib/block_scout_web/notifier.ex @@ -8,7 +8,7 @@ defmodule BlockScoutWeb.Notifier do alias Explorer.ExchangeRates.Token alias BlockScoutWeb.Endpoint - def handle_event({:chain_event, :addresses, addresses}) do + def handle_event({:chain_event, :addresses, :realtime, addresses}) do Endpoint.broadcast("addresses:new_address", "count", %{count: Chain.address_estimated_count()}) addresses @@ -16,7 +16,7 @@ defmodule BlockScoutWeb.Notifier do |> Enum.each(&broadcast_balance/1) end - def handle_event({:chain_event, :blocks, blocks}) do + def handle_event({:chain_event, :blocks, :realtime, blocks}) do Enum.each(blocks, &broadcast_block/1) end @@ -35,7 +35,7 @@ defmodule BlockScoutWeb.Notifier do }) end - def handle_event({:chain_event, :internal_transactions, internal_transactions}) do + def handle_event({:chain_event, :internal_transactions, :realtime, internal_transactions}) do internal_transactions |> Stream.map( &(InternalTransaction @@ -45,7 +45,7 @@ defmodule BlockScoutWeb.Notifier do |> Enum.each(&broadcast_internal_transaction/1) end - def handle_event({:chain_event, :transactions, transaction_hashes}) do + def handle_event({:chain_event, :transactions, :realtime, transaction_hashes}) do transaction_hashes |> Chain.hashes_to_transactions( necessity_by_association: %{ @@ -59,6 +59,8 @@ defmodule BlockScoutWeb.Notifier do |> Enum.each(&broadcast_transaction/1) end + def handle_event(_), do: nil + defp broadcast_balance(%Address{hash: address_hash} = address) do Endpoint.broadcast("addresses:#{address_hash}", "balance_update", %{ address: address, diff --git a/apps/block_scout_web/test/block_scout_web/channels/address_channel_test.exs b/apps/block_scout_web/test/block_scout_web/channels/address_channel_test.exs index 6408cb8ea6..aad8a9246f 100644 --- a/apps/block_scout_web/test/block_scout_web/channels/address_channel_test.exs +++ b/apps/block_scout_web/test/block_scout_web/channels/address_channel_test.exs @@ -9,7 +9,7 @@ defmodule BlockScoutWeb.AddressChannelTest do address = insert(:address) - Notifier.handle_event({:chain_event, :addresses, [address]}) + Notifier.handle_event({:chain_event, :addresses, :realtime, [address]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "count", payload: %{count: _}} -> @@ -30,7 +30,7 @@ defmodule BlockScoutWeb.AddressChannelTest do test "notified of balance_update for matching address", %{address: address, topic: topic} do address_with_balance = %{address | fetched_coin_balance: 1} - Notifier.handle_event({:chain_event, :addresses, [address_with_balance]}) + Notifier.handle_event({:chain_event, :addresses, :realtime, [address_with_balance]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "balance_update", payload: payload} -> @@ -42,7 +42,7 @@ defmodule BlockScoutWeb.AddressChannelTest do end test "not notified of balance_update if fetched_coin_balance is nil", %{address: address} do - Notifier.handle_event({:chain_event, :addresses, [address]}) + Notifier.handle_event({:chain_event, :addresses, :realtime, [address]}) receive do _ -> assert false, "Message was broadcast for nil fetched_coin_balance." @@ -54,7 +54,7 @@ defmodule BlockScoutWeb.AddressChannelTest do test "notified of new_pending_transaction for matching from_address", %{address: address, topic: topic} do pending = insert(:transaction, from_address: address) - Notifier.handle_event({:chain_event, :transactions, [pending.hash]}) + Notifier.handle_event({:chain_event, :transactions, :realtime, [pending.hash]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "pending_transaction", payload: payload} -> @@ -72,7 +72,7 @@ defmodule BlockScoutWeb.AddressChannelTest do |> insert(from_address: address) |> with_block() - Notifier.handle_event({:chain_event, :transactions, [transaction.hash]}) + Notifier.handle_event({:chain_event, :transactions, :realtime, [transaction.hash]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "transaction", payload: payload} -> @@ -90,7 +90,7 @@ defmodule BlockScoutWeb.AddressChannelTest do |> insert(to_address: address) |> with_block() - Notifier.handle_event({:chain_event, :transactions, [transaction.hash]}) + Notifier.handle_event({:chain_event, :transactions, :realtime, [transaction.hash]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "transaction", payload: payload} -> @@ -108,7 +108,7 @@ defmodule BlockScoutWeb.AddressChannelTest do |> insert(from_address: address, to_address: address) |> with_block() - Notifier.handle_event({:chain_event, :transactions, [transaction.hash]}) + Notifier.handle_event({:chain_event, :transactions, :realtime, [transaction.hash]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "transaction", payload: payload} -> @@ -134,7 +134,7 @@ defmodule BlockScoutWeb.AddressChannelTest do internal_transaction = insert(:internal_transaction, transaction: transaction, from_address: address, index: 0) - Notifier.handle_event({:chain_event, :internal_transactions, [internal_transaction]}) + Notifier.handle_event({:chain_event, :internal_transactions, :realtime, [internal_transaction]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "internal_transaction", payload: payload} -> @@ -154,7 +154,7 @@ defmodule BlockScoutWeb.AddressChannelTest do internal_transaction = insert(:internal_transaction, transaction: transaction, to_address: address, index: 0) - Notifier.handle_event({:chain_event, :internal_transactions, [internal_transaction]}) + Notifier.handle_event({:chain_event, :internal_transactions, :realtime, [internal_transaction]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "internal_transaction", payload: payload} -> @@ -178,7 +178,7 @@ defmodule BlockScoutWeb.AddressChannelTest do internal_transaction = insert(:internal_transaction, transaction: transaction, from_address: address, to_address: address, index: 0) - Notifier.handle_event({:chain_event, :internal_transactions, [internal_transaction]}) + Notifier.handle_event({:chain_event, :internal_transactions, :realtime, [internal_transaction]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "internal_transaction", payload: payload} -> diff --git a/apps/block_scout_web/test/block_scout_web/channels/block_channel_test.exs b/apps/block_scout_web/test/block_scout_web/channels/block_channel_test.exs index 85579cde69..3b0c4eadb4 100644 --- a/apps/block_scout_web/test/block_scout_web/channels/block_channel_test.exs +++ b/apps/block_scout_web/test/block_scout_web/channels/block_channel_test.exs @@ -9,7 +9,7 @@ defmodule BlockScoutWeb.BlockChannelTest do block = insert(:block, number: 1) - Notifier.handle_event({:chain_event, :blocks, [block]}) + Notifier.handle_event({:chain_event, :blocks, :realtime, [block]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "new_block", payload: %{block: _}} -> diff --git a/apps/block_scout_web/test/block_scout_web/channels/transaction_channel_test.exs b/apps/block_scout_web/test/block_scout_web/channels/transaction_channel_test.exs index 9b4f66f224..2488855188 100644 --- a/apps/block_scout_web/test/block_scout_web/channels/transaction_channel_test.exs +++ b/apps/block_scout_web/test/block_scout_web/channels/transaction_channel_test.exs @@ -13,7 +13,7 @@ defmodule BlockScoutWeb.TransactionChannelTest do |> insert() |> with_block() - Notifier.handle_event({:chain_event, :transactions, [transaction.hash]}) + Notifier.handle_event({:chain_event, :transactions, :realtime, [transaction.hash]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "transaction", payload: payload} -> @@ -30,7 +30,7 @@ defmodule BlockScoutWeb.TransactionChannelTest do pending = insert(:transaction) - Notifier.handle_event({:chain_event, :transactions, [pending.hash]}) + Notifier.handle_event({:chain_event, :transactions, :realtime, [pending.hash]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "pending_transaction", payload: payload} -> @@ -50,7 +50,7 @@ defmodule BlockScoutWeb.TransactionChannelTest do topic = "transactions:#{Hash.to_string(transaction.hash)}" @endpoint.subscribe(topic) - Notifier.handle_event({:chain_event, :transactions, [transaction.hash]}) + Notifier.handle_event({:chain_event, :transactions, :realtime, [transaction.hash]}) receive do %Phoenix.Socket.Broadcast{topic: ^topic, event: "collated", payload: %{}} -> diff --git a/apps/block_scout_web/test/block_scout_web/features/viewing_addresses_test.exs b/apps/block_scout_web/test/block_scout_web/features/viewing_addresses_test.exs index e541e517cc..1fef901655 100644 --- a/apps/block_scout_web/test/block_scout_web/features/viewing_addresses_test.exs +++ b/apps/block_scout_web/test/block_scout_web/features/viewing_addresses_test.exs @@ -147,7 +147,7 @@ defmodule BlockScoutWeb.ViewingAddressesTest do new_pending = insert(:transaction, from_address: addresses.lincoln) - Notifier.handle_event({:chain_event, :transactions, [new_pending.hash]}) + Notifier.handle_event({:chain_event, :transactions, :realtime, [new_pending.hash]}) assert_has(session, AddressPage.pending_transaction(new_pending)) end @@ -278,7 +278,7 @@ defmodule BlockScoutWeb.ViewingAddressesTest do internal_transaction = insert(:internal_transaction, transaction: transaction, index: 0, from_address: addresses.lincoln) - Notifier.handle_event({:chain_event, :internal_transactions, [internal_transaction]}) + Notifier.handle_event({:chain_event, :internal_transactions, :realtime, [internal_transaction]}) session |> assert_has(AddressPage.internal_transactions(count: 3)) diff --git a/apps/block_scout_web/test/block_scout_web/features/viewing_blocks_test.exs b/apps/block_scout_web/test/block_scout_web/features/viewing_blocks_test.exs index 34ff36128e..4fe2c06b9c 100644 --- a/apps/block_scout_web/test/block_scout_web/features/viewing_blocks_test.exs +++ b/apps/block_scout_web/test/block_scout_web/features/viewing_blocks_test.exs @@ -39,7 +39,7 @@ defmodule BlockScoutWeb.ViewingBlocksTest do BlockListPage.visit_page(session) block = insert(:block, number: 315) - Notifier.handle_event({:chain_event, :blocks, [block]}) + Notifier.handle_event({:chain_event, :blocks, :realtime, [block]}) session |> assert_has(BlockListPage.block(block)) @@ -50,14 +50,14 @@ defmodule BlockScoutWeb.ViewingBlocksTest do BlockListPage.visit_page(session) block = insert(:block, number: 315) - Notifier.handle_event({:chain_event, :blocks, [block]}) + Notifier.handle_event({:chain_event, :blocks, :realtime, [block]}) session |> assert_has(BlockListPage.block(block)) |> assert_has(BlockListPage.place_holder_blocks(3)) skipped_block = insert(:block, number: 314) - Notifier.handle_event({:chain_event, :blocks, [skipped_block]}) + Notifier.handle_event({:chain_event, :blocks, :realtime, [skipped_block]}) session |> assert_has(BlockListPage.block(skipped_block)) diff --git a/apps/explorer/lib/explorer/chain/import.ex b/apps/explorer/lib/explorer/chain/import.ex index ebbb6807c5..3ba010e6be 100644 --- a/apps/explorer/lib/explorer/chain/import.ex +++ b/apps/explorer/lib/explorer/chain/import.ex @@ -38,7 +38,7 @@ defmodule Explorer.Chain.Import do end @type all_options :: %{ - optional(:broadcast) => boolean, + optional(:broadcast) => atom, optional(:timeout) => timeout, unquote_splicing(quoted_runner_options) } diff --git a/apps/explorer/test/explorer/chain/import_test.exs b/apps/explorer/test/explorer/chain/import_test.exs index 46de08a5fc..91d5485b3a 100644 --- a/apps/explorer/test/explorer/chain/import_test.exs +++ b/apps/explorer/test/explorer/chain/import_test.exs @@ -42,7 +42,7 @@ defmodule Explorer.Chain.ImportTest do ], timeout: 5 }, - broadcast: true, + broadcast: :realtime, internal_transactions: %{ params: [ %{ @@ -399,7 +399,7 @@ defmodule Explorer.Chain.ImportTest do test "publishes data to subscribers on insert" do Chain.subscribe_to_events(:logs) Import.all(@import_data) - assert_received {:chain_event, :logs, [%Log{}]} + assert_received {:chain_event, :logs, :realtime, [%Log{}]} end test "with invalid data" do @@ -413,31 +413,39 @@ defmodule Explorer.Chain.ImportTest do test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do Chain.subscribe_to_events(:addresses) Import.all(@import_data) - assert_received {:chain_event, :addresses, [%Address{}, %Address{}, %Address{}]} + assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]} end test "publishes block data to subscribers on insert" do Chain.subscribe_to_events(:blocks) Import.all(@import_data) - assert_received {:chain_event, :blocks, [%Block{}]} + assert_received {:chain_event, :blocks, :realtime, [%Block{}]} end test "publishes internal_transaction data to subscribers on insert" do Chain.subscribe_to_events(:internal_transactions) Import.all(@import_data) - assert_received {:chain_event, :internal_transactions, [%{id: _}, %{id: _}]} + assert_received {:chain_event, :internal_transactions, :realtime, [%{id: _}, %{id: _}]} end test "publishes log data to subscribers on insert" do Chain.subscribe_to_events(:logs) Import.all(@import_data) - assert_received {:chain_event, :logs, [%Log{}]} + assert_received {:chain_event, :logs, :realtime, [%Log{}]} end test "publishes transaction hashes data to subscribers on insert" do Chain.subscribe_to_events(:transactions) Import.all(@import_data) - assert_received {:chain_event, :transactions, [%Hash{}]} + assert_received {:chain_event, :transactions, :realtime, [%Hash{}]} + end + + test "publishes token_transfers data to subscribers on insert" do + Chain.subscribe_to_events(:token_transfers) + + Import.all(@import_data) + + assert_received {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]} end test "does not broadcast if broadcast option is false" do @@ -445,7 +453,7 @@ defmodule Explorer.Chain.ImportTest do Chain.subscribe_to_events(:logs) Import.all(non_broadcast_data) - refute_received {:chain_event, :logs, [%Log{}]} + refute_received {:chain_event, :logs, :realtime, [%Log{}]} end test "updates address with contract code" do diff --git a/apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex b/apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex index 68f9ad7cd6..36b05c93bb 100644 --- a/apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex +++ b/apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex @@ -56,7 +56,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do end defp new(%{block_fetcher: common_block_fetcher} = named_arguments) do - block_fetcher = %Block.Fetcher{common_block_fetcher | broadcast: false, callback_module: Catchup.Fetcher} + block_fetcher = %Block.Fetcher{common_block_fetcher | broadcast: :catchup, callback_module: Catchup.Fetcher} block_interval = Map.get(named_arguments, :block_interval, @block_interval) minimum_interval = div(block_interval, 2) diff --git a/apps/indexer/lib/indexer/block/fetcher.ex b/apps/indexer/lib/indexer/block/fetcher.ex index 7ad5e403ff..936443374c 100644 --- a/apps/indexer/lib/indexer/block/fetcher.ex +++ b/apps/indexer/lib/indexer/block/fetcher.ex @@ -28,7 +28,7 @@ defmodule Indexer.Block.Fetcher do address_token_balances: Import.Runner.options(), blocks: Import.Runner.options(), block_second_degree_relations: Import.Runner.options(), - broadcast: boolean, + broadcast: term(), logs: Import.Runner.options(), token_transfers: Import.Runner.options(), tokens: Import.Runner.options(), @@ -82,13 +82,13 @@ defmodule Indexer.Block.Fetcher do | {step :: atom(), failed_value :: term(), changes_so_far :: term()}} def fetch_and_import_range( %__MODULE__{ - broadcast: broadcast, + broadcast: _broadcast, callback_module: callback_module, json_rpc_named_arguments: json_rpc_named_arguments } = state, _.._ = range ) - when broadcast in ~w(true false)a and callback_module != nil do + when callback_module != nil do with {:blocks, {:ok, next, result}} <- {:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)}, %{ diff --git a/apps/indexer/lib/indexer/block/realtime/fetcher.ex b/apps/indexer/lib/indexer/block/realtime/fetcher.ex index e343a15e92..5c66110545 100644 --- a/apps/indexer/lib/indexer/block/realtime/fetcher.ex +++ b/apps/indexer/lib/indexer/block/realtime/fetcher.ex @@ -22,7 +22,7 @@ defmodule Indexer.Block.Realtime.Fetcher do @type t :: %__MODULE__{ block_fetcher: %Block.Fetcher{ - broadcast: true, + broadcast: term(), callback_module: __MODULE__, json_rpc_named_arguments: EthereumJSONRPC.json_rpc_named_arguments(), receipts_batch_size: pos_integer(), @@ -39,7 +39,7 @@ defmodule Indexer.Block.Realtime.Fetcher do @impl GenServer def init(%{block_fetcher: %Block.Fetcher{} = block_fetcher, subscribe_named_arguments: subscribe_named_arguments}) when is_list(subscribe_named_arguments) do - {:ok, %__MODULE__{block_fetcher: %Block.Fetcher{block_fetcher | broadcast: true, callback_module: __MODULE__}}, + {:ok, %__MODULE__{block_fetcher: %Block.Fetcher{block_fetcher | broadcast: :realtime, callback_module: __MODULE__}}, {:continue, {:init, subscribe_named_arguments}}} end diff --git a/apps/indexer/lib/indexer/block/uncle/fetcher.ex b/apps/indexer/lib/indexer/block/uncle/fetcher.ex index 75ca77a885..296548d0ba 100644 --- a/apps/indexer/lib/indexer/block/uncle/fetcher.ex +++ b/apps/indexer/lib/indexer/block/uncle/fetcher.ex @@ -48,7 +48,7 @@ defmodule Indexer.Block.Uncle.Fetcher do merged_init_options = @defaults |> Keyword.merge(mergeable_init_options) - |> Keyword.put(:state, %Block.Fetcher{state | broadcast: false, callback_module: __MODULE__}) + |> Keyword.put(:state, %Block.Fetcher{state | broadcast: :uncle, callback_module: __MODULE__}) Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__) end diff --git a/apps/indexer/lib/indexer/pending_transaction/fetcher.ex b/apps/indexer/lib/indexer/pending_transaction/fetcher.ex index bfb9f46d3c..482ba0ab18 100644 --- a/apps/indexer/lib/indexer/pending_transaction/fetcher.ex +++ b/apps/indexer/lib/indexer/pending_transaction/fetcher.ex @@ -110,7 +110,7 @@ defmodule Indexer.PendingTransaction.Fetcher do {:ok, _} = Chain.import(%{ addresses: %{params: addresses_params}, - broadcast: true, + broadcast: :realtime, transactions: %{params: transactions_params, on_conflict: :nothing} })