feat: AnyTrust and Celestia support as DA for Arbitrum batches (#10144)

* Initial version of x-level messages indexer

* fixes for cspell and credo

* new state of x-level messages

* Monitoring of new L1-to-L2 messages on L1

* new batches discovery

* fetcher workers in separate modules

* proper name

* Fix for responses without "id", e.g. "Too Many Requests"

* update DB with new batches and corresponding data

* update DB with confirmed blocks

* fixes for cspell and credo

* tracking commitments confirmations for L1 to L2 messages

* Proper usign of max function

* tracking completion of L2 to L1 messages

* catchup historical messages to L2

* incorrect version of committed file

* catchup historical messages from L2 and completion of L1-to-L2 messages

* historical batches catchup

* status for historical l2-to-l1 messages

* address matching issue

* catchup historical executions of L2-to-L1 messages

* db query to find unconfirmed blocks gaps

* first changes to catchup historical confirmations

* finalized catchup of historical confirmations

* 4844 blobs support

* fix for the issue with multiple confirmations

* limit amount of batches to handle at once

* Use latest L1 block by fetchers if start block is not configured

* merge issue fix

* missed file

* historical messages discovery

* reduce logs severity

* first iteration to improve documentation for new functionality

* second iteration to improve documentation for new functionality

* third iteration to improve documentation for new functionality

* fourth iteration to improve documentation for new functionality

* fifth iteration to improve documentation for new functionality

* final iteration to improve documentation for new functionality

* Arbitrum related info in Transaction and Block views

* Views to get info about batches and messages

* usage of committed for batches instead of confirmed

* merge issues addressed

* merge issues addressed

* code review issues addressed

* code review issues addressed

* fix merge issue

* raising exception in the case of DB inconsistency

* fix formatting issue

* termination case for RollupMessagesCatchup

* code review comments addressed

* code review comments addressed

* consistency in primary keys

* dialyzer fix

* code review comments addressed

* missed doc comment

* code review comments addressed

* changes after merge

* formatting issue fix

* block and transaction views extended

* updated indices creation as per code review comments

* code review comment addressed

* fix merge issue

* configuration of intervals as time variables

* TODO added to reflect improvement ability

* database fields refactoring

* association renaming

* associations and fields in api response renamed

* format issue addressed

* feat: APIv2 endpoints for Arbitrum messages and batches (#9963)

* Arbitrum related info in Transaction and Block views

* Views to get info about batches and messages

* usage of committed for batches instead of confirmed

* merge issues addressed

* changes after merge

* formatting issue fix

* code review comment addressed

* associations and fields in api response renamed

* format issue addressed

* feat: Arbitrum-specific fields in the block and transaction API endpoints (#10067)

* Arbitrum related info in Transaction and Block views

* Views to get info about batches and messages

* usage of committed for batches instead of confirmed

* merge issues addressed

* changes after merge

* formatting issue fix

* block and transaction views extended

* code review comment addressed

* associations and fields in api response renamed

* format issue addressed

* fix credo issue

* fix tests issues

* ethereumjsonrpc test fail investigation

* test issues fixes

* initial version to get DA infromation from batch transactions

* merge issues fix

* keep discovered da information in db

* show the batch data source in API response

* formatting, spelling and credo issues

* Documentation and specs improved

* covered a case with empty extra data

* API endpoints updated

* changed order of params for celestia

* more robust string hash identification

* duplcitated alias removed

* missed field in the type documentation

* mapset used instead of map

* comments for unfolding results of getKeysetCreationBlock call

* common function to get data key for Celestia blobs
pull/10390/head
Alexander Kolotov 5 months ago committed by GitHub
parent a5a7ebbba2
commit 3c268d2196
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 40
      apps/block_scout_web/lib/block_scout_web/controllers/api/v2/arbitrum_controller.ex
  2. 2
      apps/block_scout_web/lib/block_scout_web/routers/api_router.ex
  3. 141
      apps/block_scout_web/lib/block_scout_web/views/api/v2/arbitrum_view.ex
  4. 3
      apps/explorer/lib/explorer/chain/arbitrum/batch_block.ex
  5. 105
      apps/explorer/lib/explorer/chain/arbitrum/da_multi_purpose_record.ex
  6. 16
      apps/explorer/lib/explorer/chain/arbitrum/l1_batch.ex
  7. 117
      apps/explorer/lib/explorer/chain/arbitrum/reader.ex
  8. 106
      apps/explorer/lib/explorer/chain/import/runner/arbitrum/da_multi_purpose_records.ex
  9. 6
      apps/explorer/lib/explorer/chain/import/runner/arbitrum/l1_batches.ex
  10. 3
      apps/explorer/lib/explorer/chain/import/stage/block_referencing.ex
  11. 25
      apps/explorer/priv/arbitrum/migrations/20240527212653_add_da_info.exs
  12. 414
      apps/indexer/lib/indexer/fetcher/arbitrum/da/anytrust.ex
  13. 113
      apps/indexer/lib/indexer/fetcher/arbitrum/da/celestia.ex
  14. 143
      apps/indexer/lib/indexer/fetcher/arbitrum/da/common.ex
  15. 67
      apps/indexer/lib/indexer/fetcher/arbitrum/utils/db.ex
  16. 14
      apps/indexer/lib/indexer/fetcher/arbitrum/utils/helper.ex
  17. 55
      apps/indexer/lib/indexer/fetcher/arbitrum/utils/rpc.ex
  18. 228
      apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex
  19. 24
      apps/indexer/lib/indexer/fetcher/zksync/utils/rpc.ex
  20. 32
      apps/indexer/lib/indexer/helper.ex
  21. 3
      cspell.json

@ -5,15 +5,18 @@ defmodule BlockScoutWeb.API.V2.ArbitrumController do
only: [
next_page_params: 4,
paging_options: 1,
split_list_by_page: 1
split_list_by_page: 1,
parse_block_hash_or_number_param: 1
]
import Explorer.Chain.Arbitrum.DaMultiPurposeRecord.Helper, only: [calculate_celestia_data_key: 2]
alias Explorer.PagingOptions
alias Explorer.Chain.Arbitrum.{L1Batch, Message, Reader}
action_fallback(BlockScoutWeb.API.V2.FallbackController)
@batch_necessity_by_association %{:commitment_transaction => :optional}
@batch_necessity_by_association %{:commitment_transaction => :required}
@doc """
Function to handle GET requests to `/api/v2/arbitrum/messages/:direction` endpoint.
@ -76,6 +79,39 @@ defmodule BlockScoutWeb.API.V2.ArbitrumController do
end
end
@doc """
Function to handle GET requests to `/api/v2/arbitrum/batches/da/:data_hash` or
`/api/v2/arbitrum/batches/da/:tx_commitment/:height` endpoints.
"""
@spec batch_by_data_availability_info(Plug.Conn.t(), map()) :: Plug.Conn.t()
def batch_by_data_availability_info(conn, %{"data_hash" => data_hash} = _params) do
# In case of AnyTrust, `data_key` is the hash of the data itself
case Reader.get_da_record_by_data_key(data_hash, api?: true) do
{:ok, {batch_number, _}} ->
batch(conn, %{"batch_number" => batch_number})
{:error, :not_found} = res ->
res
end
end
def batch_by_data_availability_info(conn, %{"tx_commitment" => tx_commitment, "height" => height} = _params) do
# In case of Celestia, `data_key` is the hash of the height and the commitment hash
with {:ok, :hash, tx_commitment_hash} <- parse_block_hash_or_number_param(tx_commitment),
key <- calculate_celestia_data_key(height, tx_commitment_hash) do
case Reader.get_da_record_by_data_key(key, api?: true) do
{:ok, {batch_number, _}} ->
batch(conn, %{"batch_number" => batch_number})
{:error, :not_found} = res ->
res
end
else
res ->
res
end
end
@doc """
Function to handle GET requests to `/api/v2/arbitrum/batches/count` endpoint.
"""

@ -332,6 +332,8 @@ defmodule BlockScoutWeb.Routers.ApiRouter do
get("/batches", V2.ArbitrumController, :batches)
get("/batches/count", V2.ArbitrumController, :batches_count)
get("/batches/:batch_number", V2.ArbitrumController, :batch)
get("/batches/da/anytrust/:data_hash", V2.ArbitrumController, :batch_by_data_availability_info)
get("/batches/da/celestia/:height/:tx_commitment", V2.ArbitrumController, :batch_by_data_availability_info)
end
end

@ -3,7 +3,7 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
alias BlockScoutWeb.API.V2.Helper, as: APIV2Helper
alias Explorer.Chain.{Block, Hash, Transaction, Wei}
alias Explorer.Chain.Arbitrum.{L1Batch, LifecycleTransaction}
alias Explorer.Chain.Arbitrum.{L1Batch, LifecycleTransaction, Reader}
@doc """
Function to render GET requests to `/api/v2/arbitrum/messages/:direction` endpoint.
@ -71,6 +71,7 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
"after_acc" => batch.after_acc
}
|> add_l1_tx_info(batch)
|> add_da_info(batch)
end
@doc """
@ -128,13 +129,8 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
:transactions_count => non_neg_integer(),
:start_block => non_neg_integer(),
:end_block => non_neg_integer(),
:commitment_transaction => %{
:hash => binary(),
:block_number => non_neg_integer(),
:timestamp => DateTime.t(),
:status => :finalized | :unfinalized,
optional(any()) => any()
},
:batch_container => atom() | nil,
:commitment_transaction => LifecycleTransaction.to_import(),
optional(any()) => any()
}
]
@ -162,13 +158,8 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
:transactions_count => non_neg_integer(),
:start_block => non_neg_integer(),
:end_block => non_neg_integer(),
:commitment_transaction => %{
:hash => binary(),
:block_number => non_neg_integer(),
:timestamp => DateTime.t(),
:status => :finalized | :unfinalized,
optional(any()) => any()
},
:batch_container => atom() | nil,
:commitment_transaction => LifecycleTransaction.to_import(),
optional(any()) => any()
}
) :: map()
@ -176,7 +167,8 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
%{
"number" => batch.number,
"transactions_count" => batch.transactions_count,
"blocks_count" => batch.end_block - batch.start_block + 1
"blocks_count" => batch.end_block - batch.start_block + 1,
"batch_data_container" => batch.batch_container
}
|> add_l1_tx_info(batch)
end
@ -258,6 +250,7 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
commitment_transaction: arbitrum_entity.arbitrum_commitment_transaction,
confirmation_transaction: arbitrum_entity.arbitrum_confirmation_transaction
})
|> Map.put("batch_data_container", get_batch_data_container(arbitrum_entity))
|> Map.put("batch_number", get_batch_number(arbitrum_entity))
end
@ -276,6 +269,21 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
end
end
# Retrieves the batch data container label from an Arbitrum block or transaction
# if the batch data is loaded.
@spec get_batch_data_container(%{
:__struct__ => Block | Transaction,
:arbitrum_batch => any(),
optional(any()) => any()
}) :: nil | String.t()
defp get_batch_data_container(arbitrum_entity) do
case Map.get(arbitrum_entity, :arbitrum_batch) do
nil -> nil
%Ecto.Association.NotLoaded{} -> nil
value -> to_string(value.batch_container)
end
end
# Augments an output JSON with commit transaction details and its status.
@spec add_l1_tx_info(map(), %{
:commitment_transaction => LifecycleTransaction.t() | LifecycleTransaction.to_import(),
@ -314,6 +322,107 @@ defmodule BlockScoutWeb.API.V2.ArbitrumView do
})
end
# Adds data availability (DA) information to the given output JSON based on the batch container type.
#
# This function enriches the output JSON with data availability information based on
# the type of batch container. It handles different DA types, including AnyTrust and
# Celestia, and generates the appropriate DA data for inclusion in the output.
#
# ## Parameters
# - `out_json`: The initial JSON map to be enriched with DA information.
# - `batch`: The batch struct containing information about the rollup batch.
#
# ## Returns
# - An updated JSON map containing the data availability information.
@spec add_da_info(map(), %{
:__struct__ => L1Batch,
:batch_container => :in_anytrust | :in_celestia | atom() | nil,
:number => non_neg_integer(),
optional(any()) => any()
}) :: map()
defp add_da_info(out_json, %L1Batch{} = batch) do
da_info =
case batch.batch_container do
nil -> %{"batch_data_container" => nil}
:in_anytrust -> generate_anytrust_certificate(batch.number)
:in_celestia -> generate_celestia_da_info(batch.number)
value -> %{"batch_data_container" => to_string(value)}
end
out_json
|> Map.put("data_availability", da_info)
end
# Generates an AnyTrust certificate for the specified batch number.
@spec generate_anytrust_certificate(non_neg_integer()) :: map()
defp generate_anytrust_certificate(batch_number) do
out = %{"batch_data_container" => "in_anytrust"}
da_info =
with raw_info <- Reader.get_da_info_by_batch_number(batch_number),
false <- Enum.empty?(raw_info) do
prepare_anytrust_certificate(raw_info)
else
_ -> %{"data_hash" => nil, "timeout" => nil, "bls_signature" => nil, "signers" => []}
end
out
|> Map.merge(da_info)
end
# Prepares an AnyTrust certificate from the given DA information.
#
# This function retrieves the corresponding AnyTrust keyset based on the provided
# DA information, constructs a list of signers and the signers' mask, and assembles
# the certificate data.
#
# ## Parameters
# - `da_info`: A map containing the DA information, including the keyset hash, data
# hash, timeout, aggregated BLS signature, and signers' mask.
#
# ## Returns
# - A map representing the AnyTrust certificate, containing the data hash, data
# availability timeout, aggregated BLS signature, and the list of committee
# members who guaranteed availability of data for the specified timeout.
@spec prepare_anytrust_certificate(map()) :: map()
defp prepare_anytrust_certificate(da_info) do
keyset = Reader.get_anytrust_keyset(da_info["keyset_hash"])
signers =
if Enum.empty?(keyset) do
[]
else
signers_mask = da_info["signers_mask"]
# Matches the signers' mask with the keyset to extract the list of signers.
keyset["pubkeys"]
|> Enum.with_index()
|> Enum.filter(fn {_, index} -> Bitwise.band(signers_mask, Bitwise.bsl(1, index)) != 0 end)
|> Enum.map(fn {pubkey, _} -> pubkey end)
end
%{
"data_hash" => da_info["data_hash"],
"timeout" => da_info["timeout"],
"bls_signature" => da_info["bls_signature"],
"signers" => signers
}
end
# Generates Celestia DA information for the given batch number.
@spec generate_celestia_da_info(non_neg_integer()) :: map()
defp generate_celestia_da_info(batch_number) do
out = %{"batch_data_container" => "in_celestia"}
da_info = Reader.get_da_info_by_batch_number(batch_number)
out
|> Map.merge(%{
"height" => Map.get(da_info, "height"),
"tx_commitment" => Map.get(da_info, "tx_commitment")
})
end
# Augments an output JSON with commit and confirm transaction details and their statuses.
@spec add_l1_txs_info_and_status(map(), %{
:commitment_transaction => any(),

@ -32,8 +32,9 @@ defmodule Explorer.Chain.Arbitrum.BatchBlock do
}
@typedoc """
* `batch_number` - The number of the Arbitrum batch.
* `block_number` - The number of the rollup block.
* `batch_number` - The number of the Arbitrum batch.
* `batch` - An instance of `Explorer.Chain.Arbitrum.L1Batch` referenced by `batch_number`.
* `confirmation_id` - The ID of the confirmation L1 transaction from
`Explorer.Chain.Arbitrum.LifecycleTransaction`, or `nil`
if the block is not confirmed yet.

@ -0,0 +1,105 @@
defmodule Explorer.Chain.Arbitrum.DaMultiPurposeRecord do
@moduledoc """
Models a multi purpose record related to Data Availability for Arbitrum.
Changes in the schema should be reflected in the bulk import module:
- Explorer.Chain.Import.Runner.Arbitrum.DAMultiPurposeRecords
Migrations:
- Explorer.Repo.Arbitrum.Migrations.AddDaInfo
"""
use Explorer.Schema
alias Explorer.Chain.Hash
alias Explorer.Chain.Arbitrum.L1Batch
@optional_attrs ~w(batch_number)a
@required_attrs ~w(data_key data_type data)a
@allowed_attrs @optional_attrs ++ @required_attrs
@typedoc """
Descriptor of the a multi purpose record related to Data Availability for Arbitrum rollups:
* `data_key` - The hash of the data key.
* `data_type` - The type of the data.
* `data` - The data
* `batch_number` - The number of the Arbitrum batch associated with the data for the
records where applicable.
"""
@type to_import :: %{
data_key: binary(),
data_type: non_neg_integer(),
data: map(),
batch_number: non_neg_integer() | nil
}
@typedoc """
* `data_key` - The hash of the data key.
* `data_type` - The type of the data.
* `data` - The data to be stored as a json in the database.
* `batch_number` - The number of the Arbitrum batch associated with the data for the
records where applicable.
* `batch` - An instance of `Explorer.Chain.Arbitrum.L1Batch` referenced by `batch_number`.
"""
@primary_key false
typed_schema "arbitrum_da_multi_purpose" do
field(:data_key, Hash.Full)
field(:data_type, :integer)
field(:data, :map)
belongs_to(:batch, L1Batch,
foreign_key: :batch_number,
references: :number,
type: :integer
)
timestamps()
end
@doc """
Validates that the `attrs` are valid.
"""
@spec changeset(Ecto.Schema.t(), map()) :: Ecto.Schema.t()
def changeset(%__MODULE__{} = da_records, attrs \\ %{}) do
da_records
|> cast(attrs, @allowed_attrs)
|> validate_required(@required_attrs)
|> foreign_key_constraint(:batch_number)
|> unique_constraint(:data_key)
end
end
defmodule Explorer.Chain.Arbitrum.DaMultiPurposeRecord.Helper do
@moduledoc """
Helper functions to work with `Explorer.Chain.Arbitrum.DaMultiPurposeRecord` data
"""
alias Explorer.Chain.Hash
@doc """
Calculates the data key for `Explorer.Chain.Arbitrum.DaMultiPurposeRecord` that contains Celestia blob data.
## Parameters
- `height`: The height of the block in the Celestia network.
- `tx_commitment`: The transaction commitment.
## Returns
- A binary representing the calculated data key for the record containing
Celestia blob data.
"""
@spec calculate_celestia_data_key(binary() | non_neg_integer(), binary() | Explorer.Chain.Hash.t()) :: binary()
def calculate_celestia_data_key(height, tx_commitment) when is_binary(height) do
calculate_celestia_data_key(String.to_integer(height), tx_commitment)
end
def calculate_celestia_data_key(height, %Hash{} = tx_commitment) when is_integer(height) do
calculate_celestia_data_key(height, tx_commitment.bytes)
end
def calculate_celestia_data_key(height, tx_commitment) when is_integer(height) and is_binary(tx_commitment) do
:crypto.hash(:sha256, :binary.encode_unsigned(height) <> tx_commitment)
end
end

@ -7,6 +7,7 @@ defmodule Explorer.Chain.Arbitrum.L1Batch do
Migrations:
- Explorer.Repo.Arbitrum.Migrations.CreateArbitrumTables
- Explorer.Repo.Arbitrum.Migrations.AddDaInfo
"""
use Explorer.Schema
@ -15,8 +16,12 @@ defmodule Explorer.Chain.Arbitrum.L1Batch do
alias Explorer.Chain.Arbitrum.LifecycleTransaction
@optional_attrs ~w(batch_container)a
@required_attrs ~w(number transactions_count start_block end_block before_acc after_acc commitment_id)a
@allowed_attrs @optional_attrs ++ @required_attrs
@typedoc """
Descriptor of the a L1 batch for Arbitrum rollups:
* `number` - The number of the Arbitrum batch.
@ -25,7 +30,8 @@ defmodule Explorer.Chain.Arbitrum.L1Batch do
* `end_block` - The number of the last block in the batch.
* `before_acc` - The hash of the state before the batch.
* `after_acc` - The hash of the state after the batch.
* `commitment_id` - The ID of the commitment L1 transaction from Explorer.Chain.LifecycleTransaction.
* `commitment_id` - The ID of the commitment L1 transaction from Explorer.Chain.Arbitrum.LifecycleTransaction.
* `batch_container` - The tag meaning the container of the batch data: `:in_blob4844`, `:in_calldata`, `:in_celestia`, `:in_anytrust`
"""
@type to_import :: %{
number: non_neg_integer(),
@ -34,7 +40,8 @@ defmodule Explorer.Chain.Arbitrum.L1Batch do
end_block: non_neg_integer(),
before_acc: binary(),
after_acc: binary(),
commitment_id: non_neg_integer()
commitment_id: non_neg_integer(),
batch_container: :in_blob4844 | :in_calldata | :in_celestia | :in_anytrust
}
@typedoc """
@ -46,6 +53,7 @@ defmodule Explorer.Chain.Arbitrum.L1Batch do
* `after_acc` - The hash of the state after the batch.
* `commitment_id` - The ID of the commitment L1 transaction from `Explorer.Chain.Arbitrum.LifecycleTransaction`.
* `commitment_transaction` - An instance of `Explorer.Chain.Arbitrum.LifecycleTransaction` referenced by `commitment_id`.
* `batch_container` - The tag meaning the container of the batch data: `:in_blob4844`, `:in_calldata`, `:in_celestia`, `:in_anytrust`
"""
@primary_key {:number, :integer, autogenerate: false}
typed_schema "arbitrum_l1_batches" do
@ -61,6 +69,8 @@ defmodule Explorer.Chain.Arbitrum.L1Batch do
type: :integer
)
field(:batch_container, Ecto.Enum, values: [:in_blob4844, :in_calldata, :in_celestia, :in_anytrust])
timestamps()
end
@ -70,7 +80,7 @@ defmodule Explorer.Chain.Arbitrum.L1Batch do
@spec changeset(Ecto.Schema.t(), map()) :: Ecto.Schema.t()
def changeset(%__MODULE__{} = batches, attrs \\ %{}) do
batches
|> cast(attrs, @required_attrs)
|> cast(attrs, @allowed_attrs)
|> validate_required(@required_attrs)
|> foreign_key_constraint(:commitment_id)
|> unique_constraint(:number)

@ -6,7 +6,15 @@ defmodule Explorer.Chain.Arbitrum.Reader do
import Ecto.Query, only: [from: 2, limit: 2, order_by: 2, subquery: 1, where: 2, where: 3]
import Explorer.Chain, only: [select_repo: 1]
alias Explorer.Chain.Arbitrum.{BatchBlock, BatchTransaction, L1Batch, L1Execution, LifecycleTransaction, Message}
alias Explorer.Chain.Arbitrum.{
BatchBlock,
BatchTransaction,
DaMultiPurposeRecord,
L1Batch,
L1Execution,
LifecycleTransaction,
Message
}
alias Explorer.{Chain, PagingOptions, Repo}
@ -232,7 +240,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
the input list. The output list may be smaller than the input list if some IDs do not
correspond to any existing transactions.
"""
@spec l1_executions(maybe_improper_list(non_neg_integer(), [])) :: [L1Execution]
@spec l1_executions(maybe_improper_list(non_neg_integer(), [])) :: [L1Execution.t()]
def l1_executions(message_ids) when is_list(message_ids) do
query =
from(
@ -287,7 +295,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
- A list of `Explorer.Chain.Arbitrum.LifecycleTransaction` representing unfinalized transactions,
or `[]` if no unfinalized transactions are found.
"""
@spec lifecycle_unfinalized_transactions(FullBlock.block_number()) :: [LifecycleTransaction]
@spec lifecycle_unfinalized_transactions(FullBlock.block_number()) :: [LifecycleTransaction.t()]
def lifecycle_unfinalized_transactions(finalized_block)
when is_integer(finalized_block) and finalized_block >= 0 do
query =
@ -361,7 +369,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
- An instance of `Explorer.Chain.Arbitrum.L1Batch` representing the batch containing
the specified rollup block number, or `nil` if no corresponding batch is found.
"""
@spec get_batch_by_rollup_block_number(FullBlock.block_number()) :: L1Batch | nil
@spec get_batch_by_rollup_block_number(FullBlock.block_number()) :: L1Batch.t() | nil
def get_batch_by_rollup_block_number(number)
when is_integer(number) and number >= 0 do
query =
@ -491,7 +499,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
unconfirmed block within the range. Returns `[]` if no unconfirmed blocks are found
within the range, or if the block fetcher has not indexed them.
"""
@spec unconfirmed_rollup_blocks(FullBlock.block_number(), FullBlock.block_number()) :: [BatchBlock]
@spec unconfirmed_rollup_blocks(FullBlock.block_number(), FullBlock.block_number()) :: [BatchBlock.t()]
def unconfirmed_rollup_blocks(first_block, last_block)
when is_integer(first_block) and first_block >= 0 and
is_integer(last_block) and first_block <= last_block do
@ -695,7 +703,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
@spec messages(binary(),
paging_options: PagingOptions.t(),
api?: boolean()
) :: [Message]
) :: [Message.t()]
def messages(direction, options) when direction == "from-rollup" do
do_messages(:from_l2, options)
end
@ -720,7 +728,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
@spec do_messages(:from_l2 | :to_l2,
paging_options: PagingOptions.t(),
api?: boolean()
) :: [Message]
) :: [Message.t()]
defp do_messages(direction, options) do
base_query =
from(msg in Message,
@ -756,7 +764,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
@spec relayed_l1_to_l2_messages(
paging_options: PagingOptions.t(),
api?: boolean()
) :: [Message]
) :: [Message.t()]
def relayed_l1_to_l2_messages(options) do
paging_options = Keyword.get(options, :paging_options, Chain.default_paging_options())
@ -802,7 +810,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
"""
def batch(number, options)
@spec batch(:latest, api?: boolean()) :: {:error, :not_found} | {:ok, L1Batch}
@spec batch(:latest, api?: boolean()) :: {:error, :not_found} | {:ok, L1Batch.t()}
def batch(:latest, options) do
L1Batch
|> order_by(desc: :number)
@ -817,7 +825,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
@spec batch(binary() | non_neg_integer(),
necessity_by_association: %{atom() => :optional | :required},
api?: boolean()
) :: {:error, :not_found} | {:ok, L1Batch}
) :: {:error, :not_found} | {:ok, L1Batch.t()}
def batch(number, options) do
necessity_by_association = Keyword.get(options, :necessity_by_association, %{})
@ -852,7 +860,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
committed?: boolean(),
paging_options: PagingOptions.t(),
api?: boolean()
) :: [L1Batch]
) :: [L1Batch.t()]
def batches(options) do
necessity_by_association = Keyword.get(options, :necessity_by_association, %{})
@ -895,7 +903,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
## Returns
- A list of `Explorer.Chain.Arbitrum.BatchTransaction` entries belonging to the specified batch.
"""
@spec batch_transactions(non_neg_integer() | binary(), api?: boolean()) :: [BatchTransaction]
@spec batch_transactions(non_neg_integer() | binary(), api?: boolean()) :: [BatchTransaction.t()]
def batch_transactions(batch_number, options) do
query = from(tx in BatchTransaction, where: tx.batch_number == ^batch_number)
@ -921,7 +929,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
necessity_by_association: %{atom() => :optional | :required},
api?: boolean(),
paging_options: PagingOptions.t()
) :: [FullBlock]
) :: [FullBlock.t()]
def batch_blocks(batch_number, options) do
necessity_by_association = Keyword.get(options, :necessity_by_association, %{})
paging_options = Keyword.get(options, :paging_options, Chain.default_paging_options())
@ -949,4 +957,87 @@ defmodule Explorer.Chain.Arbitrum.Reader do
defp page_blocks(query, %PagingOptions{key: {block_number}}) do
where(query, [block], block.number < ^block_number)
end
@doc """
Retrieves an AnyTrust keyset from the database using the provided keyset hash.
## Parameters
- `keyset_hash`: A binary representing the hash of the keyset to be retrieved.
## Returns
- A map containing information about the AnyTrust keyset, otherwise an empty map.
"""
@spec get_anytrust_keyset(binary()) :: map() | nil
def get_anytrust_keyset("0x" <> <<_::binary-size(64)>> = keyset_hash) do
get_anytrust_keyset(keyset_hash |> Chain.string_to_block_hash() |> Kernel.elem(1) |> Map.get(:bytes))
end
def get_anytrust_keyset(keyset_hash) do
query =
from(
da_records in DaMultiPurposeRecord,
where: da_records.data_key == ^keyset_hash and da_records.data_type == 1
)
case Repo.one(query) do
nil -> %{}
keyset -> keyset.data
end
end
@doc """
Retrieves Data Availability (DA) information from the database using the provided
batch number.
## Parameters
- `batch_number`: The batch number to be used for retrieval.
## Returns
- A map containing the DA information if found, otherwise an empty map.
"""
@spec get_da_info_by_batch_number(non_neg_integer()) :: map()
def get_da_info_by_batch_number(batch_number) do
query =
from(
da_records in DaMultiPurposeRecord,
where: da_records.batch_number == ^batch_number and da_records.data_type == 0
)
case Repo.one(query) do
nil -> %{}
keyset -> keyset.data
end
end
@doc """
Retrieves a Data Availability (DA) record from the database using the provided
data key.
## Parameters
- `data_key`: The key of the data to be retrieved.
## Returns
- `{:ok, {batch_number, da_info}}`, where
- `batch_number` is the number of the batch associated with the DA record
- `da_info` is a map containing the DA record.
- `{:error, :not_found}` if no record with the specified `data_key` exists.
"""
@spec get_da_record_by_data_key(binary(), api?: boolean()) :: {:ok, {non_neg_integer(), map()}} | {:error, :not_found}
def get_da_record_by_data_key("0x" <> _ = data_key, options) do
data_key_bytes = data_key |> Chain.string_to_block_hash() |> Kernel.elem(1) |> Map.get(:bytes)
get_da_record_by_data_key(data_key_bytes, options)
end
def get_da_record_by_data_key(data_key, options) do
query =
from(
da_records in DaMultiPurposeRecord,
where: da_records.data_key == ^data_key and da_records.data_type == 0
)
case select_repo(options).one(query) do
nil -> {:error, :not_found}
keyset -> {:ok, {keyset.batch_number, keyset.data}}
end
end
end

@ -0,0 +1,106 @@
defmodule Explorer.Chain.Import.Runner.Arbitrum.DaMultiPurposeRecords do
@moduledoc """
Bulk imports of Explorer.Chain.Arbitrum.DaMultiPurposeRecord.
"""
require Ecto.Query
alias Ecto.{Changeset, Multi, Repo}
alias Explorer.Chain.Arbitrum.DaMultiPurposeRecord
alias Explorer.Chain.Import
alias Explorer.Prometheus.Instrumenter
import Ecto.Query, only: [from: 2]
@behaviour Import.Runner
# milliseconds
@timeout 60_000
@type imported :: [DaMultiPurposeRecord.t()]
@impl Import.Runner
def ecto_schema_module, do: DaMultiPurposeRecord
@impl Import.Runner
def option_key, do: :arbitrum_da_multi_purpose_records
@impl Import.Runner
@spec imported_table_row() :: %{:value_description => binary(), :value_type => binary()}
def imported_table_row do
%{
value_type: "[#{ecto_schema_module()}.t()]",
value_description: "List of `t:#{ecto_schema_module()}.t/0`s"
}
end
@impl Import.Runner
@spec run(Multi.t(), list(), map()) :: Multi.t()
def run(multi, changes_list, %{timestamps: timestamps} = options) do
insert_options =
options
|> Map.get(option_key(), %{})
|> Map.take(~w(on_conflict timeout)a)
|> Map.put_new(:timeout, @timeout)
|> Map.put(:timestamps, timestamps)
Multi.run(multi, :insert_da_multi_purpose_records, fn repo, _ ->
Instrumenter.block_import_stage_runner(
fn -> insert(repo, changes_list, insert_options) end,
:block_referencing,
:arbitrum_da_multi_purpose_records,
:arbitrum_da_multi_purpose_records
)
end)
end
@impl Import.Runner
def timeout, do: @timeout
@spec insert(Repo.t(), [map()], %{required(:timeout) => timeout(), required(:timestamps) => Import.timestamps()}) ::
{:ok, [DaMultiPurposeRecord.t()]}
| {:error, [Changeset.t()]}
def insert(repo, changes_list, %{timeout: timeout, timestamps: timestamps} = options) when is_list(changes_list) do
on_conflict = Map.get_lazy(options, :on_conflict, &default_on_conflict/0)
# Enforce Arbitrum.DaMultiPurposeRecord ShareLocks order (see docs: sharelock.md)
ordered_changes_list = Enum.sort_by(changes_list, & &1.data_key)
{:ok, inserted} =
Import.insert_changes_list(
repo,
ordered_changes_list,
for: DaMultiPurposeRecord,
returning: true,
timeout: timeout,
timestamps: timestamps,
conflict_target: :data_key,
on_conflict: on_conflict
)
{:ok, inserted}
end
defp default_on_conflict do
from(
rec in DaMultiPurposeRecord,
update: [
set: [
# don't update `data_key` as it is a primary key and used for the conflict target
data_type: fragment("EXCLUDED.data_type"),
data: fragment("EXCLUDED.data"),
batch_number: fragment("EXCLUDED.batch_number"),
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", rec.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", rec.updated_at)
]
],
where:
fragment(
"(EXCLUDED.data_type, EXCLUDED.data, EXCLUDED.batch_number) IS DISTINCT FROM (?, ?, ?)",
rec.data_type,
rec.data,
rec.batch_number
)
)
end
end

@ -93,19 +93,21 @@ defmodule Explorer.Chain.Import.Runner.Arbitrum.L1Batches do
before_acc: fragment("EXCLUDED.before_acc"),
after_acc: fragment("EXCLUDED.after_acc"),
commitment_id: fragment("EXCLUDED.commitment_id"),
batch_container: fragment("EXCLUDED.batch_container"),
inserted_at: fragment("LEAST(?, EXCLUDED.inserted_at)", tb.inserted_at),
updated_at: fragment("GREATEST(?, EXCLUDED.updated_at)", tb.updated_at)
]
],
where:
fragment(
"(EXCLUDED.transactions_count, EXCLUDED.start_block, EXCLUDED.end_block, EXCLUDED.before_acc, EXCLUDED.after_acc, EXCLUDED.commitment_id) IS DISTINCT FROM (?, ?, ?, ?, ?, ?)",
"(EXCLUDED.transactions_count, EXCLUDED.start_block, EXCLUDED.end_block, EXCLUDED.before_acc, EXCLUDED.after_acc, EXCLUDED.commitment_id, EXCLUDED.batch_container) IS DISTINCT FROM (?, ?, ?, ?, ?, ?, ?)",
tb.transactions_count,
tb.start_block,
tb.end_block,
tb.before_acc,
tb.after_acc,
tb.commitment_id
tb.commitment_id,
tb.batch_container
)
)
end

@ -65,7 +65,8 @@ defmodule Explorer.Chain.Import.Stage.BlockReferencing do
Runner.Arbitrum.L1Executions,
Runner.Arbitrum.L1Batches,
Runner.Arbitrum.BatchBlocks,
Runner.Arbitrum.BatchTransactions
Runner.Arbitrum.BatchTransactions,
Runner.Arbitrum.DaMultiPurposeRecords
]
@impl Stage

@ -0,0 +1,25 @@
defmodule Explorer.Repo.Arbitrum.Migrations.AddDaInfo do
use Ecto.Migration
def change do
execute(
"CREATE TYPE arbitrum_da_containers_types AS ENUM ('in_blob4844', 'in_calldata', 'in_celestia', 'in_anytrust')",
"DROP TYPE arbitrum_da_containers_types"
)
alter table(:arbitrum_l1_batches) do
add(:batch_container, :arbitrum_da_containers_types)
end
create table(:arbitrum_da_multi_purpose, primary_key: false) do
add(:data_key, :bytea, null: false, primary_key: true)
add(:data_type, :integer, null: false)
add(:data, :map, null: false)
add(:batch_number, :integer)
timestamps(null: false, type: :utc_datetime_usec)
end
create(index(:arbitrum_da_multi_purpose, [:data_type, :data_key]))
create(index(:arbitrum_da_multi_purpose, [:data_type, :batch_number]))
end
end

@ -0,0 +1,414 @@
defmodule Indexer.Fetcher.Arbitrum.DA.Anytrust do
@moduledoc """
Provides functionality for handling AnyTrust data availability information
within the Arbitrum rollup context.
"""
import Indexer.Fetcher.Arbitrum.Utils.Logging, only: [log_error: 1, log_info: 1, log_debug: 1]
import Explorer.Helper, only: [decode_data: 2]
alias Indexer.Fetcher.Arbitrum.Utils.{Db, Rpc}
alias Indexer.Fetcher.Arbitrum.Utils.Helper, as: ArbitrumHelper
alias Indexer.Helper, as: IndexerHelper
alias Explorer.Chain.Arbitrum
@enforce_keys [
:batch_number,
:keyset_hash,
:data_hash,
:timeout,
:signers_mask,
:bls_signature
]
defstruct @enforce_keys
@typedoc """
AnyTrust DA info struct:
* `batch_number` - The batch number in the Arbitrum rollup associated with the
AnyTrust data blob.
* `keyset_hash` - The hash identifying a keyset that defines the rules (threshold
and committee members) to issue the DA certificate.
* `data_hash` - The hash of the data blob stored by the AnyTrust committee.
* `timeout` - Expiration timeout for the data blob.
* `signers_mask` - Mask identifying committee members who guaranteed data availability.
* `bls_signature` - Aggregated BLS signature of the committee members.
"""
@type t :: %__MODULE__{
batch_number: non_neg_integer(),
keyset_hash: binary(),
data_hash: binary(),
timeout: DateTime.t(),
signers_mask: non_neg_integer(),
bls_signature: binary()
}
@typedoc """
AnyTrust DA certificate struct:
* `keyset_hash` - The hash identifying a keyset that defines the rules (threshold
and committee members) to issue the DA certificate.
* `data_hash` - The hash of the data blob stored by the AnyTrust committee.
* `timeout` - Expiration timeout for the data blob.
* `signers_mask` - Mask identifying committee members who guaranteed data availability.
* `bls_signature` - Aggregated BLS signature of the committee members.
"""
@type certificate :: %{
:keyset_hash => String.t(),
:data_hash => String.t(),
:timeout => DateTime.t(),
:signers_mask => non_neg_integer(),
:bls_signature => String.t()
}
@typedoc """
AnyTrust committee member public key struct:
* `trusted` - A boolean indicating whether the member is trusted.
* `key` - The public key of the member.
* `proof` - The proof of the member's public key.
"""
@type signer :: %{
:trusted => boolean(),
:key => String.t(),
optional(:proof) => String.t()
}
@typedoc """
AnyTrust committee struct:
* `threshold` - The threshold of honest members for the keyset.
* `pubkeys` - A list of public keys of the committee members.
"""
@type keyset :: %{
:threshold => non_neg_integer(),
:pubkeys => [signer()]
}
# keccak256("SetValidKeyset(bytes32,bytes)")
@set_valid_keyset_event "0xabca9b7986bc22ad0160eb0cb88ae75411eacfba4052af0b457a9335ef655722"
@set_valid_keyset_event_unindexed_params [:bytes]
@doc """
Parses batch accompanying data to extract AnyTrust data availability information.
This function decodes the provided binary data to extract information related to
AnyTrust data availability.
## Parameters
- `batch_number`: The batch number associated with the AnyTrust data.
- `binary_data`: The binary data to be parsed, containing AnyTrust data fields.
## Returns
- `{:ok, :in_anytrust, da_info}` if the parsing is successful, where `da_info` is
the AnyTrust data availability information struct.
- `{:error, nil, nil}` if the parsing fails.
"""
@spec parse_batch_accompanying_data(non_neg_integer(), binary()) ::
{:ok, :in_anytrust, __MODULE__.t()} | {:error, nil, nil}
def parse_batch_accompanying_data(batch_number, <<
keyset_hash::binary-size(32),
data_hash::binary-size(32),
timeout::big-unsigned-integer-size(64),
_version::size(8),
signers_mask::big-unsigned-integer-size(64),
bls_signature::binary-size(96)
>>) do
# https://github.com/OffchainLabs/nitro/blob/ad9ab00723e13cf98307b9b65774ad455594ef7b/arbstate/das_reader.go#L95-L151
{:ok, :in_anytrust,
%__MODULE__{
batch_number: batch_number,
keyset_hash: keyset_hash,
data_hash: data_hash,
timeout: IndexerHelper.timestamp_to_datetime(timeout),
signers_mask: signers_mask,
bls_signature: bls_signature
}}
end
def parse_batch_accompanying_data(_, _) do
log_error("Can not parse Anytrust DA message.")
{:error, nil, nil}
end
@doc """
Prepares AnyTrust data availability information for import.
This function prepares a list of data structures for import into the database,
ensuring that AnyTrust DA information and related keysets are included. It
verifies if the keyset associated with the AnyTrust DA certificate is already
known or needs to be fetched from L1.
To avoid fetching the same keyset multiple times, the function uses a cache.
## Parameters
- `source`: The initial list of data to be imported.
- `da_info`: The AnyTrust DA info struct containing details about the data blob.
- `l1_connection_config`: A map containing the address of the Sequencer Inbox contract
and configuration parameters for the JSON RPC connection.
- `cache`: A set of unique elements used to cache the checked keysets.
## Returns
- A tuple containing:
- An updated list of data structures ready for import, including the DA
certificate (`data_type` is `0`) and potentially a new keyset (`data_type`
is `1`) if required.
- The updated cache with the checked keysets.
"""
@spec prepare_for_import(
list(),
__MODULE__.t(),
%{
:sequencer_inbox_address => String.t(),
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments()
},
MapSet.t()
) ::
{[Arbitrum.DaMultiPurposeRecord.to_import()], MapSet.t()}
def prepare_for_import(source, %__MODULE__{} = da_info, l1_connection_config, cache) do
data = %{
keyset_hash: ArbitrumHelper.bytes_to_hex_str(da_info.keyset_hash),
data_hash: ArbitrumHelper.bytes_to_hex_str(da_info.data_hash),
timeout: da_info.timeout,
signers_mask: da_info.signers_mask,
bls_signature: ArbitrumHelper.bytes_to_hex_str(da_info.bls_signature)
}
res = [
%{
data_type: 0,
data_key: da_info.data_hash,
data: data,
batch_number: da_info.batch_number
}
]
{check_result, keyset_map, updated_cache} = check_if_new_keyset(da_info.keyset_hash, l1_connection_config, cache)
updated_res =
case check_result do
:new_keyset ->
[
%{
data_type: 1,
data_key: da_info.keyset_hash,
data: keyset_map,
batch_number: nil
}
| res
]
_ ->
res
end
{updated_res ++ source, updated_cache}
end
# Verifies the existence of an AnyTrust committee keyset in the database and fetches it from L1 if not found.
#
# To avoid fetching the same keyset multiple times, the function uses a cache.
#
# ## Parameters
# - `keyset_hash`: A binary representing the hash of the keyset.
# - `l1_connection_config`: A map containing the address of the Sequencer Inbox
# contract and configuration parameters for the JSON RPC
# connection.
# - `cache`: A set of unique elements used to cache the checked keysets.
#
# ## Returns
# - `{:new_keyset, keyset_info, updated_cache}` if the keyset is not found and fetched from L1.
# - `{:existing_keyset, nil, cache}` if the keyset is found in the cache or database.
@spec check_if_new_keyset(
binary(),
%{
:sequencer_inbox_address => binary(),
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments()
},
MapSet.t()
) ::
{:new_keyset, __MODULE__.keyset(), MapSet.t()}
| {:existing_keyset, nil, MapSet.t()}
defp check_if_new_keyset(keyset_hash, l1_connection_config, cache) do
if MapSet.member?(cache, keyset_hash) do
{:existing_keyset, nil, cache}
else
updated_cache = MapSet.put(cache, keyset_hash)
case Db.anytrust_keyset_exists?(keyset_hash) do
true ->
{:existing_keyset, nil, updated_cache}
false ->
{:new_keyset, get_keyset_info_from_l1(keyset_hash, l1_connection_config), updated_cache}
end
end
end
# Retrieves and decodes AnyTrust committee keyset information from L1 using the provided keyset hash.
#
# This function fetches the block number when the keyset was applied, retrieves
# the raw keyset data from L1, and decodes it to extract the threshold and public
# keys information.
#
# ## Parameters
# - `keyset_hash`: The hash of the keyset to be retrieved.
# - A map containing:
# - `:sequencer_inbox_address`: The address of the Sequencer Inbox contract.
# - `:json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection.
#
# ## Returns
# - A map describing an AnyTrust committee.
@spec get_keyset_info_from_l1(
binary(),
%{
:sequencer_inbox_address => binary(),
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments()
}
) :: __MODULE__.keyset()
defp get_keyset_info_from_l1(keyset_hash, %{
sequencer_inbox_address: sequencer_inbox_address,
json_rpc_named_arguments: json_rpc_named_arguments
}) do
keyset_applied_block_number =
Rpc.get_block_number_for_keyset(sequencer_inbox_address, keyset_hash, json_rpc_named_arguments)
log_debug("Keyset applied block number: #{keyset_applied_block_number}")
raw_keyset_data =
get_keyset_raw_data(keyset_hash, keyset_applied_block_number, sequencer_inbox_address, json_rpc_named_arguments)
decode_keyset(raw_keyset_data)
end
# Retrieves the raw data of a keyset by querying logs for the `SetValidKeyset` event.
#
# This function fetches logs for the `SetValidKeyset` event within a specific block
# emitted by the Sequencer Inbox contract and extracts the keyset data if available.
#
# ## Parameters
# - `keyset_hash`: The hash of the keyset to retrieve.
# - `block_number`: The block number to search for the logs.
# - `sequencer_inbox_address`: The address of the Sequencer Inbox contract.
# - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection.
#
# ## Returns
# - The raw data of the keyset if found, otherwise `nil`.
@spec get_keyset_raw_data(
binary(),
non_neg_integer(),
binary(),
EthereumJSONRPC.json_rpc_named_arguments()
) :: binary() | nil
defp get_keyset_raw_data(keyset_hash, block_number, sequencer_inbox_address, json_rpc_named_arguments) do
{:ok, logs} =
IndexerHelper.get_logs(
block_number,
block_number,
sequencer_inbox_address,
[@set_valid_keyset_event, ArbitrumHelper.bytes_to_hex_str(keyset_hash)],
json_rpc_named_arguments
)
if length(logs) > 0 do
log_info("Found #{length(logs)} SetValidKeyset logs")
set_valid_keyset_event_parse(List.first(logs))
else
log_error("No SetValidKeyset logs found in the block #{block_number}")
nil
end
end
defp set_valid_keyset_event_parse(event) do
[keyset_data] = decode_data(event["data"], @set_valid_keyset_event_unindexed_params)
keyset_data
end
# Decodes an AnyTrust committee keyset from a binary input.
#
# This function extracts the threshold of committee members configured for the
# keyset and the number of member public keys from the binary input, then decodes
# the specified number of public keys.
#
# Implemented as per: https://github.com/OffchainLabs/nitro/blob/ad9ab00723e13cf98307b9b65774ad455594ef7b/arbstate/das_reader.go#L217-L248
#
# ## Parameters
# - A binary input containing the threshold value, the number of public keys,
# and the public keys themselves.
#
# ## Returns
# - A map describing an AnyTrust committee.
@spec decode_keyset(binary()) :: __MODULE__.keyset()
defp decode_keyset(<<
threshold::big-unsigned-integer-size(64),
num_keys::big-unsigned-integer-size(64),
rest::binary
>>)
when num_keys <= 64 do
{pubkeys, _} = decode_pubkeys(rest, num_keys, [])
%{
threshold: threshold,
pubkeys: pubkeys
}
end
# Decodes a list of AnyTrust committee member public keys from a binary input.
#
# This function recursively processes a binary input to extract a specified number
# of public keys.
#
# ## Parameters
# - `data`: The binary input containing the public keys.
# - `num_keys`: The number of public keys to decode.
# - `acc`: An accumulator list to collect the decoded public keys.
#
# ## Returns
# - A tuple containing:
# - `{:error, "Insufficient data to decode public keys"}` if the input is insufficient
# to decode the specified number of keys.
# - A list of decoded AnyTrust committee member public keys and a binary entity
# of zero length, if successful.
@spec decode_pubkeys(binary(), non_neg_integer(), [
signer()
]) :: {:error, String.t()} | {[signer()], binary()}
defp decode_pubkeys(<<>>, 0, acc), do: {Enum.reverse(acc), <<>>}
defp decode_pubkeys(<<>>, _num_keys, _acc), do: {:error, "Insufficient data to decode public keys"}
defp decode_pubkeys(data, num_keys, acc) when num_keys > 0 do
<<high_byte, low_byte, rest::binary>> = data
pubkey_len = high_byte * 256 + low_byte
<<pubkey_data::binary-size(pubkey_len), remaining::binary>> = rest
pubkey = parse_pubkey(pubkey_data)
decode_pubkeys(remaining, num_keys - 1, [pubkey | acc])
end
# Parses a public key of an AnyTrust AnyTrust committee member from a binary input.
#
# This function extracts either the public key (for trusted sources) or the proof
# bytes and key bytes (for untrusted sources).
#
# Implemented as per: https://github.com/OffchainLabs/nitro/blob/35bd2aa59611702e6403051af581fddda7c17f74/blsSignatures/blsSignatures.go#L206C6-L242
#
# ## Parameters
# - A binary input containing the proof length and the rest of the data.
#
# ## Returns
# - A map describing an AnyTrust committee member public key.
@spec parse_pubkey(binary()) :: signer()
defp parse_pubkey(<<proof_len::size(8), rest::binary>>) do
if proof_len == 0 do
# Trusted source, no proof bytes, the rest is the key
%{trusted: true, key: ArbitrumHelper.bytes_to_hex_str(rest)}
else
<<proof_bytes::binary-size(proof_len), key_bytes::binary>> = rest
%{
trusted: false,
proof: ArbitrumHelper.bytes_to_hex_str(proof_bytes),
key: ArbitrumHelper.bytes_to_hex_str(key_bytes)
}
end
end
end

@ -0,0 +1,113 @@
defmodule Indexer.Fetcher.Arbitrum.DA.Celestia do
@moduledoc """
Provides functionality for parsing and preparing Celestia data availability
information associated with Arbitrum rollup batches.
"""
import Indexer.Fetcher.Arbitrum.Utils.Logging, only: [log_error: 1]
import Explorer.Chain.Arbitrum.DaMultiPurposeRecord.Helper, only: [calculate_celestia_data_key: 2]
alias Indexer.Fetcher.Arbitrum.Utils.Helper, as: ArbitrumHelper
alias Explorer.Chain.Arbitrum
@enforce_keys [:batch_number, :height, :tx_commitment, :raw]
defstruct @enforce_keys
@typedoc """
Celestia Blob Pointer struct:
* `batch_number` - The batch number in Arbitrum rollup associated with the
Celestia data.
* `height` - The height of the block in Celestia.
* `tx_commitment` - Data commitment in Celestia.
* `raw` - Unparsed blob pointer data containing data root, proof, etc.
"""
@type t :: %__MODULE__{
batch_number: non_neg_integer(),
height: non_neg_integer(),
tx_commitment: binary(),
raw: binary()
}
@typedoc """
Celestia Blob Descriptor struct:
* `height` - The height of the block in Celestia.
* `tx_commitment` - Data commitment in Celestia.
* `raw` - Unparsed blob pointer data containing data root, proof, etc.
"""
@type blob_descriptor :: %{
:height => non_neg_integer(),
:tx_commitment => String.t(),
:raw => String.t()
}
@doc """
Parses the batch accompanying data for Celestia.
This function extracts Celestia blob descriptor information, representing
information required to address a data blob and prove data availability,
from a binary input associated with a given batch number.
## Parameters
- `batch_number`: The batch number in the Arbitrum rollup associated with the Celestia data.
- `binary`: A binary input containing the Celestia blob descriptor data.
## Returns
- `{:ok, :in_celestia, da_info}` if the data is successfully parsed.
- `{:error, nil, nil}` if the data cannot be parsed.
"""
@spec parse_batch_accompanying_data(non_neg_integer(), binary()) ::
{:ok, :in_celestia, __MODULE__.t()} | {:error, nil, nil}
def parse_batch_accompanying_data(
batch_number,
<<
height::big-unsigned-integer-size(64),
_start_index::binary-size(8),
_shares_length::binary-size(8),
_key::big-unsigned-integer-size(64),
_num_leaves::big-unsigned-integer-size(64),
_tuple_root_nonce::big-unsigned-integer-size(64),
tx_commitment::binary-size(32),
_data_root::binary-size(32),
_side_nodes_length::big-unsigned-integer-size(64),
_rest::binary
>> = raw
) do
# https://github.com/celestiaorg/nitro-contracts/blob/celestia/blobstream/src/bridge/SequencerInbox.sol#L334-L360
{:ok, :in_celestia, %__MODULE__{batch_number: batch_number, height: height, tx_commitment: tx_commitment, raw: raw}}
end
def parse_batch_accompanying_data(_, _) do
log_error("Can not parse Celestia DA message.")
{:error, nil, nil}
end
@doc """
Prepares Celestia Blob data for import.
## Parameters
- `source`: The initial list of data to be imported.
- `da_info`: The Celestia blob descriptor struct containing details about the data blob.
## Returns
- An updated list of data structures ready for import, including the Celestia blob descriptor.
"""
@spec prepare_for_import(list(), __MODULE__.t()) :: [Arbitrum.DaMultiPurposeRecord.to_import()]
def prepare_for_import(source, %__MODULE__{} = da_info) do
data = %{
height: da_info.height,
tx_commitment: ArbitrumHelper.bytes_to_hex_str(da_info.tx_commitment),
raw: ArbitrumHelper.bytes_to_hex_str(da_info.raw)
}
[
%{
data_type: 0,
data_key: calculate_celestia_data_key(da_info.height, da_info.tx_commitment),
data: data,
batch_number: da_info.batch_number
}
| source
]
end
end

@ -0,0 +1,143 @@
defmodule Indexer.Fetcher.Arbitrum.DA.Common do
@moduledoc """
This module provides common functionalities for handling data availability (DA)
information in the Arbitrum rollup.
"""
import Indexer.Fetcher.Arbitrum.Utils.Logging, only: [log_error: 1]
alias Indexer.Fetcher.Arbitrum.DA.{Anytrust, Celestia}
alias Explorer.Chain.Arbitrum
@doc """
Examines the batch accompanying data to determine its type and parse it accordingly.
This function examines the batch accompanying data to identify its type and then
parses it based on the identified type if necessary.
## Parameters
- `batch_number`: The batch number in the Arbitrum rollup.
- `batch_accompanying_data`: The binary data accompanying the batch.
## Returns
- `{status, da_type, da_info}` where `da_type` is one of `:in_blob4844`,
`:in_calldata`, `:in_celestia`, `:in_anytrust`, or `nil` if the accompanying
data cannot be parsed or is of an unsupported type. `da_info` contains the DA
info descriptor for Celestia or Anytrust.
"""
@spec examine_batch_accompanying_data(non_neg_integer(), binary()) ::
{:ok, :in_blob4844, nil}
| {:ok, :in_calldata, nil}
| {:ok, :in_celestia, Celestia.t()}
| {:ok, :in_anytrust, Anytrust.t()}
| {:error, nil, nil}
def examine_batch_accompanying_data(batch_number, batch_accompanying_data) do
case batch_accompanying_data do
nil -> {:ok, :in_blob4844, nil}
_ -> parse_data_availability_info(batch_number, batch_accompanying_data)
end
end
@doc """
Prepares data availability (DA) information for import.
This function processes a list of DA information, either from Celestia or Anytrust,
preparing it for database import.
## Parameters
- `da_info`: A list of DA information structs.
- `l1_connection_config`: A map containing the address of the Sequencer Inbox contract
and configuration parameters for the JSON RPC connection.
## Returns
- A list of data structures ready for import, each containing:
- `:data_key`: A binary key identifying the data.
- `:data_type`: An integer indicating the type of data, which can be `0`
for data blob descriptors and `1` for Anytrust keyset descriptors.
- `:data`: A map containing the DA information.
- `:batch_number`: The batch number associated with the data, or `nil`.
"""
@spec prepare_for_import([Celestia.t() | Anytrust.t() | map()], %{
:sequencer_inbox_address => String.t(),
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments()
}) :: [Arbitrum.DaMultiPurposeRecord.to_import()]
def prepare_for_import([], _), do: []
def prepare_for_import(da_info, l1_connection_config) do
da_info
|> Enum.reduce({[], MapSet.new()}, fn info, {acc, cache} ->
case info do
%Celestia{} ->
{Celestia.prepare_for_import(acc, info), cache}
%Anytrust{} ->
Anytrust.prepare_for_import(acc, info, l1_connection_config, cache)
_ ->
{acc, cache}
end
end)
|> Kernel.elem(0)
end
@doc """
Determines if data availability information requires import.
This function checks the type of data availability (DA) and returns whether
the data should be imported based on its type.
## Parameters
- `da_type`: The type of data availability, which can be `:in_blob4844`, `:in_calldata`,
`:in_celestia`, `:in_anytrust`, or `nil`.
## Returns
- `true` if the DA type is `:in_celestia` or `:in_anytrust`, indicating that the data
requires import.
- `false` for all other DA types, indicating that the data does not require import.
"""
@spec required_import?(:in_blob4844 | :in_calldata | :in_celestia | :in_anytrust | nil) :: boolean()
def required_import?(da_type) do
da_type in [:in_celestia, :in_anytrust]
end
# Parses data availability information based on the header flag.
@spec parse_data_availability_info(non_neg_integer(), binary()) ::
{:ok, :in_calldata, nil}
| {:ok, :in_celestia, Celestia.t()}
| {:ok, :in_anytrust, Anytrust.t()}
| {:error, nil, nil}
defp parse_data_availability_info(batch_number, <<
header_flag::size(8),
rest::binary
>>) do
# https://github.com/OffchainLabs/nitro-contracts/blob/90037b996509312ef1addb3f9352457b8a99d6a6/src/bridge/SequencerInbox.sol#L69-L81
case header_flag do
0 ->
{:ok, :in_calldata, nil}
12 ->
Celestia.parse_batch_accompanying_data(batch_number, rest)
32 ->
log_error("ZERO HEAVY messages are not supported.")
{:error, nil, nil}
128 ->
log_error("DAS messages are not supported.")
{:error, nil, nil}
136 ->
Anytrust.parse_batch_accompanying_data(batch_number, rest)
_ ->
log_error("Unknown header flag found during an attempt to parse DA data: #{header_flag}")
{:error, nil, nil}
end
end
defp parse_data_availability_info(_, _) do
log_error("Failed to parse data availability information.")
{:error, nil, nil}
end
end

@ -371,7 +371,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
- A list of `Explorer.Chain.Block` instances containing detailed information for each
block number in the input list. Returns an empty list if no blocks are found for the given numbers.
"""
@spec rollup_blocks(maybe_improper_list(FullBlock.block_number(), [])) :: [FullBlock]
@spec rollup_blocks(maybe_improper_list(FullBlock.block_number(), [])) :: [FullBlock.t()]
def rollup_blocks(list_of_block_numbers)
when is_list(list_of_block_numbers) do
query =
@ -402,15 +402,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
- A list of maps representing unfinalized L1 transactions and compatible with the
database import operation.
"""
@spec lifecycle_unfinalized_transactions(FullBlock.block_number()) :: [
%{
id: non_neg_integer(),
hash: Hash,
block_number: FullBlock.block_number(),
timestamp: DateTime,
status: :unfinalized
}
]
@spec lifecycle_unfinalized_transactions(FullBlock.block_number()) :: [Arbitrum.LifecycleTransaction.to_import()]
def lifecycle_unfinalized_transactions(finalized_block)
when is_integer(finalized_block) and finalized_block >= 0 do
finalized_block
@ -443,7 +435,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
- The `Explorer.Chain.Arbitrum.L1Batch` associated with the given rollup block number
if it exists and its commit transaction is loaded.
"""
@spec get_batch_by_rollup_block_number(FullBlock.block_number()) :: Explorer.Chain.Arbitrum.L1Batch | nil
@spec get_batch_by_rollup_block_number(FullBlock.block_number()) :: Arbitrum.L1Batch.t() | nil
def get_batch_by_rollup_block_number(num)
when is_integer(num) and num >= 0 do
case Reader.get_batch_by_rollup_block_number(num) do
@ -476,11 +468,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
If no unconfirmed blocks are found within the range, an empty list is returned.
"""
@spec unconfirmed_rollup_blocks(FullBlock.block_number(), FullBlock.block_number()) :: [
%{
batch_number: non_neg_integer(),
block_number: FullBlock.block_number(),
confirmation_id: non_neg_integer() | nil
}
Arbitrum.BatchBlock.to_import()
]
def unconfirmed_rollup_blocks(first_block, last_block)
when is_integer(first_block) and first_block >= 0 and
@ -519,17 +507,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
database import operation. If no initiated messages are found up to the specified
block number, an empty list is returned.
"""
@spec initiated_l2_to_l1_messages(FullBlock.block_number()) :: [
%{
direction: :from_l2,
message_id: non_neg_integer(),
originator_address: binary(),
originating_transaction_hash: binary(),
originating_transaction_block_number: FullBlock.block_number(),
completion_transaction_hash: nil,
status: :initiated
}
]
@spec initiated_l2_to_l1_messages(FullBlock.block_number()) :: [Arbitrum.Message.to_import()]
def initiated_l2_to_l1_messages(block_number)
when is_integer(block_number) and block_number >= 0 do
# credo:disable-for-lines:2 Credo.Check.Refactor.PipeChainStart
@ -552,17 +530,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
database import operation. If no messages with the 'sent' status are found by
the specified block number, an empty list is returned.
"""
@spec sent_l2_to_l1_messages(FullBlock.block_number()) :: [
%{
direction: :from_l2,
message_id: non_neg_integer(),
originator_address: binary(),
originating_transaction_hash: binary(),
originating_transaction_block_number: FullBlock.block_number(),
completion_transaction_hash: nil,
status: :sent
}
]
@spec sent_l2_to_l1_messages(FullBlock.block_number()) :: [Arbitrum.Message.to_import()]
def sent_l2_to_l1_messages(block_number)
when is_integer(block_number) and block_number >= 0 do
# credo:disable-for-lines:2 Credo.Check.Refactor.PipeChainStart
@ -619,7 +587,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
the input list. The output list may be smaller than the input list if some IDs do not
correspond to any existing transactions.
"""
@spec l1_executions([non_neg_integer()]) :: [Explorer.Chain.Arbitrum.L1Execution]
@spec l1_executions([non_neg_integer()]) :: [Arbitrum.L1Execution.t()]
def l1_executions(message_ids) when is_list(message_ids) do
Reader.l1_executions(message_ids)
end
@ -745,12 +713,32 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
Chain.timestamp_to_block_number(timestamp, :after, false)
end
@doc """
Checks if an AnyTrust keyset exists in the database using the provided keyset hash.
## Parameters
- `keyset_hash`: The hash of the keyset to be checked.
## Returns
- `true` if the keyset exists, `false` otherwise.
"""
@spec anytrust_keyset_exists?(binary()) :: boolean()
def anytrust_keyset_exists?(keyset_hash) do
not Enum.empty?(Reader.get_anytrust_keyset(keyset_hash))
end
@spec get_da_info_by_batch_number(non_neg_integer()) :: map() | nil
def get_da_info_by_batch_number(batch_number) do
Reader.get_da_info_by_batch_number(batch_number)
end
@spec lifecycle_transaction_to_map(Arbitrum.LifecycleTransaction.t()) :: Arbitrum.LifecycleTransaction.to_import()
defp lifecycle_transaction_to_map(tx) do
[:id, :hash, :block_number, :timestamp, :status]
|> db_record_to_map(tx)
end
@spec rollup_block_to_map(Arbitrum.BatchBlock.t()) :: Arbitrum.BatchBlock.to_import()
defp rollup_block_to_map(block) do
[:batch_number, :block_number, :confirmation_id]
|> db_record_to_map(block)
@ -763,6 +751,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
:message_id,
:originator_address,
:originating_transaction_hash,
:origination_timestamp,
:originating_transaction_block_number,
:completion_transaction_hash,
:status

