Alter 'Realtime::Fetcher' and 'Catchup::Fetcher' to specify the broadcast type.

The Token Counter needs to listen to both :realtime and :catchup events so it can keep up to date with the counting, but the 'Live Update' in the front-end doesn't need to hear the catchup events.
pull/867/head
Amanda Sposito 6 years ago
parent 7a972b7bbc
commit 0ed9020a96
  1. 10
      apps/block_scout_web/lib/block_scout_web/notifier.ex
  2. 20
      apps/block_scout_web/test/block_scout_web/channels/address_channel_test.exs
  3. 2
      apps/block_scout_web/test/block_scout_web/channels/block_channel_test.exs
  4. 6
      apps/block_scout_web/test/block_scout_web/channels/transaction_channel_test.exs
  5. 4
      apps/block_scout_web/test/block_scout_web/features/viewing_addresses_test.exs
  6. 6
      apps/block_scout_web/test/block_scout_web/features/viewing_blocks_test.exs
  7. 2
      apps/explorer/lib/explorer/chain/import.ex
  8. 24
      apps/explorer/test/explorer/chain/import_test.exs
  9. 2
      apps/indexer/lib/indexer/block/catchup/bound_interval_supervisor.ex
  10. 6
      apps/indexer/lib/indexer/block/fetcher.ex
  11. 4
      apps/indexer/lib/indexer/block/realtime/fetcher.ex
  12. 2
      apps/indexer/lib/indexer/block/uncle/fetcher.ex
  13. 2
      apps/indexer/lib/indexer/pending_transaction/fetcher.ex

