feat: revisited approach to catchup missed Arbitrum messages (#10374)

* DB query to identify missed messages

* DB query to identify missed L2-L1 messages

* reworked approach to stop missed messages discovery

* initial version of improved backfiller

* documentation updated

* new env var and new way to identify if rollup synced

* format, credo, spelling issues fixed

* proper spec

* missing comments added

* missed env variable

* Unified queries in the functions with similar functionality

* extra space removed
pull/10244/head
Alexander Kolotov 4 months ago committed by GitHub
parent 536960363a
commit 98f299beea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 192
      apps/explorer/lib/explorer/chain/arbitrum/reader.ex
  2. 33
      apps/explorer/lib/explorer/utility/missing_block_range.ex
  3. 14
      apps/explorer/priv/arbitrum/migrations/20240628210148_add_index_for_messages.exs
  4. 112
      apps/indexer/lib/indexer/fetcher/arbitrum/rollup_messages_catchup.ex
  5. 223
      apps/indexer/lib/indexer/fetcher/arbitrum/utils/db.ex
  6. 197
      apps/indexer/lib/indexer/fetcher/arbitrum/workers/historical_messages_on_l2.ex
  7. 6
      config/runtime.exs
  8. 1
      cspell.json
  9. 3
      docker-compose/envs/common-blockscout.env

@ -3,7 +3,7 @@ defmodule Explorer.Chain.Arbitrum.Reader do
Contains read functions for Arbitrum modules. Contains read functions for Arbitrum modules.
""" """
import Ecto.Query, only: [from: 2, limit: 2, order_by: 2, subquery: 1, where: 2, where: 3] import Ecto.Query, only: [dynamic: 2, from: 2, limit: 2, order_by: 2, select: 3, subquery: 1, where: 2, where: 3]
import Explorer.Chain, only: [select_repo: 1] import Explorer.Chain, only: [select_repo: 1]
alias Explorer.Chain.Arbitrum.{ alias Explorer.Chain.Arbitrum.{
@ -19,7 +19,9 @@ defmodule Explorer.Chain.Arbitrum.Reader do
alias Explorer.{Chain, PagingOptions, Repo} alias Explorer.{Chain, PagingOptions, Repo}
alias Explorer.Chain.Block, as: FullBlock alias Explorer.Chain.Block, as: FullBlock
alias Explorer.Chain.{Hash, Transaction} alias Explorer.Chain.{Hash, Log, Transaction}
@to_l2_messages_transaction_types [100, 105]
@doc """ @doc """
Retrieves the number of the latest L1 block where an L1-to-L2 message was discovered. Retrieves the number of the latest L1 block where an L1-to-L2 message was discovered.
@ -62,50 +64,49 @@ defmodule Explorer.Chain.Arbitrum.Reader do
end end
@doc """ @doc """
Retrieves the number of the earliest rollup block where an L2-to-L1 message was discovered. Retrieves the rollup block number of the first missed L2-to-L1 message.
The function identifies missing messages by checking logs for the specified
L2-to-L1 event and verifying if there are corresponding entries in the messages
table. A message is considered missed if there is a log entry without a
matching message record.
## Parameters
- `arbsys_contract`: The address of the Arbitrum system contract.
- `l2_to_l1_event`: The event identifier for L2-to-L1 messages.
## Returns ## Returns
- The number of rollup block, or `nil` if no L2-to-L1 messages are found. - The block number of the first missed L2-to-L1 message, or `nil` if no missed
messages are found.
""" """
@spec rollup_block_of_earliest_discovered_message_from_l2() :: FullBlock.block_number() | nil @spec rollup_block_of_first_missed_message_from_l2(binary(), binary()) :: FullBlock.block_number() | nil
def rollup_block_of_earliest_discovered_message_from_l2 do def rollup_block_of_first_missed_message_from_l2(arbsys_contract, l2_to_l1_event) do
query = # credo:disable-for-lines:5 Credo.Check.Refactor.PipeChainStart
from(msg in Message, missed_messages_from_l2_query(arbsys_contract, l2_to_l1_event)
select: msg.originating_transaction_block_number, |> order_by(desc: :block_number)
where: msg.direction == :from_l2 and not is_nil(msg.originating_transaction_block_number), |> limit(1)
order_by: [asc: msg.originating_transaction_block_number], |> select([log], log.block_number)
limit: 1
)
query
|> Repo.one() |> Repo.one()
end end
@doc """ @doc """
Retrieves the number of the earliest rollup block where a completed L1-to-L2 message was discovered. Retrieves the rollup block number of the first missed L1-to-L2 message.
The function identifies missing messages by checking transactions of specific
types that are supposed to contain L1-to-L2 messages and verifying if there are
corresponding entries in the messages table. A message is considered missed if
there is a transaction without a matching message record.
## Returns ## Returns
- The block number of the rollup block, or `nil` if no completed L1-to-L2 messages are found, - The block number of the first missed L1-to-L2 message, or `nil` if no missed
or if the rollup transaction that emitted the corresponding message has not been indexed yet. messages are found.
""" """
@spec rollup_block_of_earliest_discovered_message_to_l2() :: FullBlock.block_number() | nil @spec rollup_block_of_first_missed_message_to_l2() :: FullBlock.block_number() | nil
def rollup_block_of_earliest_discovered_message_to_l2 do def rollup_block_of_first_missed_message_to_l2 do
completion_tx_subquery = missed_messages_to_l2_query()
from(msg in Message, |> order_by(desc: :block_number)
select: msg.completion_transaction_hash, |> limit(1)
where: msg.direction == :to_l2 and not is_nil(msg.completion_transaction_hash), |> select([rollup_tx], rollup_tx.block_number)
order_by: [asc: msg.message_id],
limit: 1
)
query =
from(tx in Transaction,
select: tx.block_number,
where: tx.hash == subquery(completion_tx_subquery),
limit: 1
)
query
|> Repo.one() |> Repo.one()
end end
@ -799,6 +800,125 @@ defmodule Explorer.Chain.Arbitrum.Reader do
select_repo(options).all(query) select_repo(options).all(query)
end end
@doc """
Retrieves the transaction hashes for missed L1-to-L2 messages within a specified
block range.
The function identifies missed messages by checking transactions of specific
types that are supposed to contain L1-to-L2 messages and verifying if there are
corresponding entries in the messages table. A message is considered missed if
there is a transaction without a matching message record within the specified
block range.
## Parameters
- `start_block`: The starting block number of the range.
- `end_block`: The ending block number of the range.
## Returns
- A list of transaction hashes for missed L1-to-L2 messages.
"""
@spec transactions_for_missed_messages_to_l2(non_neg_integer(), non_neg_integer()) :: [Hash.t()]
def transactions_for_missed_messages_to_l2(start_block, end_block) do
missed_messages_to_l2_query()
|> where([rollup_tx], rollup_tx.block_number >= ^start_block and rollup_tx.block_number <= ^end_block)
|> order_by(desc: :block_timestamp)
|> select([rollup_tx], rollup_tx.hash)
|> Repo.all()
end
# Constructs a query to retrieve missed L1-to-L2 messages.
#
# The function constructs a query to identify missing messages by checking
# transactions of specific types that are supposed to contain L1-to-L2
# messages and verifying if there are corresponding entries in the messages
# table. A message is considered missed if there is a transaction without a
# matching message record.
#
# ## Returns
# - A query to retrieve missed L1-to-L2 messages.
@spec missed_messages_to_l2_query() :: Ecto.Query.t()
defp missed_messages_to_l2_query do
from(rollup_tx in Transaction,
left_join: msg in Message,
on: rollup_tx.hash == msg.completion_transaction_hash and msg.direction == :to_l2,
where: rollup_tx.type in @to_l2_messages_transaction_types and is_nil(msg.completion_transaction_hash)
)
end
@doc """
Retrieves the logs for missed L2-to-L1 messages within a specified block range.
The function identifies missed messages by checking logs for the specified
L2-to-L1 event and verifying if there are corresponding entries in the messages
table. A message is considered missed if there is a log entry without a
matching message record within the specified block range.
## Parameters
- `start_block`: The starting block number of the range.
- `end_block`: The ending block number of the range.
- `arbsys_contract`: The address of the Arbitrum system contract.
- `l2_to_l1_event`: The event identifier for L2-to-L1 messages.
## Returns
- A list of logs for missed L2-to-L1 messages.
"""
@spec logs_for_missed_messages_from_l2(non_neg_integer(), non_neg_integer(), binary(), binary()) :: [Log.t()]
def logs_for_missed_messages_from_l2(start_block, end_block, arbsys_contract, l2_to_l1_event) do
# credo:disable-for-lines:5 Credo.Check.Refactor.PipeChainStart
missed_messages_from_l2_query(arbsys_contract, l2_to_l1_event, start_block, end_block)
|> where([log, msg], log.block_number >= ^start_block and log.block_number <= ^end_block)
|> order_by(desc: :block_number, desc: :index)
|> select([log], log)
|> Repo.all()
end
# Constructs a query to retrieve missed L2-to-L1 messages.
#
# The function constructs a query to identify missing messages by checking logs
# for the specified L2-to-L1 and verifying if there are corresponding entries
# in the messages table within a given block range, or among all messages if no
# block range is provided. A message is considered missed if there is a log
# entry without a matching message record.
#
# ## Parameters
# - `arbsys_contract`: The address hash of the Arbitrum system contract.
# - `l2_to_l1_event`: The event identifier for L2 to L1 messages.
# - `start_block`: The starting block number for the search range (optional).
# - `end_block`: The ending block number for the search range (optional).
#
# ## Returns
# - A query to retrieve missed L2-to-L1 messages.
@spec missed_messages_from_l2_query(binary(), binary(), non_neg_integer() | nil, non_neg_integer() | nil) ::
Ecto.Query.t()
defp missed_messages_from_l2_query(arbsys_contract, l2_to_l1_event, start_block \\ nil, end_block \\ nil) do
# It is assumed that all the messages from the same transaction are handled
# atomically so there is no need to check the message_id for each log entry.
# Otherwise, the join condition must be extended with
# fragment("encode(l0.fourth_topic, 'hex') = LPAD(TO_HEX(a1.message_id::BIGINT), 64, '0')")
base_condition =
dynamic([log, msg], log.transaction_hash == msg.originating_transaction_hash and msg.direction == :from_l2)
join_condition =
if is_nil(start_block) or is_nil(end_block) do
base_condition
else
dynamic(
[_, msg],
^base_condition and
msg.originating_transaction_block_number >= ^start_block and
msg.originating_transaction_block_number <= ^end_block
)
end
from(log in Log,
left_join: msg in Message,
on: ^join_condition,
where:
log.address_hash == ^arbsys_contract and log.first_topic == ^l2_to_l1_event and
is_nil(msg.originating_transaction_hash)
)
end
@doc """ @doc """
Retrieves the total count of rollup batches indexed up to the current moment. Retrieves the total count of rollup batches indexed up to the current moment.

@ -9,6 +9,10 @@ defmodule Explorer.Utility.MissingBlockRange do
@default_returning_batch_size 10 @default_returning_batch_size 10
@typedoc """
* `from_number`: The lower bound of the block range.
* `to_number`: The upper bound of the block range.
"""
typed_schema "missing_block_ranges" do typed_schema "missing_block_ranges" do
field(:from_number, :integer) field(:from_number, :integer)
field(:to_number, :integer) field(:to_number, :integer)
@ -139,7 +143,8 @@ defmodule Explorer.Utility.MissingBlockRange do
## Returns ## Returns
- Returns `nil` if no intersecting ranges are found, or an `Explorer.Utility.MissingBlockRange` instance of the first intersecting range otherwise. - Returns `nil` if no intersecting ranges are found, or an `Explorer.Utility.MissingBlockRange` instance of the first intersecting range otherwise.
""" """
@spec intersects_with_range(Block.block_number(), Block.block_number()) :: nil | Explorer.Utility.MissingBlockRange @spec intersects_with_range(Block.block_number(), Block.block_number()) ::
nil | Explorer.Utility.MissingBlockRange.t()
def intersects_with_range(lower_number, higher_number) def intersects_with_range(lower_number, higher_number)
when is_integer(lower_number) and lower_number >= 0 and when is_integer(lower_number) and lower_number >= 0 and
is_integer(higher_number) and lower_number <= higher_number do is_integer(higher_number) and lower_number <= higher_number do
@ -182,7 +187,19 @@ defmodule Explorer.Utility.MissingBlockRange do
defp update_to_number_or_delete_range(%{from_number: from} = range, to) when to > from, do: Repo.delete(range) defp update_to_number_or_delete_range(%{from_number: from} = range, to) when to > from, do: Repo.delete(range)
defp update_to_number_or_delete_range(range, to), do: update_range(range, %{to_number: to}) defp update_to_number_or_delete_range(range, to), do: update_range(range, %{to_number: to})
defp get_range_by_block_number(number) do @doc """
Fetches the range of blocks that includes the given block number if it falls
within any of the ranges that need to be (re)fetched.
## Parameters
- `number`: The block number to check against the missing block ranges.
## Returns
- A single range record of `Explorer.Utility.MissingBlockRange` that includes
the given block number, or `nil` if no such range is found.
"""
@spec get_range_by_block_number(Block.block_number()) :: nil | Explorer.Utility.MissingBlockRange.t()
def get_range_by_block_number(number) do
number number
|> include_bound_query() |> include_bound_query()
|> Repo.one() |> Repo.one()
@ -250,6 +267,18 @@ defmodule Explorer.Utility.MissingBlockRange do
from(r in query, where: r.to_number > ^upper_bound) from(r in query, where: r.to_number > ^upper_bound)
end end
@doc """
Constructs a query to check if a given block number falls within any of the
ranges of blocks that need to be (re)fetched.
## Parameters
- `bound`: The block number to check against the missing block ranges.
## Returns
- A query that can be used to find ranges where the given block number is
within the `from_number` and `to_number` bounds.
"""
@spec include_bound_query(Block.block_number()) :: Ecto.Query.t()
def include_bound_query(bound) do def include_bound_query(bound) do
from(r in __MODULE__, where: r.from_number >= ^bound, where: r.to_number <= ^bound) from(r in __MODULE__, where: r.from_number >= ^bound, where: r.to_number <= ^bound)
end end

@ -0,0 +1,14 @@
defmodule Explorer.Repo.Arbitrum.Migrations.AddIndexForMessages do
use Ecto.Migration
def change do
# name of the index is specified explicitly because the default index name is cut and not unique
create(
index(
:arbitrum_crosslevel_messages,
[:direction, :originating_transaction_block_number, :originating_transaction_hash],
name: :arbitrum_crosslevel_messages_dir_block_hash
)
)
end
end

@ -1,21 +1,23 @@
defmodule Indexer.Fetcher.Arbitrum.RollupMessagesCatchup do defmodule Indexer.Fetcher.Arbitrum.RollupMessagesCatchup do
@moduledoc """ @moduledoc """
Manages the catch-up process for historical rollup messages between Layer 1 (L1) and Layer 2 (L2) within the Arbitrum network. Manages the catch-up process for historical rollup messages between Layer 1 (L1)
and Layer 2 (L2) within the Arbitrum network.
This module aims to discover historical messages that were not captured by the block
fetcher or the catch-up block fetcher. This situation arises during the upgrade of an This module aims to discover historical messages that were not captured by the
existing instance of BlockScout (BS) that already has indexed blocks but lacks block fetcher or the catch-up block fetcher. This situation arises during the
a crosschain messages discovery mechanism. Therefore, it becomes necessary to traverse upgrade of an existing instance of BlockScout (BS) that already has indexed
the already indexed blocks to extract crosschain messages contained within them. blocks but lacks a crosschain messages discovery mechanism. Therefore, it
becomes necessary to traverse the already indexed blocks to extract crosschain
The fetcher's operation cycle consists of five phases, initiated by sending specific messages contained within them.
messages:
The fetcher's operation cycle consists of five phases, initiated by sending
specific messages:
- `:wait_for_new_block`: Waits for the block fetcher to index new blocks before - `:wait_for_new_block`: Waits for the block fetcher to index new blocks before
proceeding with message discovery. proceeding with message discovery.
- `:init_worker`: Sets up the initial parameters for the message discovery process, - `:init_worker`: Sets up the initial parameters for the message discovery
identifying the ending blocks for the search. process, identifying the ending blocks for the search.
- `:historical_msg_from_l2` and `:historical_msg_to_l2`: Manage the discovery and - `:historical_msg_from_l2` and `:historical_msg_to_l2`: Manage the discovery
processing of messages sent from L2 to L1 and from L1 to L2, respectively. and processing of messages sent from L2 to L1 and from L1 to L2, respectively.
- `:plan_next_iteration`: Schedules the next iteration of the catch-up process. - `:plan_next_iteration`: Schedules the next iteration of the catch-up process.
Workflow diagram of the fetcher state changes: Workflow diagram of the fetcher state changes:
@ -29,23 +31,24 @@ defmodule Indexer.Fetcher.Arbitrum.RollupMessagesCatchup do
|-> historical_msg_from_l2 -> historical_msg_to_l2 -> plan_next_iteration ->| |-> historical_msg_from_l2 -> historical_msg_to_l2 -> plan_next_iteration ->|
|---------------------------------------------------------------------------| |---------------------------------------------------------------------------|
`historical_msg_from_l2` discovers L2-to-L1 messages by analyzing logs from already `historical_msg_from_l2` discovers L2-to-L1 messages by analyzing logs from
indexed rollup transactions. Logs representing the `L2ToL1Tx` event are utilized already indexed rollup transactions. Logs representing the `L2ToL1Tx` event are
to construct messages. The current rollup state, including information about utilized to construct messages. The current rollup state, including information
committed batches and confirmed blocks, is used to assign the appropriate status about committed batches and confirmed blocks, is used to assign the appropriate
to the messages before importing them into the database. status to the messages before importing them into the database.
`historical_msg_to_l2` discovers L1-to-L2 messages by requesting rollup `historical_msg_to_l2` discovers in the database transactions that could be
transactions through RPC. Transactions containing a `requestId` in their body are responsible for L1-to-L2 messages and then re-requests these transactions
utilized to construct messages. These messages are marked as `:relayed`, indicating through RPC. Results are utilized to construct messages. These messages are
that they have been successfully received on L2 and are considered completed, and marked as `:relayed`, indicating that they have been successfully received on
are then imported into the database. This approach is adopted because it parallels L2 and are considered completed, and are then imported into the database. This
the action of re-indexing existing transactions to include Arbitrum-specific fields, approach is adopted because it parallels the action of re-indexing existing
which are absent in the currently indexed transactions. However, permanently adding transactions to include Arbitrum-specific fields, which are absent in the
these fields to the database model for the sake of historical message catch-up is currently indexed transactions. However, permanently adding these fields to the
impractical. Therefore, to avoid the extensive process of re-indexing and to database model for the sake of historical message catch-up is impractical.
minimize changes to the database schema, fetching the required data directly from Therefore, to avoid the extensive process of re-indexing and to minimize changes
an external node via RPC is preferred for historical message discovery. to the database schema, fetching the required data directly from an external
node via RPC is preferred for historical message discovery.
""" """
use GenServer use GenServer
@ -89,8 +92,7 @@ defmodule Indexer.Fetcher.Arbitrum.RollupMessagesCatchup do
config_tracker = Application.get_all_env(:indexer)[__MODULE__] config_tracker = Application.get_all_env(:indexer)[__MODULE__]
recheck_interval = config_tracker[:recheck_interval] recheck_interval = config_tracker[:recheck_interval]
messages_to_l2_blocks_depth = config_tracker[:messages_to_l2_blocks_depth] missed_messages_blocks_depth = config_tracker[:missed_messages_blocks_depth]
messages_from_l2_blocks_depth = config_tracker[:messages_to_l1_blocks_depth]
Process.send(self(), :wait_for_new_block, []) Process.send(self(), :wait_for_new_block, [])
@ -104,8 +106,7 @@ defmodule Indexer.Fetcher.Arbitrum.RollupMessagesCatchup do
}, },
json_l2_rpc_named_arguments: args[:json_rpc_named_arguments], json_l2_rpc_named_arguments: args[:json_rpc_named_arguments],
recheck_interval: recheck_interval, recheck_interval: recheck_interval,
messages_to_l2_blocks_depth: messages_to_l2_blocks_depth, missed_messages_blocks_depth: missed_messages_blocks_depth
messages_from_l2_blocks_depth: messages_from_l2_blocks_depth
}, },
data: %{} data: %{}
}} }}
@ -167,9 +168,9 @@ defmodule Indexer.Fetcher.Arbitrum.RollupMessagesCatchup do
end end
# Sets the initial parameters for discovering historical messages. This function # Sets the initial parameters for discovering historical messages. This function
# calculates the end blocks for both L1-to-L2 and L2-to-L1 message discovery # inspects the database for missed messages and, if any are found, identifies the
# processes based on th earliest messages already indexed. If no messages are # end blocks of the ranges for both L1-to-L2 and L2-to-L1 messages. If no missed
# available, the block number before the latest indexed block will be used. # messages are found, the block number before the latest indexed block will be used.
# These end blocks are used to initiate the discovery process in subsequent iterations. # These end blocks are used to initiate the discovery process in subsequent iterations.
# #
# After identifying the initial values, the function immediately transitions to # After identifying the initial values, the function immediately transitions to
@ -178,15 +179,22 @@ defmodule Indexer.Fetcher.Arbitrum.RollupMessagesCatchup do
# #
# ## Parameters # ## Parameters
# - `:init_worker`: The message that triggers the handler. # - `:init_worker`: The message that triggers the handler.
# - `state`: The current state of the fetcher. # - `state`: The current state of the fetcher containing the first rollup block
# number and the number of the most recent block indexed.
# #
# ## Returns # ## Returns
# - `{:noreply, new_state}` where the end blocks for both L1-to-L2 and L2-to-L1 # - `{:noreply, new_state}` where the end blocks for both L1-to-L2 and L2-to-L1
# message discovery are established. # message discovery are established.
@impl GenServer @impl GenServer
def handle_info(:init_worker, %{data: _} = state) do def handle_info(
historical_msg_from_l2_end_block = Db.rollup_block_to_discover_missed_messages_from_l2(state.data.new_block - 1) :init_worker,
historical_msg_to_l2_end_block = Db.rollup_block_to_discover_missed_messages_to_l2(state.data.new_block - 1) %{config: %{rollup_rpc: %{first_block: rollup_first_block}}, data: %{new_block: just_received_block}} = state
) do
historical_msg_from_l2_end_block =
Db.rollup_block_to_discover_missed_messages_from_l2(just_received_block, rollup_first_block)
historical_msg_to_l2_end_block =
Db.rollup_block_to_discover_missed_messages_to_l2(just_received_block, rollup_first_block)
Process.send(self(), :historical_msg_from_l2, []) Process.send(self(), :historical_msg_from_l2, [])
@ -205,7 +213,8 @@ defmodule Indexer.Fetcher.Arbitrum.RollupMessagesCatchup do
# #
# This function uses the results from the previous iteration to set the end block # This function uses the results from the previous iteration to set the end block
# for the current message discovery iteration. It identifies the start block and # for the current message discovery iteration. It identifies the start block and
# requests rollup logs within the specified range to explore `L2ToL1Tx` events. # requests rollup logs within the specified range to explore `L2ToL1Tx` events
# that have no matching records in the cross-level messages table.
# Discovered events are used to compose messages to be stored in the database. # Discovered events are used to compose messages to be stored in the database.
# Before being stored in the database, each message is assigned the appropriate # Before being stored in the database, each message is assigned the appropriate
# status based on the current state of the rollup. # status based on the current state of the rollup.
@ -251,17 +260,18 @@ defmodule Indexer.Fetcher.Arbitrum.RollupMessagesCatchup do
# Processes the next iteration of historical L1-to-L2 message discovery. # Processes the next iteration of historical L1-to-L2 message discovery.
# #
# This function uses the results from the previous iteration to set the end block for # This function uses the results from the previous iteration to set the end block
# the current message discovery iteration. It identifies the start block and requests # for the current message discovery iteration. It identifies the start block and
# rollup blocks within the specified range through RPC to explore transactions # inspects the database for transactions within the block range that could contain
# containing a `requestId` in their body. This RPC request is necessary because the # missing messages. Then it requests rollup transactions through RPC to extract the
# `requestId` for every transaction. This RPC request is necessary because the
# `requestId` field is not present in the transaction model of already indexed # `requestId` field is not present in the transaction model of already indexed
# transactions in the database. The discovered transactions are then used to construct # transactions in the database. Results are used to construct messages, which are
# messages, which are subsequently stored in the database. These imported messages are # subsequently stored in the database. These imported messages are marked as
# marked as `:relayed`, signifying that they represent completed actions from L1 to L2. # `:relayed`, signifying that they represent completed actions from L1 to L2.
# #
# After importing the messages, the function immediately switches to the process # After importing the messages, the function immediately switches to the process
# of choosing a delay prior to the next iteration of historical messages discovery # of choosing a delay prior to the next iteration of historical message discovery
# by sending the `:plan_next_iteration` message. # by sending the `:plan_next_iteration` message.
# #
# ## Parameters # ## Parameters

