From b2345b159fa49428c9ecb61d6f599e56adb2c78a Mon Sep 17 00:00:00 2001 From: Alexander Kolotov Date: Fri, 5 Jul 2024 02:29:19 -0600 Subject: [PATCH] feat: broadcast updates about new Arbitrum batches and L1-L2 messages through WebSocket (#10272) * publish new batches through websockets * publish new L1-L2 messages amount through websockets * credo issues * Consitent handling of transaction hash * new types re-used --- .../channels/arbitrum_channel.ex | 14 +++ .../channels/user_socket_v2.ex | 5 + .../lib/block_scout_web/notifier.ex | 18 ++++ .../lib/block_scout_web/notifiers/arbitrum.ex | 31 +++++++ .../block_scout_web/realtime_event_handler.ex | 16 ++++ .../views/api/v2/arbitrum_view.ex | 92 ++++++++++++++++--- .../lib/explorer/chain/events/publisher.ex | 17 +++- .../lib/explorer/chain/events/subscriber.ex | 17 +++- .../fetcher/arbitrum/workers/new_batches.ex | 35 ++++++- .../arbitrum/workers/new_messages_to_l2.ex | 28 ++++-- 10 files changed, 248 insertions(+), 25 deletions(-) create mode 100644 apps/block_scout_web/lib/block_scout_web/channels/arbitrum_channel.ex create mode 100644 apps/block_scout_web/lib/block_scout_web/notifiers/arbitrum.ex diff --git a/apps/block_scout_web/lib/block_scout_web/channels/arbitrum_channel.ex b/apps/block_scout_web/lib/block_scout_web/channels/arbitrum_channel.ex new file mode 100644 index 0000000000..27bc00e8b2 --- /dev/null +++ b/apps/block_scout_web/lib/block_scout_web/channels/arbitrum_channel.ex @@ -0,0 +1,14 @@ +defmodule BlockScoutWeb.ArbitrumChannel do + @moduledoc """ + Establishes pub/sub channel for live updates of Arbitrum related events. + """ + use BlockScoutWeb, :channel + + def join("arbitrum:new_batch", _params, socket) do + {:ok, %{}, socket} + end + + def join("arbitrum:new_messages_to_rollup_amount", _params, socket) do + {:ok, %{}, socket} + end +end diff --git a/apps/block_scout_web/lib/block_scout_web/channels/user_socket_v2.ex b/apps/block_scout_web/lib/block_scout_web/channels/user_socket_v2.ex index 8ac5295d60..57cdf442c9 100644 --- a/apps/block_scout_web/lib/block_scout_web/channels/user_socket_v2.ex +++ b/apps/block_scout_web/lib/block_scout_web/channels/user_socket_v2.ex @@ -14,6 +14,11 @@ defmodule BlockScoutWeb.UserSocketV2 do channel("token_instances:*", BlockScoutWeb.TokenInstanceChannel) channel("zkevm_batches:*", BlockScoutWeb.PolygonZkevmConfirmedBatchChannel) + case Application.compile_env(:explorer, :chain_type) do + :arbitrum -> channel("arbitrum:*", BlockScoutWeb.ArbitrumChannel) + _ -> nil + end + def connect(_params, socket) do {:ok, socket} end diff --git a/apps/block_scout_web/lib/block_scout_web/notifier.ex b/apps/block_scout_web/lib/block_scout_web/notifier.ex index 951579c042..f364dfa64e 100644 --- a/apps/block_scout_web/lib/block_scout_web/notifier.ex +++ b/apps/block_scout_web/lib/block_scout_web/notifier.ex @@ -29,6 +29,14 @@ defmodule BlockScoutWeb.Notifier do @check_broadcast_sequence_period 500 + case Application.compile_env(:explorer, :chain_type) do + :arbitrum -> + @chain_type_specific_events ~w(new_arbitrum_batches new_messages_to_arbitrum_amount)a + + _ -> + nil + end + def handle_event({:chain_event, :addresses, type, addresses}) when type in [:realtime, :on_demand] do Endpoint.broadcast("addresses:new_address", "count", %{count: Counters.address_estimated_count()}) @@ -280,6 +288,16 @@ defmodule BlockScoutWeb.Notifier do }) end + case Application.compile_env(:explorer, :chain_type) do + :arbitrum -> + def handle_event({:chain_event, topic, _, _} = event) when topic in @chain_type_specific_events, + # credo:disable-for-next-line Credo.Check.Design.AliasUsage + do: BlockScoutWeb.Notifiers.Arbitrum.handle_event(event) + + _ -> + nil + end + def handle_event(event) do Logger.warning("Unknown broadcasted event #{inspect(event)}.") nil diff --git a/apps/block_scout_web/lib/block_scout_web/notifiers/arbitrum.ex b/apps/block_scout_web/lib/block_scout_web/notifiers/arbitrum.ex new file mode 100644 index 0000000000..2b7589dc0f --- /dev/null +++ b/apps/block_scout_web/lib/block_scout_web/notifiers/arbitrum.ex @@ -0,0 +1,31 @@ +defmodule BlockScoutWeb.Notifiers.Arbitrum do + @moduledoc """ + Module to handle and broadcast Arbitrum related events. + """ + + alias BlockScoutWeb.API.V2.ArbitrumView + alias BlockScoutWeb.Endpoint + + require Logger + + def handle_event({:chain_event, :new_arbitrum_batches, :realtime, batches}) do + batches + |> Enum.sort_by(& &1.number, :asc) + |> Enum.each(fn batch -> + Endpoint.broadcast("arbitrum:new_batch", "new_arbitrum_batch", %{ + batch: ArbitrumView.render_base_info_for_batch(batch) + }) + end) + end + + def handle_event({:chain_event, :new_messages_to_arbitrum_amount, :realtime, new_messages_amount}) do + Endpoint.broadcast("arbitrum:new_messages_to_rollup_amount", "new_messages_to_rollup_amount", %{ + new_messages_to_rollup_amount: new_messages_amount + }) + end + + def handle_event(event) do + Logger.warning("Unknown broadcasted event #{inspect(event)}.") + nil + end +end diff --git a/apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex b/apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex index b19ead1cc0..c2aa239fb0 100644 --- a/apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex +++ b/apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex @@ -12,6 +12,19 @@ defmodule BlockScoutWeb.RealtimeEventHandler do GenServer.start_link(__MODULE__, [], name: __MODULE__) end + case Application.compile_env(:explorer, :chain_type) do + :arbitrum -> + def chain_type_specific_subscriptions do + Subscriber.to(:new_arbitrum_batches, :realtime) + Subscriber.to(:new_messages_to_arbitrum_amount, :realtime) + end + + _ -> + def chain_type_specific_subscriptions do + nil + end + end + @impl true def init([]) do Subscriber.to(:address_coin_balances, :realtime) @@ -34,6 +47,9 @@ defmodule BlockScoutWeb.RealtimeEventHandler do # Does not come from the indexer Subscriber.to(:exchange_rate) Subscriber.to(:transaction_stats) + + chain_type_specific_subscriptions() + {:ok, []} end diff --git a/apps/block_scout_web/lib/block_scout_web/views/api/v2/arbitrum_view.ex b/apps/block_scout_web/lib/block_scout_web/views/api/v2/arbitrum_view.ex index 3fa3262884..185a00da1a 100644 --- a/apps/block_scout_web/lib/block_scout_web/views/api/v2/arbitrum_view.ex +++ b/apps/block_scout_web/lib/block_scout_web/views/api/v2/arbitrum_view.ex @@ -114,21 +114,71 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do # transaction that committed the batch to L1. # # ## Parameters - # - `batches`: A list of `Explorer.Chain.Arbitrum.L1Batch` entries. + # - `batches`: A list of `Explorer.Chain.Arbitrum.L1Batch` entries or a list of maps + # with the corresponding fields. # # ## Returns # - A list of maps with detailed information about each batch, formatted for use # in JSON HTTP responses. - @spec render_arbitrum_batches([L1Batch]) :: [map()] + @spec render_arbitrum_batches( + [L1Batch.t()] + | [ + %{ + :number => non_neg_integer(), + :transactions_count => non_neg_integer(), + :start_block => non_neg_integer(), + :end_block => non_neg_integer(), + :commitment_transaction => %{ + :hash => binary(), + :block_number => non_neg_integer(), + :timestamp => DateTime.t(), + :status => :finalized | :unfinalized, + optional(any()) => any() + }, + optional(any()) => any() + } + ] + ) :: [map()] defp render_arbitrum_batches(batches) do - Enum.map(batches, fn batch -> - %{ - "number" => batch.number, - "transactions_count" => batch.transactions_count, - "blocks_count" => batch.end_block - batch.start_block + 1 - } - |> add_l1_tx_info(batch) - end) + Enum.map(batches, &render_base_info_for_batch/1) + end + + # Transforms a L1 batch into a map format for HTTP response. + # + # This function processes an Arbitrum L1 batch and converts it into a map that + # includes basic batch information and details of the associated transaction + # that committed the batch to L1. + # + # ## Parameters + # - `batch`: Either an `Explorer.Chain.Arbitrum.L1Batch` entry or a map with + # the corresponding fields. + # + # ## Returns + # - A map with detailed information about the batch, formatted for use in JSON HTTP responses. + @spec render_base_info_for_batch( + L1Batch.t() + | %{ + :number => non_neg_integer(), + :transactions_count => non_neg_integer(), + :start_block => non_neg_integer(), + :end_block => non_neg_integer(), + :commitment_transaction => %{ + :hash => binary(), + :block_number => non_neg_integer(), + :timestamp => DateTime.t(), + :status => :finalized | :unfinalized, + optional(any()) => any() + }, + optional(any()) => any() + } + ) :: map() + def render_base_info_for_batch(batch) do + %{ + "number" => batch.number, + "transactions_count" => batch.transactions_count, + "blocks_count" => batch.end_block - batch.start_block + 1 + } + |> add_l1_tx_info(batch) end @doc """ @@ -228,8 +278,7 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do # Augments an output JSON with commit transaction details and its status. @spec add_l1_tx_info(map(), %{ - :__struct__ => L1Batch, - :commitment_transaction => any(), + :commitment_transaction => LifecycleTransaction.t() | LifecycleTransaction.to_import(), optional(any()) => any() }) :: map() defp add_l1_tx_info(out_json, %L1Batch{} = batch) do @@ -246,6 +295,25 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do }) end + defp add_l1_tx_info(out_json, %{ + commitment_transaction: %{ + hash: hash, + block_number: block_number, + timestamp: ts, + status: status + } + }) do + out_json + |> Map.merge(%{ + "commitment_transaction" => %{ + "hash" => %Hash{byte_count: 32, bytes: hash}, + "block_number" => block_number, + "timestamp" => ts, + "status" => status + } + }) + end + # Augments an output JSON with commit and confirm transaction details and their statuses. @spec add_l1_txs_info_and_status(map(), %{ :commitment_transaction => any(), diff --git a/apps/explorer/lib/explorer/chain/events/publisher.ex b/apps/explorer/lib/explorer/chain/events/publisher.ex index 55bdd4a217..adea72f104 100644 --- a/apps/explorer/lib/explorer/chain/events/publisher.ex +++ b/apps/explorer/lib/explorer/chain/events/publisher.ex @@ -3,7 +3,22 @@ defmodule Explorer.Chain.Events.Publisher do Publishes events related to the Chain context. """ - @allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits token_transfers transactions contract_verification_result token_total_supply changed_bytecode fetched_bytecode fetched_token_instance_metadata smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a + @common_allowed_events ~w(addresses address_coin_balances address_token_balances + address_current_token_balances blocks block_rewards internal_transactions + last_block_number optimism_deposits token_transfers transactions contract_verification_result + token_total_supply changed_bytecode fetched_bytecode fetched_token_instance_metadata + smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started + smart_contract_was_not_verified)a + + case Application.compile_env(:explorer, :chain_type) do + :arbitrum -> + @chain_type_specific_allowed_events ~w(new_arbitrum_batches new_messages_to_arbitrum_amount)a + + _ -> + @chain_type_specific_allowed_events ~w()a + end + + @allowed_events @common_allowed_events ++ @chain_type_specific_allowed_events def broadcast(_data, false), do: :ok diff --git a/apps/explorer/lib/explorer/chain/events/subscriber.ex b/apps/explorer/lib/explorer/chain/events/subscriber.ex index f599822341..3e76f65796 100644 --- a/apps/explorer/lib/explorer/chain/events/subscriber.ex +++ b/apps/explorer/lib/explorer/chain/events/subscriber.ex @@ -3,7 +3,22 @@ defmodule Explorer.Chain.Events.Subscriber do Subscribes to events related to the Chain context. """ - @allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits token_transfers transactions contract_verification_result token_total_supply changed_bytecode fetched_bytecode fetched_token_instance_metadata smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a + @common_allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances + address_current_token_balances blocks block_rewards internal_transactions + last_block_number optimism_deposits token_transfers transactions contract_verification_result + token_total_supply changed_bytecode fetched_bytecode fetched_token_instance_metadata + smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started + smart_contract_was_not_verified)a + + case Application.compile_env(:explorer, :chain_type) do + :arbitrum -> + @chain_type_specific_allowed_broadcast_events ~w(new_arbitrum_batches new_messages_to_arbitrum_amount)a + + _ -> + @chain_type_specific_allowed_broadcast_events ~w()a + end + + @allowed_broadcast_events @common_allowed_broadcast_events ++ @chain_type_specific_allowed_broadcast_events @allowed_broadcast_types ~w(catchup realtime on_demand contract_verification_result)a diff --git a/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex b/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex index c0672139ec..cd44cb1d63 100644 --- a/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex +++ b/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex @@ -34,6 +34,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do alias Explorer.Chain alias Explorer.Chain.Arbitrum + alias Explorer.Chain.Events.Publisher require Logger @@ -296,7 +297,8 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do # constructs comprehensive data structures for batches, lifecycle transactions, rollup # blocks, and rollup transactions. Additionally, it identifies any L2-to-L1 messages that # have been committed within these batches and updates their status. All discovered and - # processed data are then imported into the database. + # processed data are then imported into the database. If new batches were found, they are + # announced to be broadcasted through a websocket. # # ## Parameters # - `sequencer_inbox_address`: The SequencerInbox contract address used to filter logs. @@ -326,10 +328,14 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do l1_rpc_config.json_rpc_named_arguments ) + new_batches_discovery? = end_block >= start_block + logs = - if end_block >= start_block do + if new_batches_discovery? do + # called by `discover` raw_logs else + # called by `discover_historical` Enum.reverse(raw_logs) end @@ -355,6 +361,13 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do arbitrum_messages: %{params: committed_txs}, timeout: :infinity }) + + if not Enum.empty?(batches) and new_batches_discovery? do + Publisher.broadcast( + [{:new_arbitrum_batches, extend_batches_with_commitment_transactions(batches, lifecycle_txs)}], + :realtime + ) + end end) end @@ -1094,4 +1107,22 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do Map.put(tx, :status, :sent) end) end + + # Extends the provided list of batches with their corresponding commitment transactions. + @spec extend_batches_with_commitment_transactions( + [%{:commitment_id => non_neg_integer(), optional(any()) => any()}], + [%{:id => non_neg_integer(), optional(any()) => any()}] + ) :: [ + %{ + :commitment_id => non_neg_integer(), + :commitment_transaction => %{:id => non_neg_integer(), optional(any()) => any()}, + optional(any()) => any() + } + ] + defp extend_batches_with_commitment_transactions(batches, lifecycle_txs) do + Enum.map(batches, fn batch -> + lifecycle_tx = Enum.find(lifecycle_txs, fn tx -> tx.id == batch.commitment_id end) + Map.put(batch, :commitment_transaction, lifecycle_tx) + end) + end end diff --git a/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_messages_to_l2.ex b/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_messages_to_l2.ex index d0f1556798..ab030735be 100644 --- a/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_messages_to_l2.ex +++ b/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_messages_to_l2.ex @@ -25,6 +25,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewMessagesToL2 do alias Explorer.Chain alias Explorer.Chain.Arbitrum + alias Explorer.Chain.Events.Publisher require Logger @@ -47,7 +48,9 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewMessagesToL2 do This function calculates the block range for discovering new messages from L1 to L2 based on the latest block number available on the network. It then fetches logs related to L1-to-L2 events within this range, extracts message details from both - the log and the corresponding L1 transaction, and imports them into the database. + the log and the corresponding L1 transaction, and imports them into the database. If + new messages were discovered, their amount is announced to be broadcasted through + a websocket. ## Parameters - A map containing: @@ -101,13 +104,18 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewMessagesToL2 do if start_block <= end_block do log_info("Block range for discovery new messages from L1: #{start_block}..#{end_block}") - discover( - bridge_address, - start_block, - end_block, - json_rpc_named_arguments, - chunk_size - ) + new_messages_amount = + discover( + bridge_address, + start_block, + end_block, + json_rpc_named_arguments, + chunk_size + ) + + if new_messages_amount > 0 do + Publisher.broadcast(%{new_messages_to_arbitrum_amount: new_messages_amount}, :realtime) + end {:ok, end_block} else @@ -201,7 +209,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewMessagesToL2 do # - `chunk_size`: The size of chunks for processing RPC calls in batches. # # ## Returns - # - N/A + # - amount of discovered messages defp discover(bridge_address, start_block, end_block, json_rpc_named_argument, chunk_size) do logs = get_logs_for_l1_to_l2_messages( @@ -222,6 +230,8 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewMessagesToL2 do arbitrum_messages: %{params: messages}, timeout: :infinity }) + + length(messages) end # Retrieves logs representing the `MessageDelivered` events.