feat: broadcast updates about new Arbitrum batches and L1-L2 messages through WebSocket (#10272)

* publish new batches through websockets

* publish new L1-L2 messages amount through websockets

* credo issues

* Consitent handling of transaction hash

* new types re-used
pull/10381/head
Alexander Kolotov 5 months ago committed by GitHub
parent d6080e04a1
commit b2345b159f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 14
      apps/block_scout_web/lib/block_scout_web/channels/arbitrum_channel.ex
  2. 5
      apps/block_scout_web/lib/block_scout_web/channels/user_socket_v2.ex
  3. 18
      apps/block_scout_web/lib/block_scout_web/notifier.ex
  4. 31
      apps/block_scout_web/lib/block_scout_web/notifiers/arbitrum.ex
  5. 16
      apps/block_scout_web/lib/block_scout_web/realtime_event_handler.ex
  6. 92
      apps/block_scout_web/lib/block_scout_web/views/api/v2/arbitrum_view.ex
  7. 17
      apps/explorer/lib/explorer/chain/events/publisher.ex
  8. 17
      apps/explorer/lib/explorer/chain/events/subscriber.ex
  9. 35
      apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex
  10. 28
      apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_messages_to_l2.ex

@ -0,0 +1,14 @@
defmodule BlockScoutWeb.ArbitrumChannel do
@moduledoc """
Establishes pub/sub channel for live updates of Arbitrum related events.
"""
use BlockScoutWeb, :channel
def join("arbitrum:new_batch", _params, socket) do
{:ok, %{}, socket}
end
def join("arbitrum:new_messages_to_rollup_amount", _params, socket) do
{:ok, %{}, socket}
end
end

@ -14,6 +14,11 @@ defmodule BlockScoutWeb.UserSocketV2 do
channel("token_instances:*", BlockScoutWeb.TokenInstanceChannel)
channel("zkevm_batches:*", BlockScoutWeb.PolygonZkevmConfirmedBatchChannel)
case Application.compile_env(:explorer, :chain_type) do
:arbitrum -> channel("arbitrum:*", BlockScoutWeb.ArbitrumChannel)
_ -> nil
end
def connect(_params, socket) do
{:ok, socket}
end

@ -29,6 +29,14 @@ defmodule BlockScoutWeb.Notifier do
@check_broadcast_sequence_period 500
case Application.compile_env(:explorer, :chain_type) do
:arbitrum ->
@chain_type_specific_events ~w(new_arbitrum_batches new_messages_to_arbitrum_amount)a
_ ->
nil
end
def handle_event({:chain_event, :addresses, type, addresses}) when type in [:realtime, :on_demand] do
Endpoint.broadcast("addresses:new_address", "count", %{count: Counters.address_estimated_count()})
@ -280,6 +288,16 @@ defmodule BlockScoutWeb.Notifier do
})
end
case Application.compile_env(:explorer, :chain_type) do
:arbitrum ->
def handle_event({:chain_event, topic, _, _} = event) when topic in @chain_type_specific_events,
# credo:disable-for-next-line Credo.Check.Design.AliasUsage
do: BlockScoutWeb.Notifiers.Arbitrum.handle_event(event)
_ ->
nil
end
def handle_event(event) do
Logger.warning("Unknown broadcasted event #{inspect(event)}.")
nil

@ -0,0 +1,31 @@
defmodule BlockScoutWeb.Notifiers.Arbitrum do
@moduledoc """
Module to handle and broadcast Arbitrum related events.
"""
alias BlockScoutWeb.API.V2.ArbitrumView
alias BlockScoutWeb.Endpoint
require Logger
def handle_event({:chain_event, :new_arbitrum_batches, :realtime, batches}) do
batches
|> Enum.sort_by(& &1.number, :asc)
|> Enum.each(fn batch ->
Endpoint.broadcast("arbitrum:new_batch", "new_arbitrum_batch", %{
batch: ArbitrumView.render_base_info_for_batch(batch)
})
end)
end
def handle_event({:chain_event, :new_messages_to_arbitrum_amount, :realtime, new_messages_amount}) do
Endpoint.broadcast("arbitrum:new_messages_to_rollup_amount", "new_messages_to_rollup_amount", %{
new_messages_to_rollup_amount: new_messages_amount
})
end
def handle_event(event) do
Logger.warning("Unknown broadcasted event #{inspect(event)}.")
nil
end
end

@ -12,6 +12,19 @@ defmodule BlockScoutWeb.RealtimeEventHandler do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
case Application.compile_env(:explorer, :chain_type) do
:arbitrum ->
def chain_type_specific_subscriptions do
Subscriber.to(:new_arbitrum_batches, :realtime)
Subscriber.to(:new_messages_to_arbitrum_amount, :realtime)
end
_ ->
def chain_type_specific_subscriptions do
nil
end
end
@impl true
def init([]) do
Subscriber.to(:address_coin_balances, :realtime)
@ -34,6 +47,9 @@ defmodule BlockScoutWeb.RealtimeEventHandler do
# Does not come from the indexer
Subscriber.to(:exchange_rate)
Subscriber.to(:transaction_stats)
chain_type_specific_subscriptions()
{:ok, []}
end

