diff --git a/apps/indexer/lib/indexer/fetcher/arbitrum/tracking_batches_statuses.ex b/apps/indexer/lib/indexer/fetcher/arbitrum/tracking_batches_statuses.ex index 2b90b6d523..af44220f0d 100644 --- a/apps/indexer/lib/indexer/fetcher/arbitrum/tracking_batches_statuses.ex +++ b/apps/indexer/lib/indexer/fetcher/arbitrum/tracking_batches_statuses.ex @@ -168,18 +168,11 @@ defmodule Indexer.Fetcher.Arbitrum.TrackingBatchesStatuses do l1_start_block = Rpc.get_l1_start_block(state.config.l1_start_block, json_l1_rpc_named_arguments) - # TODO: it is necessary to develop a way to discover missed batches to cover the case - # when the batch #1, #2 and #4 are in DB, but #3 is not - # One of the approaches is to look deeper than the latest committed batch and - # check whether batches were already handled or not. new_batches_start_block = Db.l1_block_to_discover_latest_committed_batch(l1_start_block) historical_batches_end_block = Db.l1_block_to_discover_earliest_committed_batch(l1_start_block - 1) new_confirmations_start_block = Db.l1_block_of_latest_confirmed_block(l1_start_block) - # TODO: it is necessary to develop a way to discover missed executions. - # One of the approaches is to look deeper than the latest execution and - # check whether executions were already handled or not. new_executions_start_block = Db.l1_block_to_discover_latest_execution(l1_start_block) historical_executions_end_block = Db.l1_block_to_discover_earliest_execution(l1_start_block - 1) @@ -268,22 +261,11 @@ defmodule Indexer.Fetcher.Arbitrum.TrackingBatchesStatuses do # block for the next iteration of new confirmation discovery. @impl GenServer def handle_info(:check_new_confirmations, state) do - {handle_duration, {retcode, end_block}} = :timer.tc(&NewConfirmations.discover_new_rollup_confirmation/1, [state]) + {handle_duration, {_, new_state}} = :timer.tc(&NewConfirmations.discover_new_rollup_confirmation/1, [state]) Process.send(self(), :check_new_executions, []) - updated_fields = - case retcode do - :ok -> %{} - _ -> %{historical_confirmations_end_block: nil, historical_confirmations_start_block: nil} - end - |> Map.merge(%{ - # credo:disable-for-previous-line Credo.Check.Refactor.PipeChainStart - duration: increase_duration(state.data, handle_duration), - new_confirmations_start_block: end_block + 1 - }) - - new_data = Map.merge(state.data, updated_fields) + new_data = Map.put(new_state.data, :duration, increase_duration(state.data, handle_duration)) {:noreply, %{state | data: new_data}} end @@ -417,22 +399,12 @@ defmodule Indexer.Fetcher.Arbitrum.TrackingBatchesStatuses do # end blocks for the next iteration of historical confirmations discovery. @impl GenServer def handle_info(:check_historical_confirmations, state) do - {handle_duration, {retcode, {start_block, end_block}}} = + {handle_duration, {_, new_state}} = :timer.tc(&NewConfirmations.discover_historical_rollup_confirmation/1, [state]) Process.send(self(), :check_historical_executions, []) - updated_fields = - case retcode do - :ok -> %{historical_confirmations_end_block: start_block - 1, historical_confirmations_start_block: end_block} - _ -> %{historical_confirmations_end_block: nil, historical_confirmations_start_block: nil} - end - |> Map.merge(%{ - # credo:disable-for-previous-line Credo.Check.Refactor.PipeChainStart - duration: increase_duration(state.data, handle_duration) - }) - - new_data = Map.merge(state.data, updated_fields) + new_data = Map.put(new_state.data, :duration, increase_duration(state.data, handle_duration)) {:noreply, %{state | data: new_data}} end diff --git a/apps/indexer/lib/indexer/fetcher/arbitrum/utils/helper.ex b/apps/indexer/lib/indexer/fetcher/arbitrum/utils/helper.ex index cd11474917..d7abb6aa00 100644 --- a/apps/indexer/lib/indexer/fetcher/arbitrum/utils/helper.ex +++ b/apps/indexer/lib/indexer/fetcher/arbitrum/utils/helper.ex @@ -1,4 +1,8 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Helper do + alias Explorer.Chain.Arbitrum.LifecycleTransaction + + import Indexer.Fetcher.Arbitrum.Utils.Logging, only: [log_info: 1] + @moduledoc """ Provides utility functions to support the handling of Arbitrum-specific data fetching and processing in the indexer. """ @@ -55,14 +59,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Helper do %{binary() => %{:block => non_neg_integer(), optional(any()) => any()}}, %{non_neg_integer() => DateTime.t()}, boolean() - ) :: %{ - binary() => %{ - :block => non_neg_integer(), - :timestamp => DateTime.t(), - :status => :unfinalized | :finalized, - optional(any()) => any() - } - } + ) :: %{binary() => LifecycleTransaction.to_import()} def extend_lifecycle_txs_with_ts_and_status(lifecycle_txs, blocks_to_ts, track_finalization?) when is_map(lifecycle_txs) and is_map(blocks_to_ts) and is_boolean(track_finalization?) do lifecycle_txs @@ -84,6 +81,44 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Helper do end) end + @doc """ + Compares a lifecycle transaction with new block number and timestamp, and updates if necessary. + + This function checks if the given lifecycle transaction has the same block number + and timestamp as the provided values. If they are the same, it returns `{:same, nil}`. + If they differ, it updates the transaction with the new block number and timestamp, + logs the update, and returns `{:updated, updated_tx}`. + + ## Parameters + - `tx`: The lifecycle transaction to compare and potentially update. + - `{new_block_num, new_ts}`: A tuple containing the new block number and timestamp. + - `tx_type_str`: A string describing the type of the transaction for logging purposes. + + ## Returns + - `{:same, nil}` if the transaction block number and timestamp are the same as the provided values. + - `{:updated, updated_tx}` if the transaction was updated with the new block number and timestamp. + """ + @spec compare_lifecycle_tx_and_update( + LifecycleTransaction.to_import(), + {non_neg_integer(), DateTime.t()}, + String.t() + ) :: {:same, nil} | {:updated, LifecycleTransaction.to_import()} + def compare_lifecycle_tx_and_update(tx, {new_block_num, new_ts}, tx_type_str) do + if tx.block_number == new_block_num and DateTime.compare(tx.timestamp, new_ts) == :eq do + {:same, nil} + else + log_info( + "The #{tx_type_str} transaction 0x#{tx.hash |> Base.encode16(case: :lower)} will be updated with the new block number and timestamp" + ) + + {:updated, + Map.merge(tx, %{ + block_number: new_block_num, + timestamp: new_ts + })} + end + end + @doc """ Converts a binary data to a hexadecimal string. @@ -97,4 +132,77 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Helper do def bytes_to_hex_str(data) do "0x" <> Base.encode16(data, case: :lower) end + + @doc """ + Executes a function over a specified block range in chunks. + + This function divides a block range into smaller chunks and executes the provided + function for each chunk. It collects the results of each function execution and + returns them as a list of tuples. Each tuple contains the start and end block numbers + of the chunk and the result of the function execution for that chunk. + + If `halt_on_error` is set to `true` and the function returns anything other than + `:ok` or `{:ok, ...}`, the execution halts. However, the result of the last function + execution is still included in the resulting list. + + ## Parameters + - `start_block`: The starting block number of the range. + - `end_block`: The ending block number of the range. + - `chunk_size`: The size of each chunk in terms of block numbers. + - `func`: The function to execute for each chunk. The function should accept two + arguments: the start and end block numbers of the chunk. + - `halt_on_error` (optional): A boolean flag indicating whether to halt execution + if an error occurs. Defaults to `false`. + + ## Returns + - A list of tuples. Each tuple contains: + - A tuple with the start and end block numbers of the chunk. + - The result of the function execution for that chunk. + + ## Examples + + iex> execute_for_block_range_in_chunks(5, 25, 7, fn (start_block, end_block) -> + ...> {:ok, start_block, end_block} + ...> end) + [ + {{5, 11}, {:ok, 5, 11}}, + {{12, 18}, {:ok, 12, 18}}, + {{19, 25}, {:ok, 19, 25}} + ] + """ + @spec execute_for_block_range_in_chunks( + non_neg_integer(), + non_neg_integer(), + non_neg_integer(), + fun() + ) :: [ + {{non_neg_integer(), non_neg_integer()}, any()} + ] + @spec execute_for_block_range_in_chunks( + non_neg_integer(), + non_neg_integer(), + non_neg_integer(), + fun(), + boolean() + ) :: [ + {{non_neg_integer(), non_neg_integer()}, any()} + ] + def execute_for_block_range_in_chunks(start_block, end_block, chunk_size, func, halt_on_error \\ false) do + 0..div(end_block - start_block, chunk_size) + |> Enum.reduce_while([], fn i, res -> + chunk_start = start_block + i * chunk_size + chunk_end = min(chunk_start + chunk_size - 1, end_block) + + func_res = func.(chunk_start, chunk_end) + acc_res = [{{chunk_start, chunk_end}, func_res} | res] + + case {halt_on_error, func_res} do + {false, _} -> {:cont, acc_res} + {true, :ok} -> {:cont, acc_res} + {true, {:ok, _}} -> {:cont, acc_res} + {true, _} -> {:halt, acc_res} + end + end) + |> Enum.reverse() + end end diff --git a/apps/indexer/lib/indexer/fetcher/arbitrum/utils/rpc.ex b/apps/indexer/lib/indexer/fetcher/arbitrum/utils/rpc.ex index 8fc1f2afd9..6f4ef02ba3 100644 --- a/apps/indexer/lib/indexer/fetcher/arbitrum/utils/rpc.ex +++ b/apps/indexer/lib/indexer/fetcher/arbitrum/utils/rpc.ex @@ -422,6 +422,57 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Rpc do end end + @doc """ + Retrieves the safe and latest L1 block numbers. + + This function fetches the latest block number from the chain and tries to determine + the safe block number. If the RPC node does not support the safe block feature or + if the safe block is too far behind the latest block, the safe block is determined + based on the finalization threshold. In both cases, it steps back from the latest + block to mark some blocks as unfinalized. + + ## Parameters + - `json_rpc_named_arguments`: The named arguments for the JSON RPC call. + - `hard_limit`: The maximum number of blocks to step back when determining the safe block. + + ## Returns + - A tuple containing the safe block number and the latest block number. + """ + @spec get_safe_and_latest_l1_blocks(EthereumJSONRPC.json_rpc_named_arguments(), non_neg_integer()) :: + {EthereumJSONRPC.block_number(), EthereumJSONRPC.block_number()} + def get_safe_and_latest_l1_blocks(json_rpc_named_arguments, hard_limit) do + finalization_threshold = Application.get_all_env(:indexer)[Indexer.Fetcher.Arbitrum][:l1_finalization_threshold] + + {safe_chain_block, is_latest?} = IndexerHelper.get_safe_block(json_rpc_named_arguments) + + latest_chain_block = + case is_latest? do + true -> + safe_chain_block + + false -> + {:ok, latest_block} = + IndexerHelper.get_block_number_by_tag("latest", json_rpc_named_arguments, get_resend_attempts()) + + latest_block + end + + safe_block = + if safe_chain_block < latest_chain_block + 1 - finalization_threshold or is_latest? do + # The first condition handles the case when the safe block is too far behind + # the latest block (L3 case). + # The second condition handles the case when the L1 RPC node does not support + # the safe block feature (non standard Arbitrum deployments). + # In both cases, it is necessary to step back a bit from the latest block to + # suspect these blocks as unfinalized. + latest_chain_block + 1 - min(finalization_threshold, hard_limit) + else + safe_chain_block + end + + {safe_block, latest_chain_block} + end + @doc """ Identifies the block range for a batch by using the block number located on one end of the range. diff --git a/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex b/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex index 8543d339eb..6a5e8c389e 100644 --- a/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex +++ b/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex @@ -33,6 +33,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do alias Indexer.Fetcher.Arbitrum.DA.Common, as: DataAvailabilityInfo alias Indexer.Fetcher.Arbitrum.DA.{Anytrust, Celestia} alias Indexer.Fetcher.Arbitrum.Utils.{Db, Logging, Rpc} + alias Indexer.Fetcher.Arbitrum.Utils.Helper, as: ArbitrumHelper alias Indexer.Helper, as: IndexerHelper alias Explorer.Chain @@ -44,8 +45,6 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do # keccak256("SequencerBatchDelivered(uint256,bytes32,bytes32,bytes32,uint256,(uint64,uint64,uint64,uint64),uint8)") @event_sequencer_batch_delivered "0x7394f4a19a13c7b92b5bb71033245305946ef78452f7b4986ac1390b5df4ebd7" - @max_depth_for_safe_block 1000 - @doc """ Discovers and imports new batches of rollup transactions within the current L1 block range. @@ -115,31 +114,15 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do ) do # Requesting the "latest" block instead of "safe" allows to catch new batches # without latency. - {:ok, latest_block} = - IndexerHelper.get_block_number_by_tag( - "latest", - l1_rpc_config.json_rpc_named_arguments, - Rpc.get_resend_attempts() - ) - - {safe_chain_block, _} = IndexerHelper.get_safe_block(l1_rpc_config.json_rpc_named_arguments) - - # max() cannot be used here since l1_rpc_config.logs_block_range must not - # be taken into account to identify if it is L3 or not - safe_block = - if safe_chain_block < latest_block + 1 - @max_depth_for_safe_block do - # The case of L3, the safe block is too far behind the latest block, - # therefore it is assumed that there is no so deep re-orgs there. - latest_block + 1 - min(@max_depth_for_safe_block, l1_rpc_config.logs_block_range) - else - safe_chain_block - end # It is necessary to re-visit some amount of the previous blocks to ensure that # no batches are missed due to reorgs. The amount of blocks to re-visit depends - # either on the current safe block but must not exceed @max_depth_for_safe_block - # (or L1 RPC max block range for getting logs) since on L3 chains the safe block - # could be too far behind the latest block. + # on the current safe block or the block which is considered as safest in case + # of L3 (where the safe block could be too far behind the latest block) or if + # RPC does not support "safe" block. + {safe_block, latest_block} = + Rpc.get_safe_and_latest_l1_blocks(l1_rpc_config.json_rpc_named_arguments, l1_rpc_config.logs_block_range) + # At the same time it does not make sense to re-visit blocks that will be # re-visited by the historical batches discovery process. # If the new batches discovery process does not reach the chain head previously @@ -154,30 +137,23 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do # Since with taking the safe block into account, the range safe_start_block..end_block # could be larger than L1 RPC max block range for getting logs, it is necessary to # divide the range into the chunks - safe_start_block - |> Stream.unfold(fn - current when current > end_block -> - nil - - current -> - next = min(current + l1_rpc_config.logs_block_range - 1, end_block) - {current, next + 1} - end) - |> Stream.each(fn chunk_start -> - chunk_end = min(chunk_start + l1_rpc_config.logs_block_range - 1, end_block) - - discover( - sequencer_inbox_address, - chunk_start, - chunk_end, - new_batches_limit, - messages_to_blocks_shift, - l1_rpc_config, - node_interface_address, - rollup_rpc_config - ) - end) - |> Stream.run() + ArbitrumHelper.execute_for_block_range_in_chunks( + safe_start_block, + end_block, + l1_rpc_config.logs_block_range, + fn chunk_start, chunk_end -> + discover( + sequencer_inbox_address, + chunk_start, + chunk_end, + new_batches_limit, + messages_to_blocks_shift, + l1_rpc_config, + node_interface_address, + rollup_rpc_config + ) + end + ) {:ok, end_block} else @@ -532,25 +508,25 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do rollup_rpc_config ) do Enum.each(l1_block_ranges, fn {start_block, end_block} -> - Enum.each(0..div(end_block - start_block, l1_rpc_config.logs_block_range), fn i -> - start_block = start_block + i * l1_rpc_config.logs_block_range - end_block = min(start_block + l1_rpc_config.logs_block_range - 1, end_block) - - log_info("Block range for missing batches discovery: #{start_block}..#{end_block}") - - # `do_discover` is not used here to demonstrate the need to fetch batches - # which are already historical - discover_historical( - sequencer_inbox_address, - start_block, - end_block, - new_batches_limit, - messages_to_blocks_shift, - l1_rpc_config, - node_interface_address, - rollup_rpc_config - ) - end) + ArbitrumHelper.execute_for_block_range_in_chunks( + start_block, + end_block, + l1_rpc_config.logs_block_range, + fn chunk_start, chunk_end -> + # `do_discover` is not used here to demonstrate the need to fetch batches + # which are already historical + discover_historical( + sequencer_inbox_address, + chunk_start, + chunk_end, + new_batches_limit, + messages_to_blocks_shift, + l1_rpc_config, + node_interface_address, + rollup_rpc_config + ) + end + ) end) end @@ -699,9 +675,11 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do # This function analyzes SequencerBatchDelivered event logs to identify new batches # and retrieves their details, avoiding the reprocessing of batches already known # in the database. It enriches the details of new batches with data from corresponding - # L1 transactions and blocks, including timestamps and block ranges. The function - # then prepares batches, associated rollup blocks and transactions, lifecycle - # transactions and Data Availability related records for database import. + # L1 transactions and blocks, including timestamps and block ranges. The lifecycle + # transactions for already known batches are updated with actual block numbers and + # timestamps. The function then prepares batches, associated rollup blocks and + # transactions, lifecycle transactions and Data Availability related records for + # database import. # Additionally, L2-to-L1 messages initiated in the rollup blocks associated with the # discovered batches are retrieved from the database, marked as `:sent`, and prepared # for database import. @@ -1341,21 +1319,12 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do block_num = existing_commitment_txs[tx.hash] ts = block_to_ts[block_num] - if tx.block_number == block_num and DateTime.compare(tx.timestamp, ts) == :eq do - txs - else - log_info( - "The commitment transaction 0x#{tx.hash |> Base.encode16(case: :lower)} will be updated with the new block number and timestamp" - ) + case ArbitrumHelper.compare_lifecycle_tx_and_update(tx, {block_num, ts}, "commitment") do + {:updated, updated_tx} -> + Map.put(txs, tx.hash, updated_tx) - Map.put( - txs, - tx.hash, - Map.merge(tx, %{ - block_number: block_num, - timestamp: ts - }) - ) + _ -> + txs end end) end diff --git a/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_confirmations.ex b/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_confirmations.ex index ceb0d1695d..9631ccf788 100644 --- a/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_confirmations.ex +++ b/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_confirmations.ex @@ -24,10 +24,34 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do confirmation (where X is the rollup block mentioned in an even earlier confirmation). - Currently, the process of discovering confirmed rollup blocks works with any - position of the top confirmed block in a batch. Later, if it is confirmed that - the top block in a confirmation always aligns with the last block in a batch, - this approach to rollup block discovery can be revisited for simplification. + Since the confirmations discovery process is asynchronous with respect to the + block fetching process and the batches discovery process, there could be + situations when the information about rollup blocks or their linkage with a + batch is not available yet. Here is a list of possible scenarios and expected + behavior: + 1. A rollup block required to proceed with the new confirmation discovery is + not indexed yet, or the batch where this block is included is not indexed + yet. + - The new confirmation discovery process will proceed with discovering new + confirmations and the L1 blocks range where the confirmation handling is + aborted will be passed to the historical confirmations discovery process. + 2. A rollup block required to proceed with the historical confirmation discovery + is not indexed yet, or the batch where this block is included is not indexed + yet. + - The historical confirmation discovery process will proceed with the same + L1 blocks range where the confirmation handling is aborted until this + confirmation is handled properly. + + As it is clear from the above, the historical confirmation discovery process + could be interrupted by the new confirmation discovery process. As soon as the + historical confirmation discovery process reaches the lower end of the L1 block + range where the new confirmation discovery process is aborted, the historical + confirmation discovery process will request the database to provide the next L1 + block range of missing confirmations. Such a range could be closed when there + are end and start L1 blocks in which a missing confirmation is expected, or + open-ended where the start block is not defined and the end block is the block + preceding the L1 block where a confirmation was already handled by the discovery + process. """ import EthereumJSONRPC, only: [quantity_to_integer: 1] @@ -62,12 +86,25 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do Discovers and processes new confirmations of rollup blocks within a calculated block range. This function identifies the appropriate L1 block range for discovering new - rollup confirmations. It fetches logs representing `SendRootUpdated` events - within this range to identify the new tops of rollup block confirmations. The + rollup confirmations. In order to make sure that no confirmation is missed due + to re-orgs, it adjusts the range to re-inspect some L1 blocks in the past. + Therefore the lower bound of the L1 blocks range is identified based on the + safe block or the block which is considered as safest if RPC does not support + "safe" block retrieval. + + Then the function fetches logs representing `SendRootUpdated` events within + the found range to identify the new tops of rollup block confirmations. The discovered confirmations are processed to update the status of rollup blocks - and L2-to-L1 messages accordingly. Eventually, updated rollup blocks, - cross-chain messages, and newly constructed lifecycle transactions are imported - into the database. + and L2-to-L1 messages accordingly. Eventually, updated rollup blocks, cross-chain + messages, and newly constructed lifecycle transactions are imported into the + database. + + After processing the confirmations, the function updates the state to prepare + for the next iteration. It adjusts the `new_confirmations_start_block` to the + block number after the last processed block. If a confirmation is missed, the + range for the next iteration of the historical confirmations discovery process + is adjusted to re-inspect the range where the confirmation was not handled + properly. ## Parameters - A map containing: @@ -77,13 +114,9 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do confirmation discovery. ## Returns - - `{retcode, end_block}` where `retcode` is either `:ok` or - `:confirmation_missed` indicating the success or failure of the discovery - process, and `end_block` is used to determine the start block number for the - next iteration of new confirmations discovery. - - `{:ok, start_block - 1}` if there are no new blocks to be processed, - indicating that the current start block should be reconsidered in the next - iteration. + - `{:ok, new_state}`: If the discovery process completes successfully. + - `{:confirmation_missed, new_state}`: If a confirmation is missed and further + action is needed. """ @spec discover_new_rollup_confirmation(%{ :config => %{ @@ -96,57 +129,165 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do }, optional(any()) => any() }, - :data => %{:new_confirmations_start_block => non_neg_integer(), optional(any()) => any()}, + :data => %{ + :new_confirmations_start_block => non_neg_integer(), + :historical_confirmations_end_block => non_neg_integer(), + optional(any()) => any() + }, optional(any()) => any() - }) :: {:confirmation_missed, non_neg_integer()} | {:ok, non_neg_integer()} + }) :: + {:ok | :confirmation_missed, + %{ + :data => %{ + :new_confirmations_start_block => non_neg_integer(), + :historical_confirmations_end_block => nil | non_neg_integer(), + :historical_confirmations_start_block => nil | non_neg_integer(), + optional(any()) => any() + }, + optional(any()) => any() + }} def discover_new_rollup_confirmation( %{ config: %{ l1_rpc: l1_rpc_config, l1_outbox_address: outbox_address }, - data: %{new_confirmations_start_block: start_block} - } = _state + data: %{ + new_confirmations_start_block: start_block, + historical_confirmations_end_block: historical_confirmations_end_block + } + } = state ) do - # It makes sense to use "safe" here. Blocks are confirmed with delay in one week - # (applicable for ArbitrumOne and Nova), so 10 mins delay is not significant - {:ok, latest_block} = - IndexerHelper.get_block_number_by_tag( - if(l1_rpc_config.finalized_confirmations, do: "safe", else: "latest"), - l1_rpc_config.json_rpc_named_arguments, - Rpc.get_resend_attempts() - ) + {safe_start_block, latest_block} = + if l1_rpc_config.finalized_confirmations do + # It makes sense to use "safe" here. Blocks are confirmed with delay in one week + # (applicable for ArbitrumOne and Nova), so 10 mins delay is not significant. + # By using "safe" we can avoid re-visiting the same blocks in case of reorgs. + {safe_chain_block, _} = IndexerHelper.get_safe_block(l1_rpc_config.json_rpc_named_arguments) + + {start_block, safe_chain_block} + else + # There are situations when it could be necessary to react on L1 confirmation + # transactions earlier than the safe block. For example, for testnets. + # Another situation when the rollup uses L1 RPC which does not support "safe" + # block retrieval. + # In both cases it is desired to re-visit some amount head blocks to ensure + # that no confirmation is missed due to reorgs. + + # The amount of blocks to re-visit depends on the current safe block or the + # block which is considered as safest if RPC does not support "safe" block. + {safe_block, latest_block} = + Rpc.get_safe_and_latest_l1_blocks(l1_rpc_config.json_rpc_named_arguments, l1_rpc_config.logs_block_range) + + # If the new confirmations discovery process does not reach the chain head + # previously no need to re-visit the blocks. + {min(start_block, safe_block), latest_block} + end + + # If ranges for the new confirmations discovery and the historical confirmations + # discovery are overlapped - it could be after confirmations gap identification, + # it is necessary to adjust the start block for the new confirmations discovery. + actual_start_block = + if is_nil(historical_confirmations_end_block) do + safe_start_block + else + max(safe_start_block, historical_confirmations_end_block + 1) + end end_block = min(start_block + l1_rpc_config.logs_block_range - 1, latest_block) - if start_block <= end_block do - log_info("Block range for new rollup confirmations discovery: #{start_block}..#{end_block}") + if actual_start_block <= end_block do + log_info("Block range for new rollup confirmations discovery: #{actual_start_block}..#{end_block}") - retcode = - discover( - outbox_address, - start_block, + # Since for the case l1_rpc_config.finalized_confirmations = false the range + # actual_start_block..end_block could be larger than L1 RPC max block range for + # getting logs, it is necessary to divide the range into the chunks. + results = + ArbitrumHelper.execute_for_block_range_in_chunks( + actual_start_block, end_block, - l1_rpc_config + l1_rpc_config.logs_block_range, + fn chunk_start, chunk_end -> + discover( + outbox_address, + chunk_start, + chunk_end, + l1_rpc_config + ) + end, + true ) - {retcode, end_block} + # Since halt_on_error was set to true, it is OK to consider the last result + # only. + {{start_block, end_block}, retcode} = List.last(results) + + case retcode do + :ok -> + {retcode, state_for_next_iteration_new(state, end_block + 1)} + + :confirmation_missed -> + {retcode, state_for_next_iteration_new(state, end_block + 1, {start_block, end_block})} + end else - {:ok, start_block - 1} + {:ok, state_for_next_iteration_new(state, start_block)} end end + # Updates the state for the next iteration of new confirmations discovery. + @spec state_for_next_iteration_new( + %{ + :data => map(), + optional(any()) => any() + }, + non_neg_integer(), + nil | {non_neg_integer(), non_neg_integer()} + ) :: + %{ + :data => %{ + :new_confirmations_start_block => non_neg_integer(), + :historical_confirmations_end_block => non_neg_integer(), + :historical_confirmations_start_block => non_neg_integer(), + optional(any()) => any() + }, + optional(any()) => any() + } + defp state_for_next_iteration_new(prev_state, start_block, historical_blocks \\ nil) do + data_for_new_confirmations = + %{new_confirmations_start_block: start_block} + + data_to_update = + case historical_blocks do + nil -> + data_for_new_confirmations + + {start_block, end_block} -> + Map.merge(data_for_new_confirmations, %{ + historical_confirmations_start_block: start_block, + historical_confirmations_end_block: end_block + }) + end + + %{ + prev_state + | data: Map.merge(prev_state.data, data_to_update) + } + end + @doc """ Discovers and processes historical confirmations of rollup blocks within a calculated block range. This function determines the appropriate L1 block range for discovering historical rollup confirmations based on the provided end block or from the - analysis of confirmations missed in the database. It then fetches logs - representing `SendRootUpdated` events within this range to identify the - historical tops of rollup block confirmations. The discovered confirmations - are processed to update the status of rollup blocks and L2-to-L1 messages - accordingly. Eventually, updated rollup blocks, cross-chain messages, and newly - constructed lifecycle transactions are imported into the database. + analysis of confirmations missed in the database. It fetches logs representing + `SendRootUpdated` events within this range to identify the historical tops of + rollup block confirmations. The discovered confirmations are processed to update + the status of rollup blocks and L2-to-L1 messages accordingly. Eventually, + updated rollup blocks, cross-chain messages, and newly constructed lifecycle + transactions are imported into the database. + + After processing the confirmations, the function updates the state with the + blocks range for the next iteration. ## Parameters - A map containing: @@ -157,10 +298,9 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do for historical confirmation discovery. ## Returns - - `{retcode, {start_block, interim_start_block}}` where - - `retcode` is either `:ok` or `:confirmation_missed` - - `start_block` is the starting block for the next iteration of discovery - - `interim_start_block` is the end block for the next iteration of discovery + - `{:ok, new_state}`: If the discovery process completes successfully. + - `{:confirmation_missed, new_state}`: If a confirmation is missed and further + action is needed. """ @spec discover_historical_rollup_confirmation(%{ :config => %{ @@ -182,8 +322,15 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do }, optional(any()) => any() }) :: - {:confirmation_missed, {nil | non_neg_integer(), nil | non_neg_integer()}} - | {:ok, {nil | non_neg_integer(), nil | non_neg_integer()}} + {:ok | :confirmation_missed, + %{ + :data => %{ + :historical_confirmations_end_block => nil | non_neg_integer(), + :historical_confirmations_start_block => nil | non_neg_integer(), + optional(any()) => any() + }, + optional(any()) => any() + }} def discover_historical_rollup_confirmation( %{ config: %{ @@ -196,11 +343,17 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do historical_confirmations_end_block: expected_confirmation_end_block, historical_confirmations_start_block: expected_confirmation_start_block } - } = _state + } = state ) do {interim_start_block, end_block} = case expected_confirmation_end_block do nil -> + # Three options are possible: + # {nil, nil} - there are no confirmations + # {nil, value} - there are no confirmations between L1 block corresponding + # to the rollup genesis and the L1 block _value_. + # {lower, higher} - there are no confirmations between L1 block _lower_ + # and the L1 block _higher_. Db.l1_blocks_to_expect_rollup_blocks_confirmation(nil) _ -> @@ -215,6 +368,9 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do max(l1_rollup_init_block, end_block - l1_rpc_config.logs_block_range + 1) value -> + # The interim start block is not nil when a gap between two confirmations + # identified. Therefore there is no need to go deeper than the interim + # start block. Enum.max([l1_rollup_init_block, value, end_block - l1_rpc_config.logs_block_range + 1]) end @@ -228,20 +384,70 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do l1_rpc_config ) - {retcode, {start_block, interim_start_block}} + case {retcode, start_block == interim_start_block} do + {:ok, true} -> + # The situation when the interim start block is equal to the start block + # means that gap between confirmation has been inspected. It is necessary + # to identify the next gap. + {retcode, state_for_next_iteration_historical(state, nil, nil)} + + {:ok, false} -> + # The situation when the interim start block is not equal to the start block + # means that the confirmations gap has not been inspected fully yet. It is + # necessary to continue the confirmations discovery from the interim start + # block to the the block predecessor of the current start block. + {retcode, state_for_next_iteration_historical(state, start_block - 1, interim_start_block)} + + {:confirmation_missed, _} -> + # The situation when the confirmation has been missed. It is necessary to + # re-do the confirmations discovery for the same block range. + {retcode, state_for_next_iteration_historical(state, end_block, interim_start_block)} + end else # TODO: Investigate on a live system what will happen when all blocks are confirmed # the situation when end block is `nil` is possible when there is no confirmed # block in the database and the historical confirmations discovery must start # from the L1 block specified as L1 start block (configured, or the latest block number) - {:end_block_defined, false} -> {:ok, {l1_start_block, nil}} + {:end_block_defined, false} -> {:ok, state_for_next_iteration_historical(state, l1_start_block - 1, nil)} # If the genesis of the rollup has been reached during historical confirmations # discovery, no further actions are needed. - {:genesis_not_reached, false} -> {:ok, {l1_rollup_init_block, nil}} + {:genesis_not_reached, false} -> {:ok, state_for_next_iteration_historical(state, l1_rollup_init_block - 1, nil)} end end + # Updates the state for the next iteration of historical confirmations discovery. + @spec state_for_next_iteration_historical( + %{ + :data => %{ + :historical_confirmations_end_block => non_neg_integer() | nil, + :historical_confirmations_start_block => non_neg_integer() | nil, + optional(any()) => any() + }, + optional(any()) => any() + }, + non_neg_integer() | nil, + non_neg_integer() | nil + ) :: + %{ + :data => %{ + :historical_confirmations_end_block => non_neg_integer() | nil, + :historical_confirmations_start_block => non_neg_integer() | nil, + optional(any()) => any() + }, + optional(any()) => any() + } + defp state_for_next_iteration_historical(prev_state, end_block, lowest_block_in_gap) when end_block >= 0 do + %{ + prev_state + | data: %{ + prev_state.data + | historical_confirmations_end_block: end_block, + historical_confirmations_start_block: lowest_block_in_gap + } + } + end + # Discovers and processes new confirmations of rollup blocks within the given block range. # # This function fetches logs within the specified L1 block range to find new @@ -260,6 +466,18 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # ## Returns # - The retcode indicating the result of the discovery and processing operation, # either `:ok` or `:confirmation_missed`. + @spec discover( + binary(), + non_neg_integer(), + non_neg_integer(), + %{ + :json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(), + :logs_block_range => non_neg_integer(), + :chunk_size => non_neg_integer(), + :finalized_confirmations => boolean(), + optional(any()) => any() + } + ) :: :ok | :confirmation_missed defp discover( outbox_address, start_block, @@ -295,12 +513,15 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # Processes logs to handle confirmations for rollup blocks. # # This function analyzes logs containing `SendRootUpdated` events with information - # about the confirmations up to a specific point in time. It identifies the ranges - # of rollup blocks covered by the confirmations and constructs lifecycle - # transactions linked to these confirmed blocks. Considering the highest confirmed - # rollup block number, it discovers L2-to-L1 messages that have been committed and - # updates their status to confirmed. Lists of confirmed rollup blocks, lifecycle - # transactions, and confirmed messages are prepared for database import. + # about the confirmations up to a specific point in time, avoiding the reprocessing + # of confirmations already known in the database. It identifies the ranges of + # rollup blocks covered by the confirmations and constructs lifecycle transactions + # linked to these confirmed blocks. Considering the highest confirmed rollup block + # number, it discovers L2-to-L1 messages that have been committed and updates their + # status to confirmed. The confirmations already processed are also updated to + # ensure the correct L1 block number and timestamp, which may have changed due to + # re-orgs. Lists of confirmed rollup blocks, lifecycle transactions, and confirmed + # messages are prepared for database import. # # ## Parameters # - `logs`: Log entries representing `SendRootUpdated` events. @@ -323,7 +544,8 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do :json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(), :logs_block_range => non_neg_integer(), :chunk_size => non_neg_integer(), - :finalized_confirmations => boolean() + :finalized_confirmations => boolean(), + optional(any()) => any() }, binary() ) :: @@ -341,38 +563,72 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do l1_rpc_config, outbox_address ) do - {rollup_blocks_to_l1_txs, lifecycle_txs_basic, blocks_requests} = parse_logs_for_new_confirmations(logs) - + # On this step there could be lifecycle transactions for the rollup blocks which are + # already confirmed. It is only possible in the scenario when the confirmation + # discovery process does not wait for the safe L1 block. In this case: + # - rollup_blocks_to_l1_txs will not contain the corresponding block hash associated + # with the L1 transaction hash + # - lifecycle_txs_basic will contain all discovered lifecycle transactions + # - blocks_requests will contain all requests to fetch block data for the lifecycle + # transactions + # - existing_lifecycle_txs will contain lifecycle transactions which was found in the + # logs and already imported into the database. + {rollup_blocks_to_l1_txs, lifecycle_txs_basic, blocks_requests, existing_lifecycle_txs} = + parse_logs_for_new_confirmations(logs) + + # This step must be run only if there are hashes of the confirmed rollup blocks + # in rollup_blocks_to_l1_txs - when there are newly discovered confirmations. rollup_blocks = - discover_rollup_blocks( - rollup_blocks_to_l1_txs, - %{ - json_rpc_named_arguments: l1_rpc_config.json_rpc_named_arguments, - logs_block_range: l1_rpc_config.logs_block_range, - outbox_address: outbox_address - } - ) + if Enum.empty?(rollup_blocks_to_l1_txs) do + [] + else + discover_rollup_blocks( + rollup_blocks_to_l1_txs, + %{ + json_rpc_named_arguments: l1_rpc_config.json_rpc_named_arguments, + logs_block_range: l1_rpc_config.logs_block_range, + outbox_address: outbox_address + } + ) + end + # Will return %{} if there are no new confirmations applicable_lifecycle_txs = take_lifecycle_txs_for_confirmed_blocks(rollup_blocks, lifecycle_txs_basic) + # Will contain :ok if no new confirmations are found retcode = - if Enum.count(lifecycle_txs_basic) != Enum.count(applicable_lifecycle_txs) do + if Enum.count(lifecycle_txs_basic) != Enum.count(applicable_lifecycle_txs) + length(existing_lifecycle_txs) do :confirmation_missed else :ok end - if Enum.empty?(applicable_lifecycle_txs) do + if Enum.empty?(applicable_lifecycle_txs) and existing_lifecycle_txs == [] do + # Only if both new confirmations and already existing confirmations are empty {retcode, {[], [], []}} else - {lifecycle_txs, rollup_blocks, highest_confirmed_block_number} = + l1_blocks_to_ts = + Rpc.execute_blocks_requests_and_get_ts( + blocks_requests, + l1_rpc_config.json_rpc_named_arguments, + l1_rpc_config.chunk_size + ) + + # The lifecycle transactions for the new confirmations are finalized here. + {lifecycle_txs_for_new_confirmations, rollup_blocks, highest_confirmed_block_number} = finalize_lifecycle_txs_and_confirmed_blocks( applicable_lifecycle_txs, rollup_blocks, - blocks_requests, - l1_rpc_config + l1_blocks_to_ts, + l1_rpc_config.track_finalization ) + # The lifecycle transactions for the already existing confirmations are updated here + # to ensure correct L1 block number and timestamp that could appear due to re-orgs. + lifecycle_txs = + lifecycle_txs_for_new_confirmations ++ + update_lifecycle_txs_for_new_blocks(existing_lifecycle_txs, lifecycle_txs_basic, l1_blocks_to_ts) + # Drawback of marking messages as confirmed during a new confirmation handling # is that the status change could become stuck if confirmations are not handled. # For example, due to DB inconsistency: some blocks/batches are missed. @@ -384,12 +640,13 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # Parses logs to extract new confirmations for rollup blocks and prepares related data. # - # This function processes `SendRootUpdated` event logs. For each event, it maps - # the hash of the top confirmed rollup block provided in the event to - # the confirmation description, containing the L1 transaction hash and - # block number. It also prepares a set of lifecycle transactions in basic form - # and block requests to later fetch timestamps for the corresponding lifecycle - # transactions. + # This function processes `SendRootUpdated` event logs. For each event which + # was not processed before, it maps the hash of the top confirmed rollup block + # provided in the event to the confirmation description, containing the L1 + # transaction hash and block number. It also prepares a set of lifecycle + # transactions in basic form, the set of lifecycle transaction already + # existing in the database and block requests to later fetch timestamps for + # the corresponding lifecycle transactions. # # ## Parameters # - `logs`: A list of log entries representing `SendRootUpdated` events. @@ -399,22 +656,53 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # - A map associating rollup block hashes with their confirmation descriptions. # - A map of basic-form lifecycle transactions keyed by L1 transaction hash. # - A list of RPC requests to fetch block data for these lifecycle transactions. + # - A list of discovered lifecycle transactions which are already in the + # database. Each transaction is compatible with the database import operation. + @spec parse_logs_for_new_confirmations([%{String.t() => any()}]) :: + { + %{binary() => %{l1_tx_hash: binary(), l1_block_num: non_neg_integer()}}, + %{binary() => %{hash: binary(), block_number: non_neg_integer()}}, + [EthereumJSONRPC.Transport.request()], + [Arbitrum.LifecycleTransaction.to_import()] + } defp parse_logs_for_new_confirmations(logs) do + transaction_hashes = + logs + |> Enum.reduce(%{}, fn event, acc -> + l1_tx_hash_raw = event["transactionHash"] + Map.put_new(acc, l1_tx_hash_raw, Rpc.string_hash_to_bytes_hash(l1_tx_hash_raw)) + end) + + existing_lifecycle_txs = + transaction_hashes + |> Map.values() + |> Db.lifecycle_transactions() + |> Enum.reduce(%{}, fn tx, acc -> + Map.put(acc, tx.hash, tx) + end) + {rollup_block_to_l1_txs, lifecycle_txs, blocks_requests} = logs |> Enum.reduce({%{}, %{}, %{}}, fn event, {block_to_txs, lifecycle_txs, blocks_requests} -> rollup_block_hash = send_root_updated_event_parse(event) l1_tx_hash_raw = event["transactionHash"] - l1_tx_hash = Rpc.string_hash_to_bytes_hash(l1_tx_hash_raw) + l1_tx_hash = transaction_hashes[l1_tx_hash_raw] l1_blk_num = quantity_to_integer(event["blockNumber"]) + # There is no need to include the found block hash for the consequent confirmed + # blocks discovery step since it is assumed that already existing lifecycle + # transactions are already linked with the corresponding rollup blocks. updated_block_to_txs = - Map.put( - block_to_txs, - rollup_block_hash, - %{l1_tx_hash: l1_tx_hash, l1_block_num: l1_blk_num} - ) + if Map.has_key?(existing_lifecycle_txs, l1_tx_hash) do + block_to_txs + else + Map.put( + block_to_txs, + rollup_block_hash, + %{l1_tx_hash: l1_tx_hash, l1_block_num: l1_blk_num} + ) + end updated_lifecycle_txs = Map.put( @@ -435,7 +723,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do {updated_block_to_txs, updated_lifecycle_txs, updated_blocks_requests} end) - {rollup_block_to_l1_txs, lifecycle_txs, Map.values(blocks_requests)} + {rollup_block_to_l1_txs, lifecycle_txs, Map.values(blocks_requests), Map.values(existing_lifecycle_txs)} end # Transforms rollup block hashes to numbers and associates them with their confirmation descriptions. @@ -455,6 +743,15 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # ## Returns # - A list of rollup blocks each associated with the transaction's hash that # confirms the block. + @spec discover_rollup_blocks( + %{binary() => %{l1_tx_hash: binary(), l1_block_num: non_neg_integer()}}, + %{ + :logs_block_range => non_neg_integer(), + :outbox_address => binary(), + :json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(), + optional(any()) => any() + } + ) :: [Arbitrum.BatchBlock.to_import()] defp discover_rollup_blocks(rollup_blocks_to_l1_txs, outbox_config) do block_to_l1_txs = rollup_blocks_to_l1_txs @@ -510,11 +807,16 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # This function follows these steps to identify unconfirmed rollup blocks related # to a single confirmation event: # 1. Retrieve the batch associated with the specified rollup block number. - # 2. Obtain a list of unconfirmed blocks within that batch. - # 3. Determine the first unconfirmed block in the batch, considering potential - # gaps or already confirmed blocks. - # 4. Verify the continuity of the unconfirmed blocks range to ensure there are no - # database inconsistencies or unindexed blocks. + # 2. Obtain a list of unconfirmed blocks within that batch. For the historical + # confirmations discovery, the list will include both unconfirmed blocks that + # are covered by the current confirmation and those that a going to be covered + # by the predecessor confirmation. + # 3. Determine the first unconfirmed block in the batch. It could be the first + # block in the batch or a block the next after the last confirmed block in the + # predecessor confirmation. + # 4. Verify the continuity of the unconfirmed blocks to be covered by the current + # confirmation to ensure there are no database inconsistencies or unindexed + # blocks. # 5. If the first unconfirmed block is at the start of the batch, check if the # confirmation also covers blocks from previous batches. If so, include their # unconfirmed blocks in the range. @@ -552,6 +854,17 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # the current confirmation but not yet marked as confirmed in the database. # - `{:error, []}`: If a discrepancy or inconsistency is found during the # discovery process. + @spec discover_rollup_blocks_belonging_to_one_confirmation( + non_neg_integer(), + %{:l1_block_num => non_neg_integer(), optional(any()) => any()}, + %{ + :logs_block_range => non_neg_integer(), + :outbox_address => binary(), + :json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(), + optional(any()) => any() + }, + __MODULE__.cached_logs() + ) :: {:ok, [Arbitrum.BatchBlock.to_import()]} | {:error, []} defp discover_rollup_blocks_belonging_to_one_confirmation( rollup_block_num, confirmation_desc, @@ -561,7 +874,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # The following batch fields are required in the further processing: # number, start_block, end_block, commitment_transaction.block_number with {:ok, batch} <- discover_rollup_blocks__get_batch(rollup_block_num), - {:ok, unconfirmed_rollup_blocks} when unconfirmed_rollup_blocks != [] <- + {:ok, raw_unconfirmed_rollup_blocks} when raw_unconfirmed_rollup_blocks != [] <- discover_rollup_blocks__get_unconfirmed_rollup_blocks(batch, rollup_block_num), # It is not the issue to request logs for the first call of # discover_rollup_blocks_belonging_to_one_confirmation since we need @@ -572,14 +885,19 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do {:ok, {first_unconfirmed_block, new_cache}} <- discover_rollup_blocks__check_confirmed_blocks_in_batch( rollup_block_num, - length(unconfirmed_rollup_blocks), batch, confirmation_desc, outbox_config, cache ), - true <- discover_rollup_blocks__check_consecutive_rollup_blocks(unconfirmed_rollup_blocks, batch.number) do - if List.first(unconfirmed_rollup_blocks).block_number == batch.start_block do + {:ok, unconfirmed_rollup_blocks} <- + discover_rollup_blocks__check_consecutive_rollup_blocks( + raw_unconfirmed_rollup_blocks, + first_unconfirmed_block, + rollup_block_num, + batch.number + ) do + if first_unconfirmed_block == batch.start_block do log_info("End of the batch #{batch.number} discovered, moving to the previous batch") {status, updated_rollup_blocks} = @@ -598,13 +916,19 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do :ok -> {:ok, unconfirmed_rollup_blocks ++ updated_rollup_blocks} end else - log_info("All unconfirmed blocks in the batch ##{batch.number} found") + # During the process of new confirmations discovery it will show "N of N", + # for the process of historical confirmations discovery it will show "N of M". + log_info( + "#{length(unconfirmed_rollup_blocks)} of #{length(raw_unconfirmed_rollup_blocks)} blocks in the batch ##{batch.number} corresponds to current confirmation" + ) + {:ok, unconfirmed_rollup_blocks} end end end # Retrieves the batch containing the specified rollup block and logs the attempt. + @spec discover_rollup_blocks__get_batch(non_neg_integer()) :: {:ok, Arbitrum.L1Batch.t()} | {:error, []} defp discover_rollup_blocks__get_batch(rollup_block_num) do # Generally if batch is nil it means either # - a batch to a rollup block association is not found, not recoverable @@ -631,6 +955,10 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # Identifies unconfirmed rollup blocks within a batch up to specified block # number, checking for potential synchronization issues. + @spec discover_rollup_blocks__get_unconfirmed_rollup_blocks( + Arbitrum.L1Batch.t(), + non_neg_integer() + ) :: {:ok, [Arbitrum.BatchBlock.to_import()]} | {:error, []} defp discover_rollup_blocks__get_unconfirmed_rollup_blocks(batch, rollup_block_num) do unconfirmed_rollup_blocks = Db.unconfirmed_rollup_blocks(batch.start_block, rollup_block_num) @@ -657,14 +985,14 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # mentions any block of the batch as the top of the confirmed blocks. Depending # on the lookup result, it either considers the found block or the very # first block of the batch as the start of the range of unconfirmed blocks ending - # with `rollup_block_num`. It also checks for a gap in the identified rollup - # blocks range, indicating potential database inconsistency or an unprocessed batch. + # with `rollup_block_num`. # To optimize `eth_getLogs` calls required for the `SendRootUpdated` event lookup, # it uses a cache. + # Since this function only discovers the number of the unconfirmed block, it does + # not check continuity of the unconfirmed blocks range in the batch. # # ## Parameters # - `rollup_block_num`: The rollup block number to check for confirmation. - # - `unconfirmed_rollup_blocks_length`: The number of unconfirmed blocks in the batch. # - `batch`: The batch containing the rollup blocks. # - `confirmation_desc`: Details of the latest confirmation. # - `outbox_config`: Configuration for the Arbitrum outbox contract. @@ -676,20 +1004,30 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # found. # - `{:ok, {first_unconfirmed_block_in_batch, new_cache}}` with the number of the # first unconfirmed block in the batch and updated cache. + @spec discover_rollup_blocks__check_confirmed_blocks_in_batch( + non_neg_integer(), + Arbitrum.L1Batch.t(), + %{:l1_block_num => non_neg_integer(), optional(any()) => any()}, + %{ + :logs_block_range => non_neg_integer(), + :outbox_address => binary(), + :json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(), + optional(any()) => any() + }, + __MODULE__.cached_logs() + ) :: {:ok, {non_neg_integer(), __MODULE__.cached_logs()}} | {:ok, []} | {:error, []} defp discover_rollup_blocks__check_confirmed_blocks_in_batch( rollup_block_num, - unconfirmed_rollup_blocks_length, batch, confirmation_desc, outbox_config, cache ) do - # This function might be over-engineered, as confirmations are likely always - # aligned with the end of a batch. If, after analyzing the databases of fully - # synchronized BS instances across several Arbitrum-based chains, it is confirmed - # that this alignment is consistent, then this functionality can be optimized. + # This function might look like over-engineered, but confirmations are not always + # aligned with the boundaries of a batch unfortunately. - {status, block?, new_cache} = check_if_batch_confirmed(batch, confirmation_desc, outbox_config, cache) + {status, block?, new_cache} = + check_if_batch_confirmed(batch, confirmation_desc, outbox_config, rollup_block_num, cache) case {status, block? == rollup_block_num} do {:error, _} -> @@ -712,20 +1050,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do value + 1 end - if unconfirmed_rollup_blocks_length == rollup_block_num - first_unconfirmed_block_in_batch + 1 do - {:ok, {first_unconfirmed_block_in_batch, new_cache}} - else - # The case when there is a gap in the blocks range is possible when there is - # a DB inconsistency. From another side, the case when the confirmation is for blocks - # in two batches -- one batch has been already indexed, another one has not been yet. - # Both cases should be handled in the same way - this confirmation must be postponed - # until the case resolution. - log_warning( - "Only #{unconfirmed_rollup_blocks_length} of #{rollup_block_num - first_unconfirmed_block_in_batch + 1} blocks found. Skipping the blocks from the batch #{batch.number}" - ) - - {:error, []} - end + {:ok, {first_unconfirmed_block_in_batch, new_cache}} end end @@ -741,6 +1066,8 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # - `confirmation_desc`: Description of the latest confirmation details. # - `l1_outbox_config`: Configuration for the L1 outbox contract, including block # range for logs retrieval. + # - `highest_unconfirmed_block`: The batch's highest rollup block number which is + # considered as unconfirmed. # - `cache`: A cache for the logs to reduce the number of `eth_getLogs` calls. # # ## Returns @@ -754,9 +1081,21 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # such as when a rollup block corresponding to a given hash is not found in the # database. # - `new_cache` contains the updated logs cache despite the error. - defp check_if_batch_confirmed(batch, confirmation_desc, l1_outbox_config, cache) do + @spec check_if_batch_confirmed( + Arbitrum.L1Batch.t(), + %{:l1_block_num => non_neg_integer(), optional(any()) => any()}, + %{ + :logs_block_range => non_neg_integer(), + :outbox_address => binary(), + :json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(), + optional(any()) => any() + }, + non_neg_integer(), + __MODULE__.cached_logs() + ) :: {:ok, nil | non_neg_integer(), __MODULE__.cached_logs()} | {:error, nil, __MODULE__.cached_logs()} + defp check_if_batch_confirmed(batch, confirmation_desc, l1_outbox_config, highest_unconfirmed_block, cache) do log_info( - "Use L1 blocks #{batch.commitment_transaction.block_number}..#{confirmation_desc.l1_block_num - 1} to look for a rollup block confirmation within #{batch.start_block}..#{batch.end_block} of ##{batch.number}" + "Use L1 blocks #{batch.commitment_transaction.block_number}..#{confirmation_desc.l1_block_num - 1} to look for a rollup block confirmation within #{batch.start_block}..#{highest_unconfirmed_block} of ##{batch.number}" ) block_pairs = @@ -851,7 +1190,7 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do {non_neg_integer(), non_neg_integer()}, {non_neg_integer(), non_neg_integer()}, %{ - :outbox_address => String.t(), + :outbox_address => binary(), :json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(), optional(any()) => any() }, @@ -979,50 +1318,101 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do end # Extracts the rollup block hash from a `SendRootUpdated` event log. + @spec send_root_updated_event_parse(%{String.t() => any()}) :: binary() defp send_root_updated_event_parse(event) do [_, _, l2_block_hash] = event["topics"] l2_block_hash end - # Validates if the list of rollup blocks are consecutive without gaps in their numbering. - defp discover_rollup_blocks__check_consecutive_rollup_blocks(unconfirmed_rollup_blocks, batch_number) do - if consecutive_rollup_blocks?(unconfirmed_rollup_blocks) do - true - else - # The case when there is a gap in the blocks range is possible when there is - # a DB inconsistency. From another side, the case when the confirmation is for blocks - # in two batches -- one batch has been already indexed, another one has not been yet. - # Both cases should be handled in the same way - this confirmation must be postponed - # until the case resolution. - log_warning("Skipping the blocks from the batch #{batch_number}") - {:error, []} + # Returns consecutive rollup blocks within the range of lowest_confirmed_block..highest_confirmed_block + # assuming that the list of unconfirmed rollup blocks finishes on highest_confirmed_block and + # is sorted by block number + @spec discover_rollup_blocks__check_consecutive_rollup_blocks( + [Arbitrum.BatchBlock.to_import()], + non_neg_integer(), + non_neg_integer(), + non_neg_integer() + ) :: {:ok, [Arbitrum.BatchBlock.to_import()]} | {:error, []} + defp discover_rollup_blocks__check_consecutive_rollup_blocks( + all_unconfirmed_rollup_blocks, + lowest_confirmed_block, + highest_confirmed_block, + batch_number + ) do + {status, unconfirmed_rollup_blocks} = + check_consecutive_rollup_blocks_and_cut(all_unconfirmed_rollup_blocks, lowest_confirmed_block) + + unconfirmed_rollup_blocks_length = length(unconfirmed_rollup_blocks) + expected_blocks_range_length = highest_confirmed_block - lowest_confirmed_block + 1 + + case {status, unconfirmed_rollup_blocks_length == expected_blocks_range_length} do + {true, true} -> + {:ok, unconfirmed_rollup_blocks} + + {true, false} -> + log_warning( + "Only #{unconfirmed_rollup_blocks_length} of #{expected_blocks_range_length} blocks found. Skipping the blocks from the batch #{batch_number}" + ) + + {:error, []} + + _ -> + # The case when there is a gap in the blocks range is possible when there is + # a DB inconsistency. From another side, the case when the confirmation is for blocks + # in two batches -- one batch has been already indexed, another one has not been yet. + # Both cases should be handled in the same way - this confirmation must be postponed + # until the case resolution. + log_warning("Skipping the blocks from the batch #{batch_number}") + {:error, []} end end - # Checks if the list of rollup blocks are consecutive without gaps in their numbering. - defp consecutive_rollup_blocks?(blocks) do - {_, status} = - Enum.reduce_while(blocks, {nil, false}, fn block, {prev, _} -> - case prev do - nil -> - {:cont, {block.block_number, true}} - - value -> + # Checks for consecutive rollup blocks starting from the lowest confirmed block + # and returns the status and the list of consecutive blocks. + # + # This function processes a list of rollup blocks to verify if they are consecutive, + # starting from the `lowest_confirmed_block`. If a gap is detected between the + # blocks, the process halts and returns false along with an empty list. If all + # blocks are consecutive, it returns true along with the list of consecutive + # blocks. + # + # ## Parameters + # - `blocks`: A list of rollup blocks to check. + # - `lowest_confirmed_block`: The lowest confirmed block number to start the check. + # + # ## Returns + # - A tuple where the first element is a boolean indicating if all blocks are + # consecutive, and the second element is the list of consecutive blocks if the + # first element is true, otherwise an empty list. + @spec check_consecutive_rollup_blocks_and_cut([Arbitrum.BatchBlock.to_import()], non_neg_integer()) :: + {boolean(), [Arbitrum.BatchBlock.to_import()]} + defp check_consecutive_rollup_blocks_and_cut(blocks, lowest_confirmed_block) do + {_, status, cut_blocks} = + Enum.reduce_while(blocks, {nil, false, []}, fn block, {prev, _, cut_blocks} -> + case {prev, block.block_number >= lowest_confirmed_block} do + {nil, true} -> + {:cont, {block.block_number, true, [block | cut_blocks]}} + + {value, true} -> # credo:disable-for-next-line Credo.Check.Refactor.Nesting if block.block_number - 1 == value do - {:cont, {block.block_number, true}} + {:cont, {block.block_number, true, [block | cut_blocks]}} else log_warning("A gap between blocks ##{value} and ##{block.block_number} found") - {:halt, {block.block_number, false}} + {:halt, {block.block_number, false, []}} end + + {_, false} -> + {:cont, {nil, false, []}} end end) - status + {status, cut_blocks} end # Adds the confirmation transaction hash to each rollup block description in the list. + @spec add_confirmation_transaction([Arbitrum.BatchBlock.to_import()], binary()) :: [Arbitrum.BatchBlock.to_import()] defp add_confirmation_transaction(block_descriptions_list, confirm_tx_hash) do block_descriptions_list |> Enum.reduce([], fn block_descr, updated -> @@ -1035,6 +1425,10 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do end # Selects lifecycle transaction descriptions used for confirming a given list of rollup blocks. + @spec take_lifecycle_txs_for_confirmed_blocks( + [Arbitrum.BatchBlock.to_import()], + %{binary() => %{hash: binary(), block_number: non_neg_integer()}} + ) :: %{binary() => %{hash: binary(), block_number: non_neg_integer()}} defp take_lifecycle_txs_for_confirmed_blocks(confirmed_rollup_blocks, lifecycle_txs) do confirmed_rollup_blocks |> Enum.reduce(%{}, fn block_descr, updated_txs -> @@ -1064,25 +1458,40 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do # # ## Returns # - A tuple containing: - # - The list of lifecycle transactions, ready for import. + # - The map of lifecycle transactions where each transaction is ready for import. # - The list of confirmed rollup blocks, ready for import. # - The highest confirmed block number processed during this run. + @spec finalize_lifecycle_txs_and_confirmed_blocks( + %{binary() => %{hash: binary(), block_number: non_neg_integer()}}, + [Arbitrum.BatchBlock.to_import()], + %{required(EthereumJSONRPC.block_number()) => DateTime.t()}, + boolean() + ) :: { + [Arbitrum.LifecycleTransaction.to_import()], + [Arbitrum.BatchBlock.to_import()], + integer() + } defp finalize_lifecycle_txs_and_confirmed_blocks( basic_lifecycle_txs, confirmed_rollup_blocks, - l1_blocks_requests, - %{ - json_rpc_named_arguments: l1_json_rpc_named_arguments, - chunk_size: l1_chunk_size, - track_finalization: track_finalization? - } = _l1_rpc_config - ) do - blocks_to_ts = - Rpc.execute_blocks_requests_and_get_ts(l1_blocks_requests, l1_json_rpc_named_arguments, l1_chunk_size) + l1_blocks_to_ts, + track_finalization? + ) + + defp finalize_lifecycle_txs_and_confirmed_blocks(basic_lifecycle_txs, _, _, _) + when map_size(basic_lifecycle_txs) == 0 do + {[], [], -1} + end + defp finalize_lifecycle_txs_and_confirmed_blocks( + basic_lifecycle_txs, + confirmed_rollup_blocks, + l1_blocks_to_ts, + track_finalization? + ) do lifecycle_txs = basic_lifecycle_txs - |> ArbitrumHelper.extend_lifecycle_txs_with_ts_and_status(blocks_to_ts, track_finalization?) + |> ArbitrumHelper.extend_lifecycle_txs_with_ts_and_status(l1_blocks_to_ts, track_finalization?) |> Db.get_indices_for_l1_transactions() {updated_rollup_blocks, highest_confirmed_block_number} = @@ -1101,7 +1510,40 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewConfirmations do {Map.values(lifecycle_txs), updated_rollup_blocks, highest_confirmed_block_number} end + # Updates lifecycle transactions with new L1 block numbers and timestamps which could appear due to re-orgs. + # + # ## Parameters + # - `existing_commitment_txs`: A list of existing confirmation transactions to be checked and updated. + # - `tx_to_l1_block`: A map from transaction hashes to their corresponding new L1 block numbers. + # - `l1_block_to_ts`: A map from L1 block numbers to their corresponding new timestamps. + # + # ## Returns + # - A list of updated confirmation transactions with new block numbers and timestamps. + @spec update_lifecycle_txs_for_new_blocks( + [Arbitrum.LifecycleTransaction.to_import()], + %{binary() => non_neg_integer()}, + %{non_neg_integer() => DateTime.t()} + ) :: [Arbitrum.LifecycleTransaction.to_import()] + defp update_lifecycle_txs_for_new_blocks(existing_commitment_txs, tx_to_l1_block, l1_block_to_ts) do + existing_commitment_txs + |> Enum.reduce([], fn tx, updated_txs -> + new_block_num = tx_to_l1_block[tx.hash].block_number + new_ts = l1_block_to_ts[new_block_num] + + case ArbitrumHelper.compare_lifecycle_tx_and_update(tx, {new_block_num, new_ts}, "confirmation") do + {:updated, updated_tx} -> + [updated_tx | updated_txs] + + _ -> + updated_txs + end + end) + end + # Retrieves committed L2-to-L1 messages up to specified block number and marks them as 'confirmed'. + @spec get_confirmed_l2_to_l1_messages(integer()) :: [Arbitrum.Message.to_import()] + defp get_confirmed_l2_to_l1_messages(block_number) + defp get_confirmed_l2_to_l1_messages(-1) do [] end diff --git a/config/runtime.exs b/config/runtime.exs index 19c2bd4e8c..db25f49215 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -895,6 +895,7 @@ config :indexer, Indexer.Fetcher.Arbitrum, l1_rollup_address: System.get_env("INDEXER_ARBITRUM_L1_ROLLUP_CONTRACT"), l1_rollup_init_block: ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_L1_ROLLUP_INIT_BLOCK", 1), l1_start_block: ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_L1_COMMON_START_BLOCK", 0), + l1_finalization_threshold: ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_L1_FINALIZATION_THRESHOLD", 1000), rollup_chunk_size: ConfigHelper.parse_integer_env_var("INDEXER_ARBITRUM_ROLLUP_CHUNK_SIZE", 20) config :indexer, Indexer.Fetcher.Arbitrum.TrackingMessagesOnL1, diff --git a/docker-compose/envs/common-blockscout.env b/docker-compose/envs/common-blockscout.env index 313ac7b8f6..2b23e63402 100644 --- a/docker-compose/envs/common-blockscout.env +++ b/docker-compose/envs/common-blockscout.env @@ -236,6 +236,7 @@ INDEXER_DISABLE_INTERNAL_TRANSACTIONS_FETCHER=false # INDEXER_ARBITRUM_L1_ROLLUP_CONTRACT= # INDEXER_ARBITRUM_L1_ROLLUP_INIT_BLOCK= # INDEXER_ARBITRUM_L1_COMMON_START_BLOCK= +# INDEXER_ARBITRUM_L1_FINALIZATION_THRESHOLD= # INDEXER_ARBITRUM_ROLLUP_CHUNK_SIZE= # INDEXER_ARBITRUM_BATCHES_TRACKING_ENABLED= # INDEXER_ARBITRUM_BATCHES_TRACKING_RECHECK_INTERVAL=