@ -5,13 +5,13 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
import Ecto.Query, only: [from: 2] import Ecto.Query, only: [from: 2]
import Indexer.Fetcher.Arbitrum.Utils.Logging, only: [log_warning: 1] import Indexer.Fetcher.Arbitrum.Utils.Logging, only: [log_warning: 1, log_info: 1]
alias Explorer.{Chain, Repo} alias Explorer.{Chain, Repo}
alias Explorer.Chain.Arbitrum alias Explorer.Chain.Arbitrum
alias Explorer.Chain.Arbitrum.Reader alias Explorer.Chain.Arbitrum.Reader
alias Explorer.Chain.Block, as: FullBlock alias Explorer.Chain.Block, as: FullBlock
alias Explorer.Chain.{Data, Hash, Log} alias Explorer.Chain.{Data, Hash}
alias Explorer.Utility.MissingBlockRange alias Explorer.Utility.MissingBlockRange
@ -218,52 +218,79 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
end end
@doc """ @doc """
Determines the rollup block number to start searching for missed messages originating from L2. Determines the rollup block number to discover missed L2-to-L1 messages within
a specified range.
The function checks for the first missed L2-to-L1 message and whether historical
block fetching is still in progress. If no missed messages are found and
historical fetching is complete, it returns the block number just before the
first rollup block. Otherwise, it returns the appropriate block number based on
the findings.
## Parameters ## Parameters
- `value_if_nil`: The default value to return if no messages originating from L2 have been found. - `initial_value`: The initial block number to start the further search of the
missed messages from if no missed messages are found and historical blocks
are not fetched yet.
- `rollup_first_block`: The block number of the first rollup block.
## Returns ## Returns
- The rollup block number just before the earliest discovered message from L2, - The block number of the first missed L2-to-L1 message.
or `value_if_nil` if no messages from L2 are found.
""" """
@spec rollup_block_to_discover_missed_messages_from_l2(nil | FullBlock.block_number()) :: @spec rollup_block_to_discover_missed_messages_from_l2(FullBlock.block_number(), FullBlock.block_number()) ::
nil | FullBlock.block_number() nil | FullBlock.block_number()
def rollup_block_to_discover_missed_messages_from_l2(value_if_nil \\ nil) def rollup_block_to_discover_missed_messages_from_l2(initial_value, rollup_first_block) do
when (is_integer(value_if_nil) and value_if_nil >= 0) or is_nil(value_if_nil) do arbsys_contract = Application.get_env(:indexer, Indexer.Fetcher.Arbitrum.Messaging)[:arbsys_contract]
case Reader.rollup_block_of_earliest_discovered_message_from_l2() do
nil ->
log_warning("No messages from L2 found in DB")
value_if_nil
value -> with {:block, nil} <-
value - 1 {:block, Reader.rollup_block_of_first_missed_message_from_l2(arbsys_contract, @l2_to_l1_event)},
{:synced, true} <- {:synced, rollup_synced?()} do
log_info("No missed messages from L2 found")
rollup_first_block - 1
else
{:block, value} ->
log_info("First missed message from L2 found in block #{value}")
value
{:synced, false} ->
log_info("No missed messages from L2 found but historical blocks fetching still in progress")
initial_value
end end
end end
@doc """ @doc """
Determines the rollup block number to start searching for missed messages originating to L2. Determines the rollup block number to discover missed L1-to-L2 messages within
a specified range.
The function checks for the first missed L1-to-L2 message and whether historical
block fetching is still in progress. If no missed messages are found and
historical fetching is complete, it returns the block number just before the
first rollup block. Otherwise, it returns the appropriate block number based on
the findings.
## Parameters ## Parameters
- `value_if_nil`: The default value to return if no messages originating to L2 have been found. - `initial_value`: The initial block number to start the further search of the
missed messages from if no missed messages are found and historical blocks
are not fetched yet.
- `rollup_first_block`: The block number of the first rollup block.
## Returns ## Returns
- The rollup block number just before the earliest discovered message to L2, - The block number of the first missed L1-to-L2 message.
or `value_if_nil` if no messages to L2 are found.
""" """
@spec rollup_block_to_discover_missed_messages_to_l2(nil | FullBlock.block_number()) :: nil | FullBlock.block_number() @spec rollup_block_to_discover_missed_messages_to_l2(FullBlock.block_number(), FullBlock.block_number()) ::
def rollup_block_to_discover_missed_messages_to_l2(value_if_nil \\ nil) nil | FullBlock.block_number()
when (is_integer(value_if_nil) and value_if_nil >= 0) or is_nil(value_if_nil) do def rollup_block_to_discover_missed_messages_to_l2(initial_value, rollup_first_block) do
case Reader.rollup_block_of_earliest_discovered_message_to_l2() do with {:block, nil} <- {:block, Reader.rollup_block_of_first_missed_message_to_l2()},
nil -> {:synced, true} <- {:synced, rollup_synced?()} do
# In theory it could be a situation when when the earliest message points log_info("No missed messages to L2 found")
# to a completion transaction which is not indexed yet. In this case, this rollup_first_block - 1
# warning will occur. else
log_warning("No completed messages to L2 found in DB") {:block, value} ->
value_if_nil log_info("First missed message to L2 found in block #{value}")
value
value -> {:synced, false} ->
value - 1 log_info("No missed messages to L2 found but historical blocks fetching still in progress")
initial_value
end end
end end
@ -371,7 +398,11 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
- A list of `Explorer.Chain.Block` instances containing detailed information for each - A list of `Explorer.Chain.Block` instances containing detailed information for each
block number in the input list. Returns an empty list if no blocks are found for the given numbers. block number in the input list. Returns an empty list if no blocks are found for the given numbers.
""" """
@spec rollup_blocks(maybe_improper_list(FullBlock.block_number(), [])) :: [FullBlock.t()] @spec rollup_blocks([FullBlock.block_number()]) :: [FullBlock.t()]
def rollup_blocks(list_of_block_numbers)
def rollup_blocks([]), do: []
def rollup_blocks(list_of_block_numbers) def rollup_blocks(list_of_block_numbers)
when is_list(list_of_block_numbers) do when is_list(list_of_block_numbers) do
query = query =
@ -641,50 +672,64 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
end end
@doc """ @doc """
Retrieves all rollup logs in the range of blocks from `start_block` to `end_block` Retrieves the transaction hashes as strings for missed L1-to-L2 messages within
corresponding to the `L2ToL1Tx` event emitted by the ArbSys contract. a specified block range.
The function identifies missed messages by checking transactions of specific
types that are supposed to contain L1-to-L2 messages and verifying if there are
corresponding entries in the messages table. A message is considered missed if
there is a transaction without a matching message record within the specified
block range.
## Parameters ## Parameters
- `start_block`: The starting block number of the range from which to - `start_block`: The starting block number of the range.
retrieve the transaction logs containing L2-to-L1 messages.
- `end_block`: The ending block number of the range. - `end_block`: The ending block number of the range.
## Returns ## Returns
- A list of log maps for the `L2ToL1Tx` event where binary values for hashes - A list of transaction hashes as strings for missed L1-to-L2 messages.
and data are decoded into hex strings, containing detailed information about
each event within the specified block range. Returns an empty list if no
relevant logs are found.
""" """
@spec l2_to_l1_logs(FullBlock.block_number(), FullBlock.block_number()) :: [ @spec transactions_for_missed_messages_to_l2(non_neg_integer(), non_neg_integer()) :: [String.t()]
def transactions_for_missed_messages_to_l2(start_block, end_block) do
# credo:disable-for-lines:2 Credo.Check.Refactor.PipeChainStart
Reader.transactions_for_missed_messages_to_l2(start_block, end_block)
|> Enum.map(&Hash.to_string/1)
end
@doc """
Retrieves the logs for missed L2-to-L1 messages within a specified block range
and converts them to maps.
The function identifies missed messages by checking logs for the specified
L2-to-L1 event and verifying if there are corresponding entries in the messages
table. A message is considered missed if there is a log entry without a
matching message record within the specified block range.
## Parameters
- `start_block`: The starting block number of the range.
- `end_block`: The ending block number of the range.
## Returns
- A list of maps representing the logs for missed L2-to-L1 messages.
"""
@spec logs_for_missed_messages_from_l2(non_neg_integer(), non_neg_integer()) :: [
%{ %{
data: String, data: String.t(),
index: non_neg_integer(), index: non_neg_integer(),
first_topic: String, first_topic: String.t(),
second_topic: String, second_topic: String.t(),
third_topic: String, third_topic: String.t(),
fourth_topic: String, fourth_topic: String.t(),
address_hash: String, address_hash: String.t(),
transaction_hash: String, transaction_hash: String.t(),
block_hash: String, block_hash: String.t(),
block_number: FullBlock.block_number() block_number: FullBlock.block_number()
} }
] ]
def l2_to_l1_logs(start_block, end_block) def logs_for_missed_messages_from_l2(start_block, end_block) do
when is_integer(start_block) and start_block >= 0 and
is_integer(end_block) and start_block <= end_block do
arbsys_contract = Application.get_env(:indexer, Indexer.Fetcher.Arbitrum.Messaging)[:arbsys_contract] arbsys_contract = Application.get_env(:indexer, Indexer.Fetcher.Arbitrum.Messaging)[:arbsys_contract]
query = # credo:disable-for-lines:2 Credo.Check.Refactor.PipeChainStart
from(log in Log, Reader.logs_for_missed_messages_from_l2(start_block, end_block, arbsys_contract, @l2_to_l1_event)
where:
log.block_number >= ^start_block and
log.block_number <= ^end_block and
log.address_hash == ^arbsys_contract and
log.first_topic == ^@l2_to_l1_event
)
query
|> Repo.all(timeout: :infinity)
|> Enum.map(&logs_to_map/1) |> Enum.map(&logs_to_map/1)
end end
@ -870,11 +915,42 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
not Enum.empty?(Reader.get_anytrust_keyset(keyset_hash)) not Enum.empty?(Reader.get_anytrust_keyset(keyset_hash))
end end
@spec get_da_info_by_batch_number(non_neg_integer()) :: map() | nil @doc """
Retrieves Data Availability (DA) information for a specific Arbitrum batch number.
This function queries the database for DA information stored in the
`DaMultiPurposeRecord`. It specifically looks for records where
the `data_type` is 0, which corresponds to batch-specific DA information.
## Parameters
- `batch_number`: The Arbitrum batch number.
## Returns
- A map containing the DA information for the specified batch number. This map
corresponds to the `data` field of the `DaMultiPurposeRecord`.
- An empty map (`%{}`) if no DA information is found for the given batch number.
"""
@spec get_da_info_by_batch_number(non_neg_integer()) :: map()
def get_da_info_by_batch_number(batch_number) do def get_da_info_by_batch_number(batch_number) do
Reader.get_da_info_by_batch_number(batch_number) Reader.get_da_info_by_batch_number(batch_number)
end end
# Checks if the rollup is synced by verifying if the block after the first block exists in the database.
@spec rollup_synced?() :: boolean()
defp rollup_synced? do
# Since zero block does not have any useful data, it make sense to consider
# the block just after it
rollup_tail = Application.get_all_env(:indexer)[:first_block] + 1
query =
from(
block in FullBlock,
where: block.number == ^rollup_tail and block.consensus == true
)
if(is_nil(query |> Repo.one()), do: false, else: true)
end
@spec lifecycle_transaction_to_map(Arbitrum.LifecycleTransaction.t()) :: Arbitrum.LifecycleTransaction.to_import() @spec lifecycle_transaction_to_map(Arbitrum.LifecycleTransaction.t()) :: Arbitrum.LifecycleTransaction.to_import()
defp lifecycle_transaction_to_map(tx) do defp lifecycle_transaction_to_map(tx) do
[:id, :hash, :block_number, :timestamp, :status] [:id, :hash, :block_number, :timestamp, :status]
@ -918,6 +994,25 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
|> db_record_to_map(log, true) |> db_record_to_map(log, true)
end end
# Converts an Arbitrum-related database record to a map with specified keys and optional encoding.
#
# This function is used to transform various Arbitrum-specific database records
# (such as LifecycleTransaction, BatchBlock, or Message) into maps containing
# only the specified keys. It's particularly useful for preparing data for
# import or further processing of Arbitrum blockchain data.
#
# Parameters:
# - `required_keys`: A list of atoms representing the keys to include in the
# output map.
# - `record`: The database record or struct to be converted.
# - `encode`: Boolean flag to determine if Hash and Data types should be
# encoded to strings (default: false). When true, Hash and Data are
# converted to string representations; otherwise, their raw bytes are used.
#
# Returns:
# - A map containing only the required keys from the input record. Hash and
# Data types are either encoded to strings or left as raw bytes based on
# the `encode` parameter. @spec db_record_to_map([atom()], map(), boolean()) :: map()
defp db_record_to_map(required_keys, record, encode \\ false) do defp db_record_to_map(required_keys, record, encode \\ false) do
required_keys required_keys
|> Enum.reduce(%{}, fn key, record_as_map -> |> Enum.reduce(%{}, fn key, record_as_map ->

@ -2,23 +2,26 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.HistoricalMessagesOnL2 do
@moduledoc """ @moduledoc """
Handles the discovery and processing of historical messages between Layer 1 (L1) and Layer 2 (L2) within an Arbitrum rollup. Handles the discovery and processing of historical messages between Layer 1 (L1) and Layer 2 (L2) within an Arbitrum rollup.
L1-to-L2 messages are discovered by requesting rollup transactions through RPC. ## L1-to-L2 Messages
This is necessary because some Arbitrum-specific fields are not included in the L1-to-L2 messages are discovered by first inspecting the database to identify
already indexed transactions within the database. potentially missed messages. Then, rollup transactions are requested through RPC
to fetch the necessary data. This is required because some Arbitrum-specific fields,
such as the `requestId`, are not included in the already indexed transactions within
the database.
## L2-to-L1 Messages
L2-to-L1 messages are discovered by analyzing the logs of already indexed rollup L2-to-L1 messages are discovered by analyzing the logs of already indexed rollup
transactions. transactions.
""" """
import Indexer.Fetcher.Arbitrum.Utils.Logging, only: [log_warning: 1, log_info: 1] import Indexer.Fetcher.Arbitrum.Utils.Logging, only: [log_warning: 1, log_info: 1, log_debug: 1]
alias EthereumJSONRPC.Block.ByNumber, as: BlockByNumber
alias EthereumJSONRPC.Transaction, as: TransactionByRPC alias EthereumJSONRPC.Transaction, as: TransactionByRPC
alias Explorer.Chain alias Explorer.Chain
alias Indexer.Fetcher.Arbitrum.Messaging alias Indexer.Fetcher.Arbitrum.Messaging
alias Indexer.Fetcher.Arbitrum.Utils.{Db, Logging, Rpc} alias Indexer.Fetcher.Arbitrum.Utils.{Db, Rpc}
require Logger require Logger
@ -34,25 +37,31 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.HistoricalMessagesOnL2 do
## Parameters ## Parameters
- `end_block`: The ending block number up to which the discovery should occur. - `end_block`: The ending block number up to which the discovery should occur.
If `nil` or lesser than the indexer first block, the function If `nil` or less than the indexer's first block, the function returns with no
returns with no action taken. action taken.
- `state`: Contains the operational configuration, including the depth of - `state`: Contains the operational configuration, including the depth of
blocks to consider for the starting point of message discovery. blocks to consider for the starting point of message discovery and the
first block of the rollup chain.
## Returns ## Returns
- `{:ok, nil}`: If `end_block` is `nil`, indicating no discovery action was required. - `{:ok, nil}`: If `end_block` is `nil`, indicating no discovery action was
- `{:ok, rollup_first_block}`: If `end_block` is lesser than the indexer first block, required.
indicating that the "genesis" of the block chain was reached. - `{:ok, rollup_first_block}`: If `end_block` is less than the indexer's first
block, indicating that the "genesis" of the blockchain was reached.
- `{:ok, start_block}`: Upon successful discovery of historical messages, where - `{:ok, start_block}`: Upon successful discovery of historical messages, where
`start_block` indicates the necessity to consider another block range in the next `start_block` indicates the necessity to consider another block range in the
iteration of message discovery. next iteration of message discovery.
- `{:ok, end_block + 1}`: If the required block range is not fully indexed, - `{:ok, end_block + 1}`: If the required block range is not fully indexed,
indicating that the next iteration of message discovery should start with the same indicating that the next iteration of message discovery should start with the
block range. same block range.
""" """
@spec discover_historical_messages_from_l2(nil | integer(), %{ @spec discover_historical_messages_from_l2(nil | integer(), %{
:config => %{ :config => %{
:messages_to_l2_blocks_depth => non_neg_integer(), :missed_messages_blocks_depth => non_neg_integer(),
:rollup_rpc => %{
:first_block => non_neg_integer(),
optional(any()) => any()
},
optional(any()) => any() optional(any()) => any()
}, },
optional(any()) => any() optional(any()) => any()
@ -72,13 +81,13 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.HistoricalMessagesOnL2 do
end_block, end_block,
%{ %{
config: %{ config: %{
messages_from_l2_blocks_depth: messages_from_l2_blocks_depth, missed_messages_blocks_depth: missed_messages_blocks_depth,
rollup_rpc: %{first_block: rollup_first_block} rollup_rpc: %{first_block: rollup_first_block}
} }
} = _state } = _state
) )
when is_integer(end_block) do when is_integer(end_block) do
start_block = max(rollup_first_block, end_block - messages_from_l2_blocks_depth + 1) start_block = max(rollup_first_block, end_block - missed_messages_blocks_depth + 1)
if Db.indexed_blocks?(start_block, end_block) do if Db.indexed_blocks?(start_block, end_block) do
do_discover_historical_messages_from_l2(start_block, end_block) do_discover_historical_messages_from_l2(start_block, end_block)
@ -107,10 +116,11 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.HistoricalMessagesOnL2 do
# ## Returns # ## Returns
# - `{:ok, start_block}`: A tuple indicating successful processing, returning the initial # - `{:ok, start_block}`: A tuple indicating successful processing, returning the initial
# starting block number. # starting block number.
@spec do_discover_historical_messages_from_l2(non_neg_integer(), non_neg_integer()) :: {:ok, non_neg_integer()}
defp do_discover_historical_messages_from_l2(start_block, end_block) do defp do_discover_historical_messages_from_l2(start_block, end_block) do
log_info("Block range for discovery historical messages from L2: #{start_block}..#{end_block}") log_info("Block range for discovery historical messages from L2: #{start_block}..#{end_block}")
logs = Db.l2_to_l1_logs(start_block, end_block) logs = Db.logs_for_missed_messages_from_l2(start_block, end_block)
unless logs == [] do unless logs == [] do
messages = messages =
@ -126,35 +136,40 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.HistoricalMessagesOnL2 do
@doc """ @doc """
Initiates the discovery of historical messages sent from L1 to L2 up to a specified block number. Initiates the discovery of historical messages sent from L1 to L2 up to a specified block number.
This function orchestrates the process of discovering historical L1-to-L2 messages within This function orchestrates the process of discovering historical L1-to-L2
a given rollup block range, based on the existence of the `requestId` field in the rollup messages within a given rollup block range, based on the existence of the
transaction body. Transactions are requested through RPC because already indexed `requestId` field in the rollup transaction body. The initial list of
transactions from the database cannot be utilized; the `requestId` field is not included transactions that could contain the messages is received from the database, and
in the transaction model. The function ensures that the block range has been indexed then their bodies are re-requested through RPC because already indexed
before proceeding with message discovery and import. The imported messages are marked as transactions from the database cannot be utilized; the `requestId` field is not
`:relayed`, as they represent completed actions from L1 to L2. included in the transaction model. The function ensures that the block range
has been indexed before proceeding with message discovery and import. The
imported messages are marked as `:relayed`, as they represent completed actions
from L1 to L2.
## Parameters ## Parameters
- `end_block`: The ending block number for the discovery operation. - `end_block`: The ending block number for the discovery operation. If `nil` or
If `nil` or lesser than the indexer first block, the function less than the indexer's first block, the function returns with no action
returns with no action taken. taken.
- `state`: The current state of the operation, containing configuration parameters - `state`: The current state of the operation, containing configuration
including `messages_to_l2_blocks_depth`, `chunk_size`, and JSON RPC connection parameters including the depth of blocks to consider for the starting point
settings. of message discovery, size of chunk to make request to RPC, and JSON RPC
connection settings.
## Returns ## Returns
- `{:ok, nil}`: If `end_block` is `nil`, indicating no action was necessary. - `{:ok, nil}`: If `end_block` is `nil`, indicating no action was necessary.
- `{:ok, rollup_first_block}`: If `end_block` is lesser than the indexer first block, - `{:ok, rollup_first_block}`: If `end_block` is less than the indexer's first
indicating that the "genesis" of the block chain was reached. block, indicating that the "genesis" of the blockchain was reached.
- `{:ok, start_block}`: On successful completion of historical message discovery, where - `{:ok, start_block}`: On successful completion of historical message
`start_block` indicates the necessity to consider another block range in the next discovery, where `start_block` indicates the necessity to consider another
iteration of message discovery. block range in the next iteration of message discovery.
- `{:ok, end_block + 1}`: If the required block range is not fully indexed, indicating - `{:ok, end_block + 1}`: If the required block range is not fully indexed,
that the next iteration of message discovery should start with the same block range. indicating that the next iteration of message discovery should start with the
same block range.
""" """
@spec discover_historical_messages_to_l2(nil | integer(), %{ @spec discover_historical_messages_to_l2(nil | integer(), %{
:config => %{ :config => %{
:messages_to_l2_blocks_depth => non_neg_integer(), :missed_messages_blocks_depth => non_neg_integer(),
:rollup_rpc => %{ :rollup_rpc => %{
:chunk_size => non_neg_integer(), :chunk_size => non_neg_integer(),
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(), :json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(),
@ -177,10 +192,10 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.HistoricalMessagesOnL2 do
def discover_historical_messages_to_l2( def discover_historical_messages_to_l2(
end_block, end_block,
%{config: %{messages_to_l2_blocks_depth: _, rollup_rpc: %{first_block: _}} = config} = _state %{config: %{missed_messages_blocks_depth: _, rollup_rpc: %{first_block: _}} = config} = _state
) )
when is_integer(end_block) do when is_integer(end_block) do
start_block = max(config.rollup_rpc.first_block, end_block - config.messages_to_l2_blocks_depth + 1) start_block = max(config.rollup_rpc.first_block, end_block - config.missed_messages_blocks_depth + 1)
# Although indexing blocks is not necessary to determine the completion of L1-to-L2 messages, # Although indexing blocks is not necessary to determine the completion of L1-to-L2 messages,
# for database consistency, it is preferable to delay marking these messages as completed. # for database consistency, it is preferable to delay marking these messages as completed.
@ -195,22 +210,36 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.HistoricalMessagesOnL2 do
end end
end end
# The function iterates through the block range in chunks, making RPC calls to fetch rollup block # Discovers and processes historical messages sent from L1 to L2 within a
# data and extract transactions. Each transaction is filtered for L1-to-L2 messages based on # specified rollup block range.
# existence of `requestId` field in the transaction body, and then imported into the database. #
# The imported messages are marked as `:relayed` as they represent completed actions from L1 to L2. # This function identifies which of already indexed transactions within the
# block range contains L1-to-L2 messages and makes RPC calls to fetch
# transaction data. These transactions are then processed to construct proper
# message structures, which are imported into the database. The imported
# messages are marked as `:relayed` as they represent completed actions from L1
# to L2.
# #
# Already indexed transactions from the database cannot be used because the `requestId` field is # Note: Already indexed transactions from the database cannot be used because
# not included in the transaction model. # the `requestId` field is not included in the transaction model.
# #
# ## Parameters # ## Parameters
# - `start_block`: The starting block number for the discovery range. # - `start_block`: The starting block number for the discovery range.
# - `end_block`: The ending block number for the discovery range. # - `end_block`: The ending block number for the discovery range.
# - `config`: The configuration map containing settings for RPC communication and chunk size. # - `config`: The configuration map containing settings for RPC communication
# and chunk size.
# #
# ## Returns # ## Returns
# - `{:ok, start_block}`: A tuple indicating successful processing, returning the initial # - `{:ok, start_block}`: A tuple indicating successful processing, returning
# starting block number. # the initial starting block number.
@spec do_discover_historical_messages_to_l2(non_neg_integer(), non_neg_integer(), %{
:rollup_rpc => %{
:chunk_size => non_neg_integer(),
:json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(),
optional(any()) => any()
},
optional(any()) => any()
}) :: {:ok, non_neg_integer()}
defp do_discover_historical_messages_to_l2( defp do_discover_historical_messages_to_l2(
start_block, start_block,
end_block, end_block,
@ -218,68 +247,56 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.HistoricalMessagesOnL2 do
) do ) do
log_info("Block range for discovery historical messages to L2: #{start_block}..#{end_block}") log_info("Block range for discovery historical messages to L2: #{start_block}..#{end_block}")
{messages, _} = transactions = Db.transactions_for_missed_messages_to_l2(start_block, end_block)
start_block..end_block transactions_length = length(transactions)
|> Enum.chunk_every(chunk_size)
|> Enum.reduce({[], 0}, fn chunk, {messages_acc, chunks_counter} -> if transactions_length > 0 do
Logging.log_details_chunk_handling( log_debug("#{transactions_length} historical messages to L2 discovered")
"Collecting rollup data",
{"block", "blocks"},
chunk,
chunks_counter,
end_block - start_block + 1
)
messages =
transactions
|> Enum.chunk_every(chunk_size)
|> Enum.reduce([], fn chunk, messages_acc ->
# Since DB does not contain the field RequestId specific to Arbitrum # Since DB does not contain the field RequestId specific to Arbitrum
# all transactions will be requested from the rollup RPC endpoint. # all transactions will be requested from the rollup RPC endpoint.
# The catchup process intended to be run once and only for the BS instance # The catchup process intended to be run once and only for the BS instance
# which are already exist, so it does not make sense to introduce # which are already exist, so it does not make sense to introduce
# the new field in DB # the new field in DB
requests = build_block_by_number_requests(chunk) requests = build_transaction_requests(chunk)
messages = messages =
requests requests
|> Rpc.make_chunked_request(json_rpc_named_arguments, "eth_getBlockByNumber") |> Rpc.make_chunked_request(json_rpc_named_arguments, "eth_getTransactionByHash")
|> get_transactions() |> Enum.map(&transaction_json_to_map/1)
|> Enum.map(fn tx ->
tx
|> TransactionByRPC.to_elixir()
|> TransactionByRPC.elixir_to_params()
end)
|> Messaging.filter_l1_to_l2_messages(false) |> Messaging.filter_l1_to_l2_messages(false)
{messages ++ messages_acc, chunks_counter + length(chunk)} messages ++ messages_acc
end) end)
unless messages == [] do
log_info("#{length(messages)} completions of L1-to-L2 messages will be imported") log_info("#{length(messages)} completions of L1-to-L2 messages will be imported")
end
import_to_db(messages) import_to_db(messages)
end
{:ok, start_block} {:ok, start_block}
end end
# Constructs a list of `eth_getBlockByNumber` requests for a given list of block numbers. # Constructs a list of `eth_getTransactionByHash` requests for a given list of transaction hashes.
defp build_block_by_number_requests(block_numbers) do defp build_transaction_requests(tx_hashes) do
block_numbers tx_hashes
|> Enum.reduce([], fn block_num, requests_list -> |> Enum.reduce([], fn tx_hash, requests_list ->
[ [
BlockByNumber.request(%{ Rpc.transaction_by_hash_request(%{id: 0, hash: tx_hash})
id: block_num,
number: block_num
})
| requests_list | requests_list
] ]
end) end)
end end
# Aggregates transactions from a list of blocks, combining them into a single list. # Transforms a JSON transaction object into a map.
defp get_transactions(blocks_by_rpc) do @spec transaction_json_to_map(%{String.t() => any()}) :: map()
blocks_by_rpc defp transaction_json_to_map(transaction_json) do
|> Enum.reduce([], fn block_by_rpc, txs -> transaction_json
block_by_rpc["transactions"] ++ txs |> TransactionByRPC.to_elixir()
end) |> TransactionByRPC.elixir_to_params()
end end
# Imports a list of messages into the database. # Imports a list of messages into the database.

@ -907,10 +907,8 @@ config :indexer, Indexer.Fetcher.Arbitrum.TrackingBatchesStatuses.Supervisor,
config :indexer, Indexer.Fetcher.Arbitrum.RollupMessagesCatchup, config :indexer, Indexer.Fetcher.Arbitrum.RollupMessagesCatchup,
recheck_interval: ConfigHelper.parse_time_env_var("INDEXER_ARBITRUM_MISSED_MESSAGES_RECHECK_INTERVAL", "1h"), recheck_interval: ConfigHelper.parse_time_env_var("INDEXER_ARBITRUM_MISSED_MESSAGES_RECHECK_INTERVAL", "1h"),
messages_to_l2_blocks_depth: missed_messages_blocks_depth:
ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_MISSED_MESSAGES_TO_L2_BLOCK_DEPTH", 50), ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_MISSED_MESSAGES_BLOCKS_DEPTH", 10000)
messages_to_l1_blocks_depth:
ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_MISSED_MESSAGES_TO_L1_BLOCK_DEPTH", 1000)
config :indexer, Indexer.Fetcher.Arbitrum.RollupMessagesCatchup.Supervisor, config :indexer, Indexer.Fetcher.Arbitrum.RollupMessagesCatchup.Supervisor,
enabled: ConfigHelper.parse_bool_env_var("INDEXER_ARBITRUM_BRIDGE_MESSAGES_TRACKING_ENABLED") enabled: ConfigHelper.parse_bool_env_var("INDEXER_ARBITRUM_BRIDGE_MESSAGES_TRACKING_ENABLED")

@ -285,6 +285,7 @@
"lkve", "lkve",
"llhauc", "llhauc",
"loggable", "loggable",
"LPAD",
"LUKSO", "LUKSO",
"luxon", "luxon",
"mabi", "mabi",

@ -246,8 +246,7 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_ARBITRUM_BRIDGE_MESSAGES_TRACKING_ENABLED= # INDEXER_ARBITRUM_BRIDGE_MESSAGES_TRACKING_ENABLED=
# INDEXER_ARBITRUM_TRACKING_MESSAGES_ON_L1_RECHECK_INTERVAL= # INDEXER_ARBITRUM_TRACKING_MESSAGES_ON_L1_RECHECK_INTERVAL=
# INDEXER_ARBITRUM_MISSED_MESSAGES_RECHECK_INTERVAL= # INDEXER_ARBITRUM_MISSED_MESSAGES_RECHECK_INTERVAL=
# INDEXER_ARBITRUM_MISSED_MESSAGES_TO_L2_BLOCK_DEPTH= # INDEXER_ARBITRUM_MISSED_MESSAGES_BLOCKS_DEPTH=
# INDEXER_ARBITRUM_MISSED_MESSAGES_TO_L1_BLOCK_DEPTH=
# INDEXER_REALTIME_FETCHER_MAX_GAP= # INDEXER_REALTIME_FETCHER_MAX_GAP=
# INDEXER_FETCHER_INIT_QUERY_LIMIT= # INDEXER_FETCHER_INIT_QUERY_LIMIT=
# INDEXER_TOKEN_BALANCES_FETCHER_INIT_QUERY_LIMIT= # INDEXER_TOKEN_BALANCES_FETCHER_INIT_QUERY_LIMIT=

Loading…
Cancel
Save