@ -114,21 +114,71 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
# transaction that committed the batch to L1.
#
# ## Parameters
# - `batches`: A list of `Explorer.Chain.Arbitrum.L1Batch` entries.
# - `batches`: A list of `Explorer.Chain.Arbitrum.L1Batch` entries or a list of maps
# with the corresponding fields.
#
# ## Returns
# - A list of maps with detailed information about each batch, formatted for use
# in JSON HTTP responses.
@spec render_arbitrum_batches([L1Batch]) :: [map()]
@spec render_arbitrum_batches(
[L1Batch.t()]
| [
%{
:number => non_neg_integer(),
:transactions_count => non_neg_integer(),
:start_block => non_neg_integer(),
:end_block => non_neg_integer(),
:commitment_transaction => %{
:hash => binary(),
:block_number => non_neg_integer(),
:timestamp => DateTime.t(),
:status => :finalized | :unfinalized,
optional(any()) => any()
},
optional(any()) => any()
}
]
) :: [map()]
defp render_arbitrum_batches(batches) do
Enum.map(batches, fn batch ->
%{
"number" => batch.number,
"transactions_count" => batch.transactions_count,
"blocks_count" => batch.end_block - batch.start_block + 1
}
|> add_l1_tx_info(batch)
end)
Enum.map(batches, &render_base_info_for_batch/1)
end
# Transforms a L1 batch into a map format for HTTP response.
#
# This function processes an Arbitrum L1 batch and converts it into a map that
# includes basic batch information and details of the associated transaction
# that committed the batch to L1.
#
# ## Parameters
# - `batch`: Either an `Explorer.Chain.Arbitrum.L1Batch` entry or a map with
# the corresponding fields.
#
# ## Returns
# - A map with detailed information about the batch, formatted for use in JSON HTTP responses.
@spec render_base_info_for_batch(
L1Batch.t()
| %{
:number => non_neg_integer(),
:transactions_count => non_neg_integer(),
:start_block => non_neg_integer(),
:end_block => non_neg_integer(),
:commitment_transaction => %{
:hash => binary(),
:block_number => non_neg_integer(),
:timestamp => DateTime.t(),
:status => :finalized | :unfinalized,
optional(any()) => any()
},
optional(any()) => any()
}
) :: map()
def render_base_info_for_batch(batch) do
%{
"number" => batch.number,
"transactions_count" => batch.transactions_count,
"blocks_count" => batch.end_block - batch.start_block + 1
}
|> add_l1_tx_info(batch)
end
@doc """
@ -228,8 +278,7 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
# Augments an output JSON with commit transaction details and its status.
@spec add_l1_tx_info(map(), %{
:__struct__ => L1Batch,
:commitment_transaction => any(),
:commitment_transaction => LifecycleTransaction.t() | LifecycleTransaction.to_import(),
optional(any()) => any()
}) :: map()
defp add_l1_tx_info(out_json, %L1Batch{} = batch) do
@ -246,6 +295,25 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
})
end
defp add_l1_tx_info(out_json, %{
commitment_transaction: %{
hash: hash,
block_number: block_number,
timestamp: ts,
status: status
}
}) do
out_json
|> Map.merge(%{
"commitment_transaction" => %{
"hash" => %Hash{byte_count: 32, bytes: hash},
"block_number" => block_number,
"timestamp" => ts,
"status" => status
}
})
end
# Augments an output JSON with commit and confirm transaction details and their statuses.
@spec add_l1_txs_info_and_status(map(), %{
:commitment_transaction => any(),

@ -3,7 +3,22 @@ defmodule Explorer.Chain.Events.Publisher do
Publishes events related to the Chain context.
"""
@allowed_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits token_transfers transactions contract_verification_result token_total_supply changed_bytecode fetched_bytecode fetched_token_instance_metadata smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
@common_allowed_events ~w(addresses address_coin_balances address_token_balances
address_current_token_balances blocks block_rewards internal_transactions
last_block_number optimism_deposits token_transfers transactions contract_verification_result
token_total_supply changed_bytecode fetched_bytecode fetched_token_instance_metadata
smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started
smart_contract_was_not_verified)a
case Application.compile_env(:explorer, :chain_type) do
:arbitrum ->
@chain_type_specific_allowed_events ~w(new_arbitrum_batches new_messages_to_arbitrum_amount)a
_ ->
@chain_type_specific_allowed_events ~w()a
end
@allowed_events @common_allowed_events ++ @chain_type_specific_allowed_events
def broadcast(_data, false), do: :ok