@ -83,4 +83,18 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Helper do
)
end)
end
@doc """
Converts a binary data to a hexadecimal string.
## Parameters
- `data`: The binary data to convert to a hexadecimal string.
## Returns
- A hexadecimal string representation of the input data.
"""
@spec bytes_to_hex_str(binary()) :: String.t()
def bytes_to_hex_str(data) do
"0x" <> Base.encode16(data, case: :lower)
end
end

@ -57,6 +57,18 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Rpc do
}
]
# getKeysetCreationBlock(bytes32 ksHash)
@selector_get_keyset_creation_block "258f0495"
@selector_sequencer_inbox_contract_abi [
%{
"inputs" => [%{"internalType" => "bytes32", "name" => "ksHash", "type" => "bytes32"}],
"name" => "getKeysetCreationBlock",
"outputs" => [%{"internalType" => "uint256", "name" => "", "type" => "uint256"}],
"stateMutability" => "view",
"type" => "function"
}
]
@doc """
Constructs a JSON RPC request to retrieve a transaction by its hash.
@ -114,6 +126,49 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Rpc do
)
end
@doc """
Retrieves the block number associated with a specific keyset from the Sequencer Inbox contract.
This function performs an `eth_call` to the Sequencer Inbox contract to get the block number
when a keyset was created.
## Parameters
- `sequencer_inbox_address`: The address of the Sequencer Inbox contract.
- `keyset_hash`: The hash of the keyset for which the block number is to be retrieved.
- `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection.
## Returns
- The block number.
"""
@spec get_block_number_for_keyset(
EthereumJSONRPC.address(),
EthereumJSONRPC.hash(),
EthereumJSONRPC.json_rpc_named_arguments()
) :: non_neg_integer()
def get_block_number_for_keyset(sequencer_inbox_address, keyset_hash, json_rpc_named_arguments) do
[
%{
contract_address: sequencer_inbox_address,
method_id: @selector_get_keyset_creation_block,
args: [keyset_hash]
}
]
|> IndexerHelper.read_contracts_with_retries(
@selector_sequencer_inbox_contract_abi,
json_rpc_named_arguments,
@rpc_resend_attempts
)
# Extracts the list of responses from the tuple returned by read_contracts_with_retries.
|> Kernel.elem(0)
# Retrieves the first response from the list of responses. The responses are in a list
# because read_contracts_with_retries accepts a list of method calls.
|> List.first()
# Extracts the result from the {status, result} tuple which is composed in EthereumJSONRPC.Encoder.decode_result.
|> Kernel.elem(1)
# Extracts the first decoded value from the result, which is a list, even if it contains only one value.
|> List.first()
end
# Calls getter functions on a rollup contract and collects their return values.
#
# This function is designed to interact with a rollup contract and invoke specified getter methods.

@ -7,12 +7,13 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
being created and historical batches processed in the past but not yet imported
into the database.
The process involves fetching logs for the `SequencerBatchDelivered` event
emitted by the Arbitrum `SequencerInbox` contract, processing these logs to
extract batch details, and then building the link between batches and the
corresponding rollup blocks and transactions. It also discovers those
cross-chain messages initiated in rollup blocks linked with the new batches
and updates the status of messages to consider them as committed (`:sent`).
Fetch logs for the `SequencerBatchDelivered` event emitted by the Arbitrum
`SequencerInbox` contract. Process the logs to extract batch details. Build the
link between batches and the corresponding rollup blocks and transactions. If
the batch data is located in Data Availability solutions like AnyTrust or
Celestia, fetch DA information to locate the batch data. Discover cross-chain
messages initiated in rollup blocks linked with the new batches and update the
status of messages to consider them as committed (`:sent`).
For any blocks or transactions missing in the database, data is requested in
chunks from the rollup RPC endpoint by `eth_getBlockByNumber`. Additionally,
@ -29,8 +30,10 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
alias EthereumJSONRPC.Block.ByNumber, as: BlockByNumber
alias Indexer.Helper, as: IndexerHelper
alias Indexer.Fetcher.Arbitrum.DA.Common, as: DataAvailabilityInfo
alias Indexer.Fetcher.Arbitrum.DA.{Anytrust, Celestia}
alias Indexer.Fetcher.Arbitrum.Utils.{Db, Logging, Rpc}
alias Indexer.Helper, as: IndexerHelper
alias Explorer.Chain
alias Explorer.Chain.Arbitrum
@ -292,25 +295,44 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# Performs the discovery of new or historical batches within a specified block range,
# processing and importing the relevant data into the database.
#
# This function retrieves SequencerBatchDelivered event logs from the specified block range
# and processes these logs to identify new batches and their corresponding details. It then
# constructs comprehensive data structures for batches, lifecycle transactions, rollup
# blocks, and rollup transactions. Additionally, it identifies any L2-to-L1 messages that
# have been committed within these batches and updates their status. All discovered and
# processed data are then imported into the database. If new batches were found, they are
# announced to be broadcasted through a websocket.
# This function retrieves SequencerBatchDelivered event logs from the specified block
# range and processes these logs to identify new batches and their corresponding details.
# It then constructs comprehensive data structures for batches, lifecycle transactions,
# rollup blocks, rollup transactions, and Data Availability related records. Additionally,
# it identifies any L2-to-L1 messages that have been committed within these batches and
# updates their status. All discovered and processed data are then imported into the
# database. If new batches were found, they are announced to be broadcasted through a
# websocket.
#
# ## Parameters
# - `sequencer_inbox_address`: The SequencerInbox contract address used to filter logs.
# - `start_block`: The starting block number for the discovery range.
# - `end_block`: The ending block number for the discovery range.
# - `new_batches_limit`: The maximum number of new batches to process in one iteration.
# - `messages_to_blocks_shift`: The value used to align message counts with rollup block numbers.
# - `messages_to_blocks_shift`: The value used to align message counts with rollup block
# numbers.
# - `l1_rpc_config`: RPC configuration parameters for L1.
# - `rollup_rpc_config`: RPC configuration parameters for rollup data.
#
# ## Returns
# - N/A
@spec do_discover(
binary(),
non_neg_integer(),
non_neg_integer(),
non_neg_integer(),
non_neg_integer(),
%{
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(),
:chunk_size => non_neg_integer(),
optional(any()) => any()
},
%{
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(),
:chunk_size => non_neg_integer(),
optional(any()) => any()
}
) :: :ok
defp do_discover(
sequencer_inbox_address,
start_block,
@ -344,11 +366,12 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
logs
|> Enum.chunk_every(new_batches_limit)
|> Enum.each(fn chunked_logs ->
{batches, lifecycle_txs, rollup_blocks, rollup_txs, committed_txs} =
{batches, lifecycle_txs, rollup_blocks, rollup_txs, committed_txs, da_records} =
handle_batches_from_logs(
chunked_logs,
messages_to_blocks_shift,
l1_rpc_config,
sequencer_inbox_address,
rollup_rpc_config
)
@ -359,6 +382,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
arbitrum_batch_blocks: %{params: rollup_blocks},
arbitrum_batch_transactions: %{params: rollup_txs},
arbitrum_messages: %{params: committed_txs},
arbitrum_da_multi_purpose_records: %{params: da_records},
timeout: :infinity
})
@ -384,6 +408,8 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
#
# ## Returns
# - A list of logs for SequencerBatchDelivered events within the specified block range.
@spec get_logs_new_batches(non_neg_integer(), non_neg_integer(), binary(), EthereumJSONRPC.json_rpc_named_arguments()) ::
[%{String.t() => any()}]
defp get_logs_new_batches(start_block, end_block, sequencer_inbox_address, json_rpc_named_arguments)
when start_block <= end_block do
{:ok, logs} =
@ -408,21 +434,23 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# and retrieves their details, avoiding the reprocessing of batches already known
# in the database. It enriches the details of new batches with data from corresponding
# L1 transactions and blocks, including timestamps and block ranges. The function
# then prepares batches, associated rollup blocks and transactions, and lifecycle
# transactions for database import. Additionally, L2-to-L1 messages initiated in the
# rollup blocks associated with the discovered batches are retrieved from the database,
# marked as `:sent`, and prepared for database import.
# then prepares batches, associated rollup blocks and transactions, lifecycle
# transactions and Data Availability related records for database import.
# Additionally, L2-to-L1 messages initiated in the rollup blocks associated with the
# discovered batches are retrieved from the database, marked as `:sent`, and prepared
# for database import.
#
# ## Parameters
# - `logs`: The list of SequencerBatchDelivered event logs.
# - `msg_to_block_shift`: The shift value for mapping batch messages to block numbers.
# - `l1_rpc_config`: The RPC configuration for L1 requests.
# - `sequencer_inbox_address`: The address of the SequencerInbox contract.
# - `rollup_rpc_config`: The RPC configuration for rollup data requests.
#
# ## Returns
# - A tuple containing lists of batches, lifecycle transactions, rollup blocks,
# rollup transactions, and committed messages (with the status `:sent`), all
# ready for database import.
# rollup transactions, committed messages (with the status `:sent`), and records
# with DA-related information if applicable, all ready for database import.
@spec handle_batches_from_logs(
[%{String.t() => any()}],
non_neg_integer(),
@ -431,21 +459,19 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
:chunk_size => non_neg_integer(),
optional(any()) => any()
},
binary(),
%{
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(),
:chunk_size => non_neg_integer(),
optional(any()) => any()
}
) :: {
[Arbitrum.L1Batch.to_import()],
[Arbitrum.LifecycleTransaction.to_import()],
[Arbitrum.BatchBlock.to_import()],
[Arbitrum.BatchTransaction.to_import()],
[Arbitrum.Message.to_import()]
}
defp handle_batches_from_logs(logs, msg_to_block_shift, l1_rpc_config, rollup_rpc_config)
) ::
{[Arbitrum.L1Batch.to_import()], [Arbitrum.LifecycleTransaction.to_import()],
[Arbitrum.BatchBlock.to_import()], [Arbitrum.BatchTransaction.to_import()], [Arbitrum.Message.to_import()],
[Arbitrum.DaMultiPurposeRecord.to_import()]}
defp handle_batches_from_logs(logs, msg_to_block_shift, l1_rpc_config, sequencer_inbox_address, rollup_rpc_config)
defp handle_batches_from_logs([], _, _, _), do: {[], [], [], [], []}
defp handle_batches_from_logs([], _, _, _, _), do: {[], [], [], [], [], []}
defp handle_batches_from_logs(
logs,
@ -454,6 +480,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
json_rpc_named_arguments: json_rpc_named_arguments,
chunk_size: chunk_size
} = l1_rpc_config,
sequencer_inbox_address,
rollup_rpc_config
) do
existing_batches =
@ -466,7 +493,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
blocks_to_ts = Rpc.execute_blocks_requests_and_get_ts(blocks_requests, json_rpc_named_arguments, chunk_size)
{initial_lifecycle_txs, batches_to_import} =
{initial_lifecycle_txs, batches_to_import, da_info} =
execute_tx_requests_parse_txs_calldata(txs_requests, msg_to_block_shift, blocks_to_ts, batches, l1_rpc_config)
# Check if the commitment transactions for the batches which are already in the database
@ -502,6 +529,12 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
]
end)
da_records =
DataAvailabilityInfo.prepare_for_import(da_info, %{
sequencer_inbox_address: sequencer_inbox_address,
json_rpc_named_arguments: l1_rpc_config.json_rpc_named_arguments
})
# It is safe to not re-mark messages as committed for the batches that are already in the database
committed_messages =
if Enum.empty?(blocks_to_import) do
@ -515,10 +548,11 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
end
{batches_list_to_import, Map.values(lifecycle_txs), Map.values(blocks_to_import), rollup_txs_to_import,
committed_messages}
committed_messages, da_records}
end
# Extracts batch numbers from logs of SequencerBatchDelivered events.
@spec parse_logs_to_get_batch_numbers([%{String.t() => any()}]) :: [non_neg_integer()]
defp parse_logs_to_get_batch_numbers(logs) do
logs
|> Enum.map(fn event ->
@ -554,7 +588,14 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
[%{String.t() => any()}],
[non_neg_integer()]
) :: {
%{:number => non_neg_integer(), :before_acc => binary(), :after_acc => binary(), :tx_hash => binary()},
%{
non_neg_integer() => %{
:number => non_neg_integer(),
:before_acc => binary(),
:after_acc => binary(),
:tx_hash => binary()
}
},
[EthereumJSONRPC.Transport.request()],
[EthereumJSONRPC.Transport.request()],
%{binary() => non_neg_integer()}
@ -611,6 +652,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
end
# Parses SequencerBatchDelivered event to get batch sequence number and associated accumulators
@spec sequencer_batch_delivered_event_parse(%{String.t() => any()}) :: {non_neg_integer(), binary(), binary()}
defp sequencer_batch_delivered_event_parse(event) do
[_, batch_sequence_number, before_acc, after_acc] = event["topics"]
@ -622,7 +664,9 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# This function processes a list of RPC `eth_getTransactionByHash` requests, extracts
# and decodes the calldata from the transactions to obtain batch details. It updates
# the provided batch map with block ranges for new batches and constructs a map of
# lifecycle transactions with their timestamps and finalization status.
# lifecycle transactions with their timestamps and finalization status. Additionally,
# it examines the data availability (DA) information for Anytrust or Celestia and
# constructs a list of DA info structs.
#
# ## Parameters
# - `txs_requests`: The list of RPC requests to fetch transaction data.
@ -631,15 +675,46 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# - `blocks_to_ts`: A map of block numbers to their timestamps, required to complete
# data for corresponding lifecycle transactions.
# - `batches`: The current batch data to be updated.
# - A configuration map containing JSON RPC arguments, a track finalization flag,
# and a chunk size for batch processing.
# - A configuration map containing:
# - `json_rpc_named_arguments`: Configuration parameters for the JSON RPC connection.
# - `track_finalization`: A boolean flag indicating if finalization tracking is needed.
# - `chunk_size`: The size of chunks for batch processing.
#
# ## Returns
# - A tuple containing:
# - A map of lifecycle (L1) transactions, which are not yet compatible with
# database import and require further processing.
# - An updated map of batch descriptions, also requiring further processing
# before database import.
# - A map of lifecycle (L1) transactions, including their hashes, block numbers,
# timestamps, and statuses (finalized or unfinalized).
# - An updated map of batch descriptions with block ranges and data availability
# information.
# - A list of data availability information structs for Anytrust or Celestia.
@spec execute_tx_requests_parse_txs_calldata(
[EthereumJSONRPC.Transport.request()],
non_neg_integer(),
%{EthereumJSONRPC.block_number() => DateTime.t()},
%{non_neg_integer() => map()},
%{
:chunk_size => non_neg_integer(),
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(),
:track_finalization => boolean(),
optional(any()) => any()
}
) ::
{%{
binary() => %{
:hash => binary(),
:block_number => non_neg_integer(),
:timestamp => DateTime.t(),
:status => :unfinalized | :finalized
}
},
%{
non_neg_integer() => %{
:start_block => non_neg_integer(),
:end_block => non_neg_integer(),
:data_available => atom() | nil,
optional(any()) => any()
}
}, [Anytrust.t() | Celestia.t()]}
defp execute_tx_requests_parse_txs_calldata(txs_requests, msg_to_block_shift, blocks_to_ts, batches, %{
json_rpc_named_arguments: json_rpc_named_arguments,
track_finalization: track_finalization?,
@ -647,20 +722,26 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
}) do
txs_requests
|> Enum.chunk_every(chunk_size)
|> Enum.reduce({%{}, batches}, fn chunk, {l1_txs, updated_batches} ->
|> Enum.reduce({%{}, batches, []}, fn chunk, {l1_txs, updated_batches, da_info} ->
chunk
# each eth_getTransactionByHash will take time since it returns entire batch
# in `input` which is heavy because contains dozens of rollup blocks
|> Rpc.make_chunked_request(json_rpc_named_arguments, "eth_getTransactionByHash")
|> Enum.reduce({l1_txs, updated_batches}, fn resp, {txs_map, batches_map} ->
|> Enum.reduce({l1_txs, updated_batches, da_info}, fn resp, {txs_map, batches_map, da_info_list} ->
block_num = quantity_to_integer(resp["blockNumber"])
tx_hash = Rpc.string_hash_to_bytes_hash(resp["hash"])
# Although they are called messages in the functions' ABI, in fact they are
# rollup blocks
{batch_num, prev_message_count, new_message_count} =
{batch_num, prev_message_count, new_message_count, extra_data} =
add_sequencer_l2_batch_from_origin_calldata_parse(resp["input"])
{da_type, da_data} =
case DataAvailabilityInfo.examine_batch_accompanying_data(batch_num, extra_data) do
{:ok, t, d} -> {t, d}
{:error, _, _} -> {nil, nil}
end
# In some cases extracted numbers for messages does not linked directly
# with rollup blocks, for this, the numbers are shifted by a value specific
# for particular rollup
@ -670,7 +751,8 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
batch_num,
Map.merge(batches_map[batch_num], %{
start_block: prev_message_count + msg_to_block_shift,
end_block: new_message_count + msg_to_block_shift - 1
end_block: new_message_count + msg_to_block_shift - 1,
batch_container: da_type
})
)
@ -687,18 +769,28 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
end
})
{updated_txs_map, updated_batches_map}
# credo:disable-for-lines:6 Credo.Check.Refactor.Nesting
updated_da_info_list =
if DataAvailabilityInfo.required_import?(da_type) do
[da_data | da_info_list]
else
da_info_list
end
{updated_txs_map, updated_batches_map, updated_da_info_list}
end)
end)
end
# Parses calldata of `addSequencerL2BatchFromOrigin` or `addSequencerL2BatchFromBlobs`
# functions to extract batch information.
@spec add_sequencer_l2_batch_from_origin_calldata_parse(binary()) ::
{non_neg_integer(), non_neg_integer(), non_neg_integer(), binary() | nil}
defp add_sequencer_l2_batch_from_origin_calldata_parse(calldata) do
case calldata do
"0x8f111f3c" <> encoded_params ->
# addSequencerL2BatchFromOrigin(uint256 sequenceNumber, bytes calldata data, uint256 afterDelayedMessagesRead, address gasRefunder, uint256 prevMessageCount, uint256 newMessageCount)
[sequence_number, _data, _after_delayed_messages_read, _gas_refunder, prev_message_count, new_message_count] =
[sequence_number, data, _after_delayed_messages_read, _gas_refunder, prev_message_count, new_message_count] =
TypeDecoder.decode(
Base.decode16!(encoded_params, case: :lower),
%FunctionSelector{
@ -714,7 +806,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
}
)
{sequence_number, prev_message_count, new_message_count}
{sequence_number, prev_message_count, new_message_count, data}
"0x3e5aa082" <> encoded_params ->
# addSequencerL2BatchFromBlobs(uint256 sequenceNumber, uint256 afterDelayedMessagesRead, address gasRefunder, uint256 prevMessageCount, uint256 newMessageCount)
@ -733,7 +825,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
}
)
{sequence_number, prev_message_count, new_message_count}
{sequence_number, prev_message_count, new_message_count, nil}
end
end
@ -861,6 +953,14 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# ## Returns
# - A map where each key is a rollup block number and its value is the
# corresponding batch number.
@spec unwrap_rollup_block_ranges(%{
non_neg_integer() => %{
:start_block => non_neg_integer(),
:end_block => non_neg_integer(),
:number => non_neg_integer(),
optional(any()) => any()
}
}) :: %{non_neg_integer() => non_neg_integer()}
defp unwrap_rollup_block_ranges(batches) do
batches
|> Map.values()
@ -933,6 +1033,18 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# ## Returns
# - A tuple containing the updated map of rollup blocks and the updated list of
# transactions, both are ready for database import.
@spec recover_data_if_necessary(
%{non_neg_integer() => Arbitrum.BatchBlock.to_import()},
[Arbitrum.BatchTransaction.to_import()],
[non_neg_integer()],
%{non_neg_integer() => non_neg_integer()},
%{
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(),
:chunk_size => non_neg_integer(),
optional(any()) => any()
}
) ::
{%{non_neg_integer() => Arbitrum.BatchBlock.to_import()}, [Arbitrum.BatchTransaction.to_import()]}
defp recover_data_if_necessary(
current_rollup_blocks,
current_rollup_txs,
@ -988,6 +1100,18 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# - A list of transactions, each associated with its respective rollup block
# and batch number, ready for database import.
# - The updated counter of processed chunks (usually ignored).
@spec recover_rollup_blocks_and_txs_from_rpc(
[non_neg_integer()],
[non_neg_integer()],
%{non_neg_integer() => non_neg_integer()},
%{
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(),
:chunk_size => non_neg_integer(),
optional(any()) => any()
}
) ::
{%{non_neg_integer() => Arbitrum.BatchBlock.to_import()}, [Arbitrum.BatchTransaction.to_import()],
non_neg_integer()}
defp recover_rollup_blocks_and_txs_from_rpc(
required_blocks_numbers,
found_blocks_numbers,
@ -1054,6 +1178,11 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# for database import.
# - An updated list of transactions, each associated with its respective rollup
# block and batch number, ready for database import.
@spec prepare_rollup_block_map_and_transactions_list(
[%{id: non_neg_integer(), result: %{String.t() => any()}}],
%{non_neg_integer() => Arbitrum.BatchBlock.to_import()},
[Arbitrum.BatchTransaction.to_import()]
) :: {%{non_neg_integer() => Arbitrum.BatchBlock.to_import()}, [Arbitrum.BatchTransaction.to_import()]}
defp prepare_rollup_block_map_and_transactions_list(json_responses, rollup_blocks, rollup_txs) do
json_responses
|> Enum.reduce({rollup_blocks, rollup_txs}, fn resp, {blocks_map, txs_list} ->
@ -1100,6 +1229,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
end
# Retrieves initiated L2-to-L1 messages up to specified block number and marks them as 'sent'.
@spec get_committed_l2_to_l1_messages(non_neg_integer()) :: [Arbitrum.Message.to_import()]
defp get_committed_l2_to_l1_messages(block_number) do
block_number
|> Db.initiated_l2_to_l1_messages()

@ -51,28 +51,10 @@ defmodule Indexer.Fetcher.ZkSync.Utils.Rpc do
end)
end
defp from_ts_to_datetime(time_ts) do
{_, unix_epoch_starts} = DateTime.from_unix(0)
case is_nil(time_ts) or time_ts == 0 do
true ->
unix_epoch_starts
false ->
case DateTime.from_unix(time_ts) do
{:ok, datetime} ->
datetime
{:error, _} ->
unix_epoch_starts
end
end
end
defp from_iso8601_to_datetime(time_string) do
case is_nil(time_string) do
true ->
from_ts_to_datetime(0)
IndexerHelper.timestamp_to_datetime(0)
false ->
case DateTime.from_iso8601(time_string) do
@ -80,7 +62,7 @@ defmodule Indexer.Fetcher.ZkSync.Utils.Rpc do
datetime
{:error, _} ->
from_ts_to_datetime(0)
IndexerHelper.timestamp_to_datetime(0)
end
end
end
@ -139,7 +121,7 @@ defmodule Indexer.Fetcher.ZkSync.Utils.Rpc do
key_atom,
case transform_type do
:iso8601_to_datetime -> from_iso8601_to_datetime(value_in_json_response)
:ts_to_datetime -> from_ts_to_datetime(value_in_json_response)
:ts_to_datetime -> IndexerHelper.timestamp_to_datetime(value_in_json_response)
:str_to_txhash -> json_tx_id_to_hash(value_in_json_response)
:str_to_byteshash -> string_hash_to_bytes_hash(value_in_json_response)
_ -> value_in_json_response

@ -62,6 +62,38 @@ defmodule Indexer.Helper do
end
end
@doc """
Converts a Unix timestamp to a `DateTime`.
If the given timestamp is `nil` or `0`, it returns the Unix epoch start.
If the conversion fails, it also returns the Unix epoch start.
## Parameters
- `time_ts`: A non-negative integer representing the Unix timestamp or `nil`.
## Returns
- A `DateTime` corresponding to the given Unix timestamp, or the Unix epoch start if
the timestamp is `nil`, `0`, or if the conversion fails.
"""
@spec timestamp_to_datetime(non_neg_integer() | nil) :: DateTime.t()
def timestamp_to_datetime(time_ts) do
{_, unix_epoch_starts} = DateTime.from_unix(0)
case is_nil(time_ts) or time_ts == 0 do
true ->
unix_epoch_starts
false ->
case DateTime.from_unix(time_ts) do
{:ok, datetime} ->
datetime
{:error, _} ->
unix_epoch_starts
end
end
end
@doc """
Calculates average block time in milliseconds (based on the latest 100 blocks) divided by 2.
Sends corresponding requests to the RPC node.

@ -20,6 +20,7 @@
"Aiubo",
"alloc",
"amzootyukbugmx",
"anytrust",
"apikey",
"APIV",
"Arbitrum",
@ -399,6 +400,8 @@
"progressbar",
"proxiable",
"psql",
"pubkey",
"pubkeys",
"purrstige",
"qdai",
"Qebz",

Loading…
Cancel
Save