@ -8,7 +8,7 @@ defmodule BlockScoutWeb.Notifier do
alias Explorer.ExchangeRates.Token
alias BlockScoutWeb.Endpoint
def handle_event({:chain_event, :addresses, addresses}) do
def handle_event({:chain_event, :addresses, :realtime, addresses}) do
Endpoint.broadcast("addresses:new_address", "count", %{count: Chain.address_estimated_count()})
addresses
@ -16,7 +16,7 @@ defmodule BlockScoutWeb.Notifier do
|> Enum.each(&broadcast_balance/1)
end
def handle_event({:chain_event, :blocks, blocks}) do
def handle_event({:chain_event, :blocks, :realtime, blocks}) do
Enum.each(blocks, &broadcast_block/1)
end
@ -35,7 +35,7 @@ defmodule BlockScoutWeb.Notifier do
})
end
def handle_event({:chain_event, :internal_transactions, internal_transactions}) do
def handle_event({:chain_event, :internal_transactions, :realtime, internal_transactions}) do
internal_transactions
|> Stream.map(
&(InternalTransaction
@ -45,7 +45,7 @@ defmodule BlockScoutWeb.Notifier do
|> Enum.each(&broadcast_internal_transaction/1)
end
def handle_event({:chain_event, :transactions, transaction_hashes}) do
def handle_event({:chain_event, :transactions, :realtime, transaction_hashes}) do
transaction_hashes
|> Chain.hashes_to_transactions(
necessity_by_association: %{
@ -59,6 +59,8 @@ defmodule BlockScoutWeb.Notifier do
|> Enum.each(&broadcast_transaction/1)
end
def handle_event(_), do: nil
defp broadcast_balance(%Address{hash: address_hash} = address) do
Endpoint.broadcast("addresses:#{address_hash}", "balance_update", %{
address: address,

@ -9,7 +9,7 @@ defmodule BlockScoutWeb.AddressChannelTest do
address = insert(:address)
Notifier.handle_event({:chain_event, :addresses, [address]})
Notifier.handle_event({:chain_event, :addresses, :realtime, [address]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "count", payload: %{count: _}} ->
@ -30,7 +30,7 @@ defmodule BlockScoutWeb.AddressChannelTest do
test "notified of balance_update for matching address", %{address: address, topic: topic} do
address_with_balance = %{address | fetched_coin_balance: 1}
Notifier.handle_event({:chain_event, :addresses, [address_with_balance]})
Notifier.handle_event({:chain_event, :addresses, :realtime, [address_with_balance]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "balance_update", payload: payload} ->
@ -42,7 +42,7 @@ defmodule BlockScoutWeb.AddressChannelTest do
end
test "not notified of balance_update if fetched_coin_balance is nil", %{address: address} do
Notifier.handle_event({:chain_event, :addresses, [address]})
Notifier.handle_event({:chain_event, :addresses, :realtime, [address]})
receive do
_ -> assert false, "Message was broadcast for nil fetched_coin_balance."
@ -54,7 +54,7 @@ defmodule BlockScoutWeb.AddressChannelTest do
test "notified of new_pending_transaction for matching from_address", %{address: address, topic: topic} do
pending = insert(:transaction, from_address: address)
Notifier.handle_event({:chain_event, :transactions, [pending.hash]})
Notifier.handle_event({:chain_event, :transactions, :realtime, [pending.hash]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "pending_transaction", payload: payload} ->
@ -72,7 +72,7 @@ defmodule BlockScoutWeb.AddressChannelTest do
|> insert(from_address: address)
|> with_block()
Notifier.handle_event({:chain_event, :transactions, [transaction.hash]})
Notifier.handle_event({:chain_event, :transactions, :realtime, [transaction.hash]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "transaction", payload: payload} ->
@ -90,7 +90,7 @@ defmodule BlockScoutWeb.AddressChannelTest do
|> insert(to_address: address)
|> with_block()
Notifier.handle_event({:chain_event, :transactions, [transaction.hash]})
Notifier.handle_event({:chain_event, :transactions, :realtime, [transaction.hash]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "transaction", payload: payload} ->
@ -108,7 +108,7 @@ defmodule BlockScoutWeb.AddressChannelTest do
|> insert(from_address: address, to_address: address)
|> with_block()
Notifier.handle_event({:chain_event, :transactions, [transaction.hash]})
Notifier.handle_event({:chain_event, :transactions, :realtime, [transaction.hash]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "transaction", payload: payload} ->
@ -134,7 +134,7 @@ defmodule BlockScoutWeb.AddressChannelTest do
internal_transaction = insert(:internal_transaction, transaction: transaction, from_address: address, index: 0)
Notifier.handle_event({:chain_event, :internal_transactions, [internal_transaction]})
Notifier.handle_event({:chain_event, :internal_transactions, :realtime, [internal_transaction]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "internal_transaction", payload: payload} ->
@ -154,7 +154,7 @@ defmodule BlockScoutWeb.AddressChannelTest do
internal_transaction = insert(:internal_transaction, transaction: transaction, to_address: address, index: 0)
Notifier.handle_event({:chain_event, :internal_transactions, [internal_transaction]})
Notifier.handle_event({:chain_event, :internal_transactions, :realtime, [internal_transaction]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "internal_transaction", payload: payload} ->
@ -178,7 +178,7 @@ defmodule BlockScoutWeb.AddressChannelTest do
internal_transaction =
insert(:internal_transaction, transaction: transaction, from_address: address, to_address: address, index: 0)
Notifier.handle_event({:chain_event, :internal_transactions, [internal_transaction]})
Notifier.handle_event({:chain_event, :internal_transactions, :realtime, [internal_transaction]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "internal_transaction", payload: payload} ->

@ -9,7 +9,7 @@ defmodule BlockScoutWeb.BlockChannelTest do
block = insert(:block, number: 1)
Notifier.handle_event({:chain_event, :blocks, [block]})
Notifier.handle_event({:chain_event, :blocks, :realtime, [block]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "new_block", payload: %{block: _}} ->

@ -13,7 +13,7 @@ defmodule BlockScoutWeb.TransactionChannelTest do
|> insert()
|> with_block()
Notifier.handle_event({:chain_event, :transactions, [transaction.hash]})
Notifier.handle_event({:chain_event, :transactions, :realtime, [transaction.hash]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "transaction", payload: payload} ->
@ -30,7 +30,7 @@ defmodule BlockScoutWeb.TransactionChannelTest do
pending = insert(:transaction)
Notifier.handle_event({:chain_event, :transactions, [pending.hash]})
Notifier.handle_event({:chain_event, :transactions, :realtime, [pending.hash]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "pending_transaction", payload: payload} ->
@ -50,7 +50,7 @@ defmodule BlockScoutWeb.TransactionChannelTest do
topic = "transactions:#{Hash.to_string(transaction.hash)}"
@endpoint.subscribe(topic)
Notifier.handle_event({:chain_event, :transactions, [transaction.hash]})
Notifier.handle_event({:chain_event, :transactions, :realtime, [transaction.hash]})
receive do
%Phoenix.Socket.Broadcast{topic: ^topic, event: "collated", payload: %{}} ->

@ -147,7 +147,7 @@ defmodule BlockScoutWeb.ViewingAddressesTest do
new_pending = insert(:transaction, from_address: addresses.lincoln)
Notifier.handle_event({:chain_event, :transactions, [new_pending.hash]})
Notifier.handle_event({:chain_event, :transactions, :realtime, [new_pending.hash]})
assert_has(session, AddressPage.pending_transaction(new_pending))
end
@ -278,7 +278,7 @@ defmodule BlockScoutWeb.ViewingAddressesTest do
internal_transaction =
insert(:internal_transaction, transaction: transaction, index: 0, from_address: addresses.lincoln)
Notifier.handle_event({:chain_event, :internal_transactions, [internal_transaction]})
Notifier.handle_event({:chain_event, :internal_transactions, :realtime, [internal_transaction]})
session
|> assert_has(AddressPage.internal_transactions(count: 3))

@ -39,7 +39,7 @@ defmodule BlockScoutWeb.ViewingBlocksTest do
BlockListPage.visit_page(session)
block = insert(:block, number: 315)
Notifier.handle_event({:chain_event, :blocks, [block]})
Notifier.handle_event({:chain_event, :blocks, :realtime, [block]})
session
|> assert_has(BlockListPage.block(block))
@ -50,14 +50,14 @@ defmodule BlockScoutWeb.ViewingBlocksTest do
BlockListPage.visit_page(session)
block = insert(:block, number: 315)
Notifier.handle_event({:chain_event, :blocks, [block]})
Notifier.handle_event({:chain_event, :blocks, :realtime, [block]})
session
|> assert_has(BlockListPage.block(block))
|> assert_has(BlockListPage.place_holder_blocks(3))
skipped_block = insert(:block, number: 314)
Notifier.handle_event({:chain_event, :blocks, [skipped_block]})
Notifier.handle_event({:chain_event, :blocks, :realtime, [skipped_block]})
session
|> assert_has(BlockListPage.block(skipped_block))

@ -38,7 +38,7 @@ defmodule Explorer.Chain.Import do
end
@type all_options :: %{
optional(:broadcast) => boolean,
optional(:broadcast) => atom,
optional(:timeout) => timeout,
unquote_splicing(quoted_runner_options)
}

@ -42,7 +42,7 @@ defmodule Explorer.Chain.ImportTest do
],
timeout: 5
},
broadcast: true,
broadcast: :realtime,
internal_transactions: %{
params: [
%{
@ -399,7 +399,7 @@ defmodule Explorer.Chain.ImportTest do
test "publishes data to subscribers on insert" do
Chain.subscribe_to_events(:logs)
Import.all(@import_data)
assert_received {:chain_event, :logs, [%Log{}]}
assert_received {:chain_event, :logs, :realtime, [%Log{}]}
end
test "with invalid data" do
@ -413,31 +413,39 @@ defmodule Explorer.Chain.ImportTest do
test "publishes addresses with updated fetched_coin_balance data to subscribers on insert" do
Chain.subscribe_to_events(:addresses)
Import.all(@import_data)
assert_received {:chain_event, :addresses, [%Address{}, %Address{}, %Address{}]}
assert_received {:chain_event, :addresses, :realtime, [%Address{}, %Address{}, %Address{}]}
end
test "publishes block data to subscribers on insert" do
Chain.subscribe_to_events(:blocks)
Import.all(@import_data)
assert_received {:chain_event, :blocks, [%Block{}]}
assert_received {:chain_event, :blocks, :realtime, [%Block{}]}
end
test "publishes internal_transaction data to subscribers on insert" do
Chain.subscribe_to_events(:internal_transactions)
Import.all(@import_data)
assert_received {:chain_event, :internal_transactions, [%{id: _}, %{id: _}]}
assert_received {:chain_event, :internal_transactions, :realtime, [%{id: _}, %{id: _}]}
end
test "publishes log data to subscribers on insert" do
Chain.subscribe_to_events(:logs)
Import.all(@import_data)
assert_received {:chain_event, :logs, [%Log{}]}
assert_received {:chain_event, :logs, :realtime, [%Log{}]}
end
test "publishes transaction hashes data to subscribers on insert" do
Chain.subscribe_to_events(:transactions)
Import.all(@import_data)
assert_received {:chain_event, :transactions, [%Hash{}]}
assert_received {:chain_event, :transactions, :realtime, [%Hash{}]}
end
test "publishes token_transfers data to subscribers on insert" do
Chain.subscribe_to_events(:token_transfers)
Import.all(@import_data)
assert_received {:chain_event, :token_transfers, :realtime, [%TokenTransfer{}]}
end
test "does not broadcast if broadcast option is false" do
@ -445,7 +453,7 @@ defmodule Explorer.Chain.ImportTest do
Chain.subscribe_to_events(:logs)
Import.all(non_broadcast_data)
refute_received {:chain_event, :logs, [%Log{}]}
refute_received {:chain_event, :logs, :realtime, [%Log{}]}
end
test "updates address with contract code" do

@ -56,7 +56,7 @@ defmodule Indexer.Block.Catchup.BoundIntervalSupervisor do
end
defp new(%{block_fetcher: common_block_fetcher} = named_arguments) do
block_fetcher = %Block.Fetcher{common_block_fetcher | broadcast: false, callback_module: Catchup.Fetcher}
block_fetcher = %Block.Fetcher{common_block_fetcher | broadcast: :catchup, callback_module: Catchup.Fetcher}
block_interval = Map.get(named_arguments, :block_interval, @block_interval)
minimum_interval = div(block_interval, 2)

@ -28,7 +28,7 @@ defmodule Indexer.Block.Fetcher do
address_token_balances: Import.Runner.options(),
blocks: Import.Runner.options(),
block_second_degree_relations: Import.Runner.options(),
broadcast: boolean,
broadcast: term(),
logs: Import.Runner.options(),
token_transfers: Import.Runner.options(),
tokens: Import.Runner.options(),
@ -82,13 +82,13 @@ defmodule Indexer.Block.Fetcher do
| {step :: atom(), failed_value :: term(), changes_so_far :: term()}}
def fetch_and_import_range(
%__MODULE__{
broadcast: broadcast,
broadcast: _broadcast,
callback_module: callback_module,
json_rpc_named_arguments: json_rpc_named_arguments
} = state,
_.._ = range
)
when broadcast in ~w(true false)a and callback_module != nil do
when callback_module != nil do
with {:blocks, {:ok, next, result}} <-
{:blocks, EthereumJSONRPC.fetch_blocks_by_range(range, json_rpc_named_arguments)},
%{

@ -22,7 +22,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
@type t :: %__MODULE__{
block_fetcher: %Block.Fetcher{
broadcast: true,
broadcast: term(),
callback_module: __MODULE__,
json_rpc_named_arguments: EthereumJSONRPC.json_rpc_named_arguments(),
receipts_batch_size: pos_integer(),
@ -39,7 +39,7 @@ defmodule Indexer.Block.Realtime.Fetcher do
@impl GenServer
def init(%{block_fetcher: %Block.Fetcher{} = block_fetcher, subscribe_named_arguments: subscribe_named_arguments})
when is_list(subscribe_named_arguments) do
{:ok, %__MODULE__{block_fetcher: %Block.Fetcher{block_fetcher | broadcast: true, callback_module: __MODULE__}},
{:ok, %__MODULE__{block_fetcher: %Block.Fetcher{block_fetcher | broadcast: :realtime, callback_module: __MODULE__}},
{:continue, {:init, subscribe_named_arguments}}}
end

@ -48,7 +48,7 @@ defmodule Indexer.Block.Uncle.Fetcher do
merged_init_options =
@defaults
|> Keyword.merge(mergeable_init_options)
|> Keyword.put(:state, %Block.Fetcher{state | broadcast: false, callback_module: __MODULE__})
|> Keyword.put(:state, %Block.Fetcher{state | broadcast: :uncle, callback_module: __MODULE__})
Supervisor.child_spec({BufferedTask, [{__MODULE__, merged_init_options}, gen_server_options]}, id: __MODULE__)
end

@ -110,7 +110,7 @@ defmodule Indexer.PendingTransaction.Fetcher do
{:ok, _} =
Chain.import(%{
addresses: %{params: addresses_params},
broadcast: true,
broadcast: :realtime,
transactions: %{params: transactions_params, on_conflict: :nothing}
})

Loading…
Cancel
Save