@ -3,7 +3,22 @@ defmodule Explorer.Chain.Events.Subscriber do
Subscribes to events related to the Chain context.
"""
@allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances address_current_token_balances blocks block_rewards internal_transactions last_block_number optimism_deposits token_transfers transactions contract_verification_result token_total_supply changed_bytecode fetched_bytecode fetched_token_instance_metadata smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started smart_contract_was_not_verified)a
@common_allowed_broadcast_events ~w(addresses address_coin_balances address_token_balances
address_current_token_balances blocks block_rewards internal_transactions
last_block_number optimism_deposits token_transfers transactions contract_verification_result
token_total_supply changed_bytecode fetched_bytecode fetched_token_instance_metadata
smart_contract_was_verified zkevm_confirmed_batches eth_bytecode_db_lookup_started
smart_contract_was_not_verified)a
case Application.compile_env(:explorer, :chain_type) do
:arbitrum ->
@chain_type_specific_allowed_broadcast_events ~w(new_arbitrum_batches new_messages_to_arbitrum_amount)a
_ ->
@chain_type_specific_allowed_broadcast_events ~w()a
end
@allowed_broadcast_events @common_allowed_broadcast_events ++ @chain_type_specific_allowed_broadcast_events
@allowed_broadcast_types ~w(catchup realtime on_demand contract_verification_result)a

@ -34,6 +34,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
alias Explorer.Chain
alias Explorer.Chain.Arbitrum
alias Explorer.Chain.Events.Publisher
require Logger
@ -296,7 +297,8 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# constructs comprehensive data structures for batches, lifecycle transactions, rollup
# blocks, and rollup transactions. Additionally, it identifies any L2-to-L1 messages that
# have been committed within these batches and updates their status. All discovered and
# processed data are then imported into the database.
# processed data are then imported into the database. If new batches were found, they are
# announced to be broadcasted through a websocket.
#
# ## Parameters
# - `sequencer_inbox_address`: The SequencerInbox contract address used to filter logs.
@ -326,10 +328,14 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
l1_rpc_config.json_rpc_named_arguments
)
new_batches_discovery? = end_block >= start_block
logs =
if end_block >= start_block do
if new_batches_discovery? do
# called by `discover`
raw_logs
else
# called by `discover_historical`
Enum.reverse(raw_logs)
end
@ -355,6 +361,13 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
arbitrum_messages: %{params: committed_txs},
timeout: :infinity
})
if not Enum.empty?(batches) and new_batches_discovery? do
Publisher.broadcast(
[{:new_arbitrum_batches, extend_batches_with_commitment_transactions(batches, lifecycle_txs)}],
:realtime
)
end
end)
end
@ -1094,4 +1107,22 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
Map.put(tx, :status, :sent)
end)
end
# Extends the provided list of batches with their corresponding commitment transactions.
@spec extend_batches_with_commitment_transactions(
[%{:commitment_id => non_neg_integer(), optional(any()) => any()}],
[%{:id => non_neg_integer(), optional(any()) => any()}]
) :: [
%{
:commitment_id => non_neg_integer(),
:commitment_transaction => %{:id => non_neg_integer(), optional(any()) => any()},
optional(any()) => any()
}
]
defp extend_batches_with_commitment_transactions(batches, lifecycle_txs) do
Enum.map(batches, fn batch ->
lifecycle_tx = Enum.find(lifecycle_txs, fn tx -> tx.id == batch.commitment_id end)
Map.put(batch, :commitment_transaction, lifecycle_tx)
end)
end
end

@ -25,6 +25,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewMessagesToL2 do
alias Explorer.Chain
alias Explorer.Chain.Arbitrum
alias Explorer.Chain.Events.Publisher
require Logger
@ -47,7 +48,9 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewMessagesToL2 do
This function calculates the block range for discovering new messages from L1 to L2
based on the latest block number available on the network. It then fetches logs
related to L1-to-L2 events within this range, extracts message details from both
the log and the corresponding L1 transaction, and imports them into the database.
the log and the corresponding L1 transaction, and imports them into the database. If
new messages were discovered, their amount is announced to be broadcasted through
a websocket.
## Parameters
- A map containing:
@ -101,13 +104,18 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewMessagesToL2 do
if start_block <= end_block do
log_info("Block range for discovery new messages from L1: #{start_block}..#{end_block}")
discover(
bridge_address,
start_block,
end_block,
json_rpc_named_arguments,
chunk_size
)
new_messages_amount =
discover(
bridge_address,
start_block,
end_block,
json_rpc_named_arguments,
chunk_size
)
if new_messages_amount > 0 do
Publisher.broadcast(%{new_messages_to_arbitrum_amount: new_messages_amount}, :realtime)
end
{:ok, end_block}
else
@ -201,7 +209,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewMessagesToL2 do
# - `chunk_size`: The size of chunks for processing RPC calls in batches.
#
# ## Returns
# - N/A
# - amount of discovered messages
defp discover(bridge_address, start_block, end_block, json_rpc_named_argument, chunk_size) do
logs =
get_logs_for_l1_to_l2_messages(
@ -222,6 +230,8 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewMessagesToL2 do
arbitrum_messages: %{params: messages},
timeout: :infinity
})
length(messages)
end
# Retrieves logs representing the `MessageDelivered` events.

Loading…
Cancel
Save