fix: proper handling confirmations for Arbitrum rollup block in the middle of a batch (#10482)

* Proper handling confirmation for block in middle of a batch

* proper check for gaps, proper output for range of blocks

* adjusted approach to revisit missing confirmations

* module documentation updated

* function comment extended
pull/10568/head
Alexander Kolotov 4 months ago committed by GitHub
parent 0555eaa656
commit e876e03763
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 36
      apps/indexer/lib/indexer/fetcher/arbitrum/tracking_batches_statuses.ex
  2. 124
      apps/indexer/lib/indexer/fetcher/arbitrum/utils/helper.ex
  3. 51
      apps/indexer/lib/indexer/fetcher/arbitrum/utils/rpc.ex
  4. 137
      apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex
  5. 760
      apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_confirmations.ex
  6. 1
      config/runtime.exs
  7. 1
      docker-compose/envs/common-blockscout.env

@ -168,18 +168,11 @@ defmodule Indexer.Fetcher.Arbitrum.TrackingBatchesStatuses do
l1_start_block = Rpc.get_l1_start_block(state.config.l1_start_block, json_l1_rpc_named_arguments)
# TODO: it is necessary to develop a way to discover missed batches to cover the case
# when the batch #1, #2 and #4 are in DB, but #3 is not
# One of the approaches is to look deeper than the latest committed batch and
# check whether batches were already handled or not.
new_batches_start_block = Db.l1_block_to_discover_latest_committed_batch(l1_start_block)
historical_batches_end_block = Db.l1_block_to_discover_earliest_committed_batch(l1_start_block - 1)
new_confirmations_start_block = Db.l1_block_of_latest_confirmed_block(l1_start_block)
# TODO: it is necessary to develop a way to discover missed executions.
# One of the approaches is to look deeper than the latest execution and
# check whether executions were already handled or not.
new_executions_start_block = Db.l1_block_to_discover_latest_execution(l1_start_block)
historical_executions_end_block = Db.l1_block_to_discover_earliest_execution(l1_start_block - 1)
@ -268,22 +261,11 @@ defmodule Indexer.Fetcher.Arbitrum.TrackingBatchesStatuses do
# block for the next iteration of new confirmation discovery.
@impl GenServer
def handle_info(:check_new_confirmations, state) do
{handle_duration, {retcode, end_block}} = :timer.tc(&NewConfirmations.discover_new_rollup_confirmation/1, [state])
{handle_duration, {_, new_state}} = :timer.tc(&NewConfirmations.discover_new_rollup_confirmation/1, [state])
Process.send(self(), :check_new_executions, [])
updated_fields =
case retcode do
:ok -> %{}
_ -> %{historical_confirmations_end_block: nil, historical_confirmations_start_block: nil}
end
|> Map.merge(%{
# credo:disable-for-previous-line Credo.Check.Refactor.PipeChainStart
duration: increase_duration(state.data, handle_duration),
new_confirmations_start_block: end_block + 1
})
new_data = Map.merge(state.data, updated_fields)
new_data = Map.put(new_state.data, :duration, increase_duration(state.data, handle_duration))
{:noreply, %{state | data: new_data}}
end
@ -417,22 +399,12 @@ defmodule Indexer.Fetcher.Arbitrum.TrackingBatchesStatuses do
# end blocks for the next iteration of historical confirmations discovery.
@impl GenServer
def handle_info(:check_historical_confirmations, state) do
{handle_duration, {retcode, {start_block, end_block}}} =
{handle_duration, {_, new_state}} =
:timer.tc(&NewConfirmations.discover_historical_rollup_confirmation/1, [state])
Process.send(self(), :check_historical_executions, [])
updated_fields =
case retcode do
:ok -> %{historical_confirmations_end_block: start_block - 1, historical_confirmations_start_block: end_block}
_ -> %{historical_confirmations_end_block: nil, historical_confirmations_start_block: nil}
end
|> Map.merge(%{
# credo:disable-for-previous-line Credo.Check.Refactor.PipeChainStart
duration: increase_duration(state.data, handle_duration)
})
new_data = Map.merge(state.data, updated_fields)
new_data = Map.put(new_state.data, :duration, increase_duration(state.data, handle_duration))
{:noreply, %{state | data: new_data}}
end

@ -1,4 +1,8 @@
defmodule Indexer.Fetcher.Arbitrum.Utils.Helper do
alias Explorer.Chain.Arbitrum.LifecycleTransaction
import Indexer.Fetcher.Arbitrum.Utils.Logging, only: [log_info: 1]
@moduledoc """
Provides utility functions to support the handling of Arbitrum-specific data fetching and processing in the indexer.
"""
@ -55,14 +59,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Helper do
%{binary() => %{:block => non_neg_integer(), optional(any()) => any()}},
%{non_neg_integer() => DateTime.t()},
boolean()
) :: %{
binary() => %{
:block => non_neg_integer(),
:timestamp => DateTime.t(),
:status => :unfinalized | :finalized,
optional(any()) => any()
}
}
) :: %{binary() => LifecycleTransaction.to_import()}
def extend_lifecycle_txs_with_ts_and_status(lifecycle_txs, blocks_to_ts, track_finalization?)
when is_map(lifecycle_txs) and is_map(blocks_to_ts) and is_boolean(track_finalization?) do
lifecycle_txs
@ -84,6 +81,44 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Helper do
end)
end
@doc """
Compares a lifecycle transaction with new block number and timestamp, and updates if necessary.
This function checks if the given lifecycle transaction has the same block number
and timestamp as the provided values. If they are the same, it returns `{:same, nil}`.
If they differ, it updates the transaction with the new block number and timestamp,
logs the update, and returns `{:updated, updated_tx}`.
## Parameters
- `tx`: The lifecycle transaction to compare and potentially update.
- `{new_block_num, new_ts}`: A tuple containing the new block number and timestamp.
- `tx_type_str`: A string describing the type of the transaction for logging purposes.
## Returns
- `{:same, nil}` if the transaction block number and timestamp are the same as the provided values.
- `{:updated, updated_tx}` if the transaction was updated with the new block number and timestamp.
"""
@spec compare_lifecycle_tx_and_update(
LifecycleTransaction.to_import(),
{non_neg_integer(), DateTime.t()},
String.t()
) :: {:same, nil} | {:updated, LifecycleTransaction.to_import()}
def compare_lifecycle_tx_and_update(tx, {new_block_num, new_ts}, tx_type_str) do
if tx.block_number == new_block_num and DateTime.compare(tx.timestamp, new_ts) == :eq do
{:same, nil}
else
log_info(
"The #{tx_type_str} transaction 0x#{tx.hash |> Base.encode16(case: :lower)} will be updated with the new block number and timestamp"
)
{:updated,
Map.merge(tx, %{
block_number: new_block_num,
timestamp: new_ts
})}
end
end
@doc """
Converts a binary data to a hexadecimal string.
@ -97,4 +132,77 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Helper do
def bytes_to_hex_str(data) do
"0x" <> Base.encode16(data, case: :lower)
end
@doc """
Executes a function over a specified block range in chunks.
This function divides a block range into smaller chunks and executes the provided
function for each chunk. It collects the results of each function execution and
returns them as a list of tuples. Each tuple contains the start and end block numbers
of the chunk and the result of the function execution for that chunk.
If `halt_on_error` is set to `true` and the function returns anything other than
`:ok` or `{:ok, ...}`, the execution halts. However, the result of the last function
execution is still included in the resulting list.
## Parameters
- `start_block`: The starting block number of the range.
- `end_block`: The ending block number of the range.
- `chunk_size`: The size of each chunk in terms of block numbers.
- `func`: The function to execute for each chunk. The function should accept two
arguments: the start and end block numbers of the chunk.
- `halt_on_error` (optional): A boolean flag indicating whether to halt execution
if an error occurs. Defaults to `false`.
## Returns
- A list of tuples. Each tuple contains:
- A tuple with the start and end block numbers of the chunk.
- The result of the function execution for that chunk.
## Examples
iex> execute_for_block_range_in_chunks(5, 25, 7, fn (start_block, end_block) ->
...> {:ok, start_block, end_block}
...> end)
[
{{5, 11}, {:ok, 5, 11}},
{{12, 18}, {:ok, 12, 18}},
{{19, 25}, {:ok, 19, 25}}
]
"""
@spec execute_for_block_range_in_chunks(
non_neg_integer(),
non_neg_integer(),
non_neg_integer(),
fun()
) :: [
{{non_neg_integer(), non_neg_integer()}, any()}
]
@spec execute_for_block_range_in_chunks(
non_neg_integer(),
non_neg_integer(),
non_neg_integer(),
fun(),
boolean()
) :: [
{{non_neg_integer(), non_neg_integer()}, any()}
]
def execute_for_block_range_in_chunks(start_block, end_block, chunk_size, func, halt_on_error \\ false) do
0..div(end_block - start_block, chunk_size)
|> Enum.reduce_while([], fn i, res ->
chunk_start = start_block + i * chunk_size
chunk_end = min(chunk_start + chunk_size - 1, end_block)
func_res = func.(chunk_start, chunk_end)
acc_res = [{{chunk_start, chunk_end}, func_res} | res]
case {halt_on_error, func_res} do
{false, _} -> {:cont, acc_res}
{true, :ok} -> {:cont, acc_res}
{true, {:ok, _}} -> {:cont, acc_res}
{true, _} -> {:halt, acc_res}
end
end)
|> Enum.reverse()
end
end

@ -422,6 +422,57 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Rpc do
end
end
@doc """
Retrieves the safe and latest L1 block numbers.
This function fetches the latest block number from the chain and tries to determine
the safe block number. If the RPC node does not support the safe block feature or
if the safe block is too far behind the latest block, the safe block is determined
based on the finalization threshold. In both cases, it steps back from the latest
block to mark some blocks as unfinalized.
## Parameters
- `json_rpc_named_arguments`: The named arguments for the JSON RPC call.
- `hard_limit`: The maximum number of blocks to step back when determining the safe block.
## Returns
- A tuple containing the safe block number and the latest block number.
"""
@spec get_safe_and_latest_l1_blocks(EthereumJSONRPC.json_rpc_named_arguments(), non_neg_integer()) ::
{EthereumJSONRPC.block_number(), EthereumJSONRPC.block_number()}
def get_safe_and_latest_l1_blocks(json_rpc_named_arguments, hard_limit) do
finalization_threshold = Application.get_all_env(:indexer)[Indexer.Fetcher.Arbitrum][:l1_finalization_threshold]
{safe_chain_block, is_latest?} = IndexerHelper.get_safe_block(json_rpc_named_arguments)
latest_chain_block =
case is_latest? do
true ->
safe_chain_block
false ->
{:ok, latest_block} =
IndexerHelper.get_block_number_by_tag("latest", json_rpc_named_arguments, get_resend_attempts())
latest_block
end
safe_block =
if safe_chain_block < latest_chain_block + 1 - finalization_threshold or is_latest? do
# The first condition handles the case when the safe block is too far behind
# the latest block (L3 case).
# The second condition handles the case when the L1 RPC node does not support
# the safe block feature (non standard Arbitrum deployments).
# In both cases, it is necessary to step back a bit from the latest block to
# suspect these blocks as unfinalized.
latest_chain_block + 1 - min(finalization_threshold, hard_limit)
else
safe_chain_block
end
{safe_block, latest_chain_block}
end
@doc """
Identifies the block range for a batch by using the block number located on one end of the range.

@ -33,6 +33,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
alias Indexer.Fetcher.Arbitrum.DA.Common, as: DataAvailabilityInfo
alias Indexer.Fetcher.Arbitrum.DA.{Anytrust, Celestia}
alias Indexer.Fetcher.Arbitrum.Utils.{Db, Logging, Rpc}
alias Indexer.Fetcher.Arbitrum.Utils.Helper, as: ArbitrumHelper
alias Indexer.Helper, as: IndexerHelper
alias Explorer.Chain
@ -44,8 +45,6 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# keccak256("SequencerBatchDelivered(uint256,bytes32,bytes32,bytes32,uint256,(uint64,uint64,uint64,uint64),uint8)")
@event_sequencer_batch_delivered "0x7394f4a19a13c7b92b5bb71033245305946ef78452f7b4986ac1390b5df4ebd7"
@max_depth_for_safe_block 1000
@doc """
Discovers and imports new batches of rollup transactions within the current L1 block range.
@ -115,31 +114,15 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
) do
# Requesting the "latest" block instead of "safe" allows to catch new batches
# without latency.
{:ok, latest_block} =
IndexerHelper.get_block_number_by_tag(
"latest",
l1_rpc_config.json_rpc_named_arguments,
Rpc.get_resend_attempts()
)
{safe_chain_block, _} = IndexerHelper.get_safe_block(l1_rpc_config.json_rpc_named_arguments)
# max() cannot be used here since l1_rpc_config.logs_block_range must not
# be taken into account to identify if it is L3 or not
safe_block =
if safe_chain_block < latest_block + 1 - @max_depth_for_safe_block do
# The case of L3, the safe block is too far behind the latest block,
# therefore it is assumed that there is no so deep re-orgs there.
latest_block + 1 - min(@max_depth_for_safe_block, l1_rpc_config.logs_block_range)
else
safe_chain_block
end
# It is necessary to re-visit some amount of the previous blocks to ensure that
# no batches are missed due to reorgs. The amount of blocks to re-visit depends
# either on the current safe block but must not exceed @max_depth_for_safe_block
# (or L1 RPC max block range for getting logs) since on L3 chains the safe block
# could be too far behind the latest block.
# on the current safe block or the block which is considered as safest in case
# of L3 (where the safe block could be too far behind the latest block) or if
# RPC does not support "safe" block.
{safe_block, latest_block} =
Rpc.get_safe_and_latest_l1_blocks(l1_rpc_config.json_rpc_named_arguments, l1_rpc_config.logs_block_range)
# At the same time it does not make sense to re-visit blocks that will be
# re-visited by the historical batches discovery process.
# If the new batches discovery process does not reach the chain head previously
@ -154,30 +137,23 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# Since with taking the safe block into account, the range safe_start_block..end_block
# could be larger than L1 RPC max block range for getting logs, it is necessary to
# divide the range into the chunks
safe_start_block
|> Stream.unfold(fn
current when current > end_block ->
nil
current ->
next = min(current + l1_rpc_config.logs_block_range - 1, end_block)
{current, next + 1}
end)
|> Stream.each(fn chunk_start ->
chunk_end = min(chunk_start + l1_rpc_config.logs_block_range - 1, end_block)
discover(
sequencer_inbox_address,
chunk_start,
chunk_end,
new_batches_limit,
messages_to_blocks_shift,
l1_rpc_config,
node_interface_address,
rollup_rpc_config
)
end)
|> Stream.run()
ArbitrumHelper.execute_for_block_range_in_chunks(
safe_start_block,
end_block,
l1_rpc_config.logs_block_range,
fn chunk_start, chunk_end ->
discover(
sequencer_inbox_address,
chunk_start,
chunk_end,
new_batches_limit,
messages_to_blocks_shift,
l1_rpc_config,
node_interface_address,
rollup_rpc_config
)
end
)
{:ok, end_block}
else
@ -532,25 +508,25 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
rollup_rpc_config
) do
Enum.each(l1_block_ranges, fn {start_block, end_block} ->
Enum.each(0..div(end_block - start_block, l1_rpc_config.logs_block_range), fn i ->
start_block = start_block + i * l1_rpc_config.logs_block_range
end_block = min(start_block + l1_rpc_config.logs_block_range - 1, end_block)
log_info("Block range for missing batches discovery: #{start_block}..#{end_block}")
# `do_discover` is not used here to demonstrate the need to fetch batches
# which are already historical
discover_historical(
sequencer_inbox_address,
start_block,
end_block,
new_batches_limit,
messages_to_blocks_shift,
l1_rpc_config,
node_interface_address,
rollup_rpc_config
)
end)
ArbitrumHelper.execute_for_block_range_in_chunks(
start_block,
end_block,
l1_rpc_config.logs_block_range,
fn chunk_start, chunk_end ->
# `do_discover` is not used here to demonstrate the need to fetch batches
# which are already historical
discover_historical(
sequencer_inbox_address,
chunk_start,
chunk_end,
new_batches_limit,
messages_to_blocks_shift,
l1_rpc_config,
node_interface_address,
rollup_rpc_config
)
end
)
end)
end
@ -699,9 +675,11 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
# This function analyzes SequencerBatchDelivered event logs to identify new batches
# and retrieves their details, avoiding the reprocessing of batches already known
# in the database. It enriches the details of new batches with data from corresponding
# L1 transactions and blocks, including timestamps and block ranges. The function
# then prepares batches, associated rollup blocks and transactions, lifecycle
# transactions and Data Availability related records for database import.
# L1 transactions and blocks, including timestamps and block ranges. The lifecycle
# transactions for already known batches are updated with actual block numbers and
# timestamps. The function then prepares batches, associated rollup blocks and
# transactions, lifecycle transactions and Data Availability related records for
# database import.
# Additionally, L2-to-L1 messages initiated in the rollup blocks associated with the
# discovered batches are retrieved from the database, marked as `:sent`, and prepared
# for database import.
@ -1341,21 +1319,12 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
block_num = existing_commitment_txs[tx.hash]
ts = block_to_ts[block_num]
if tx.block_number == block_num and DateTime.compare(tx.timestamp, ts) == :eq do
txs
else
log_info(
"The commitment transaction 0x#{tx.hash |> Base.encode16(case: :lower)} will be updated with the new block number and timestamp"
)
case ArbitrumHelper.compare_lifecycle_tx_and_update(tx, {block_num, ts}, "commitment") do
{:updated, updated_tx} ->
Map.put(txs, tx.hash, updated_tx)
Map.put(
txs,
tx.hash,
Map.merge(tx, %{
block_number: block_num,
timestamp: ts
})
)
_ ->
txs
end
end)
end

@ -895,6 +895,7 @@ config :indexer, Indexer.Fetcher.Arbitrum,
l1_rollup_address: System.get_env("INDEXER_ARBITRUM_L1_ROLLUP_CONTRACT"),
l1_rollup_init_block: ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_L1_ROLLUP_INIT_BLOCK", 1),
l1_start_block: ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_L1_COMMON_START_BLOCK", 0),
l1_finalization_threshold: ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_L1_FINALIZATION_THRESHOLD", 1000),
rollup_chunk_size: ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_ROLLUP_CHUNK_SIZE", 20)
config :indexer, Indexer.Fetcher.Arbitrum.TrackingMessagesOnL1,

@ -236,6 +236,7 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false
# INDEXER_ARBITRUM_L1_ROLLUP_CONTRACT=
# INDEXER_ARBITRUM_L1_ROLLUP_INIT_BLOCK=
# INDEXER_ARBITRUM_L1_COMMON_START_BLOCK=
# INDEXER_ARBITRUM_L1_FINALIZATION_THRESHOLD=
# INDEXER_ARBITRUM_ROLLUP_CHUNK_SIZE=
# INDEXER_ARBITRUM_BATCHES_TRACKING_ENABLED=
# INDEXER_ARBITRUM_BATCHES_TRACKING_RECHECK_INTERVAL=

Loading…
Cancel
Save