diff --git a/apps/explorer/lib/explorer/chain/arbitrum/batch_block.ex b/apps/explorer/lib/explorer/chain/arbitrum/batch_block.ex index 51b8f541a1..ab1ed9db99 100644 --- a/apps/explorer/lib/explorer/chain/arbitrum/batch_block.ex +++ b/apps/explorer/lib/explorer/chain/arbitrum/batch_block.ex @@ -22,7 +22,7 @@ defmodule Explorer.Chain.Arbitrum.BatchBlock do * `batch_number` - The number of the Arbitrum batch. * `block_number` - The number of the rollup block. * `confirmation_id` - The ID of the confirmation L1 transaction from - `Explorer.Chain.LifecycleTransaction`, or `nil` if the + `Explorer.Chain.Arbitrum.LifecycleTransaction`, or `nil` if the block is not confirmed yet. """ @type to_import :: %{ diff --git a/apps/explorer/lib/explorer/chain/arbitrum/reader.ex b/apps/explorer/lib/explorer/chain/arbitrum/reader.ex index 7b8dc92a45..822072b544 100644 --- a/apps/explorer/lib/explorer/chain/arbitrum/reader.ex +++ b/apps/explorer/lib/explorer/chain/arbitrum/reader.ex @@ -176,6 +176,29 @@ defmodule Explorer.Chain.Arbitrum.Reader do |> Repo.one() end + @doc """ + Reads a list of L1 transactions by their hashes from the `arbitrum_lifecycle_l1_transactions` table and returns their IDs. + + ## Parameters + - `l1_tx_hashes`: A list of hashes to retrieve L1 transactions for. + + ## Returns + - A list of tuples containing transaction hashes and IDs for the transaction + hashes from the input list. The output list may be smaller than the input + list. + """ + @spec lifecycle_transaction_ids([binary()]) :: [{Hash.t(), non_neg_integer}] + def lifecycle_transaction_ids(l1_tx_hashes) when is_list(l1_tx_hashes) do + query = + from( + lt in LifecycleTransaction, + select: {lt.hash, lt.id}, + where: lt.hash in ^l1_tx_hashes + ) + + Repo.all(query, timeout: :infinity) + end + @doc """ Reads a list of L1 transactions by their hashes from the `arbitrum_lifecycle_l1_transactions` table. @@ -183,15 +206,15 @@ defmodule Explorer.Chain.Arbitrum.Reader do - `l1_tx_hashes`: A list of hashes to retrieve L1 transactions for. ## Returns - - A list of `Explorer.Chain.Arbitrum.LifecycleTransaction` corresponding to the hashes from - the input list. The output list may be smaller than the input list. + - A list of `Explorer.Chain.Arbitrum.LifecycleTransaction` corresponding to the + hashes from the input list. The output list may be smaller than the input + list. """ - @spec lifecycle_transactions(maybe_improper_list(Hash.t(), [])) :: [LifecycleTransaction] + @spec lifecycle_transactions([binary()]) :: [LifecycleTransaction.t()] def lifecycle_transactions(l1_tx_hashes) when is_list(l1_tx_hashes) do query = from( lt in LifecycleTransaction, - select: {lt.hash, lt.id}, where: lt.hash in ^l1_tx_hashes ) diff --git a/apps/indexer/lib/indexer/fetcher/arbitrum/utils/db.ex b/apps/indexer/lib/indexer/fetcher/arbitrum/utils/db.ex index a76361a5cd..5ca90219df 100644 --- a/apps/indexer/lib/indexer/fetcher/arbitrum/utils/db.ex +++ b/apps/indexer/lib/indexer/fetcher/arbitrum/utils/db.ex @@ -34,16 +34,23 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do the key `:id`, representing the index of the L1 transaction in the `arbitrum_lifecycle_l1_transactions` table. """ - @spec get_indices_for_l1_transactions(map()) :: map() + @spec get_indices_for_l1_transactions(%{ + binary() => %{ + :hash => binary(), + :block_number => FullBlock.block_number(), + :timestamp => DateTime.t(), + :status => :unfinalized | :finalized, + optional(:id) => non_neg_integer() + } + }) :: %{binary() => Arbitrum.LifecycleTransaction.to_import()} # TODO: consider a way to remove duplicate with ZkSync.Utils.Db - # credo:disable-for-next-line Credo.Check.Design.DuplicatedCode def get_indices_for_l1_transactions(new_l1_txs) when is_map(new_l1_txs) do # Get indices for l1 transactions previously handled l1_txs = new_l1_txs |> Map.keys() - |> Reader.lifecycle_transactions() + |> Reader.lifecycle_transaction_ids() |> Enum.reduce(new_l1_txs, fn {hash, id}, txs -> {_, txs} = Map.get_and_update!(txs, hash.bytes, fn l1_tx -> @@ -79,6 +86,25 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do updated_l1_txs end + @doc """ + Reads a list of L1 transactions by their hashes from the + `arbitrum_lifecycle_l1_transactions` table and converts them to maps. + + ## Parameters + - `l1_tx_hashes`: A list of hashes to retrieve L1 transactions for. + + ## Returns + - A list of maps representing the `Explorer.Chain.Arbitrum.LifecycleTransaction` + corresponding to the hashes from the input list. The output list is + compatible with the database import operation. + """ + @spec lifecycle_transactions([binary()]) :: [Arbitrum.LifecycleTransaction.to_import()] + def lifecycle_transactions(l1_tx_hashes) do + l1_tx_hashes + |> Reader.lifecycle_transactions() + |> Enum.map(&lifecycle_transaction_to_map/1) + end + @doc """ Calculates the next L1 block number to search for the latest committed batch. @@ -719,6 +745,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do Chain.timestamp_to_block_number(timestamp, :after, false) end + @spec lifecycle_transaction_to_map(Arbitrum.LifecycleTransaction.t()) :: Arbitrum.LifecycleTransaction.to_import() defp lifecycle_transaction_to_map(tx) do [:id, :hash, :block_number, :timestamp, :status] |> db_record_to_map(tx) diff --git a/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex b/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex index 8e3cdb017f..c0672139ec 100644 --- a/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex +++ b/apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex @@ -448,13 +448,20 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do |> parse_logs_to_get_batch_numbers() |> Db.batches_exist() - {batches, txs_requests, blocks_requests} = parse_logs_for_new_batches(logs, existing_batches) + {batches, txs_requests, blocks_requests, existing_commitment_txs} = + parse_logs_for_new_batches(logs, existing_batches) blocks_to_ts = Rpc.execute_blocks_requests_and_get_ts(blocks_requests, json_rpc_named_arguments, chunk_size) - {lifecycle_txs_wo_indices, batches_to_import} = + {initial_lifecycle_txs, batches_to_import} = execute_tx_requests_parse_txs_calldata(txs_requests, msg_to_block_shift, blocks_to_ts, batches, l1_rpc_config) + # Check if the commitment transactions for the batches which are already in the database + # needs to be updated in case of reorgs + lifecycle_txs_wo_indices = + initial_lifecycle_txs + |> Map.merge(update_lifecycle_txs_for_new_blocks(existing_commitment_txs, blocks_to_ts)) + {blocks_to_import, rollup_txs_to_import} = get_rollup_blocks_and_transactions(batches_to_import, rollup_rpc_config) lifecycle_txs = @@ -482,14 +489,20 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do ] end) - committed_txs = - blocks_to_import - |> Map.keys() - |> Enum.max() - |> get_committed_l2_to_l1_messages() + # It is safe to not re-mark messages as committed for the batches that are already in the database + committed_messages = + if Enum.empty?(blocks_to_import) do + [] + else + # Without check on the empty list of keys `Enum.max()` will raise an error + blocks_to_import + |> Map.keys() + |> Enum.max() + |> get_committed_l2_to_l1_messages() + end {batches_list_to_import, Map.values(lifecycle_txs), Map.values(blocks_to_import), rollup_txs_to_import, - committed_txs} + committed_messages} end # Extracts batch numbers from logs of SequencerBatchDelivered events. @@ -506,8 +519,10 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do # This function sifts through logs of SequencerBatchDelivered events, extracts the # necessary data, and assembles a map of new batch descriptions. Additionally, it # prepares RPC `eth_getTransactionByHash` and `eth_getBlockByNumber` requests to - # fetch details not present in the logs. To minimize subsequent RPC calls, only - # batches not previously known (i.e., absent in `existing_batches`) are processed. + # fetch details not present in the logs. To minimize subsequent RPC calls, requests to + # get the transactions details are only made for batches not previously known. + # For the existing batches, the function prepares a map of commitment transactions + # assuming that they must be updated if reorgs occur. # # ## Parameters # - `logs`: A list of event logs to be processed. @@ -520,50 +535,66 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do # the L1 transactions associated with these batches. # - A list of RPC requests to fetch details of the L1 blocks where these batches # were included. + # - A map of commitment transactions for the existing batches where the value is + # the block number of the transaction. + @spec parse_logs_for_new_batches( + [%{String.t() => any()}], + [non_neg_integer()] + ) :: { + %{:number => non_neg_integer(), :before_acc => binary(), :after_acc => binary(), :tx_hash => binary()}, + [EthereumJSONRPC.Transport.request()], + [EthereumJSONRPC.Transport.request()], + %{binary() => non_neg_integer()} + } defp parse_logs_for_new_batches(logs, existing_batches) do - {batches, txs_requests, blocks_requests} = + {batches, txs_requests, blocks_requests, existing_commitment_txs} = logs - |> Enum.reduce({%{}, [], %{}}, fn event, {batches, txs_requests, blocks_requests} -> + |> Enum.reduce({%{}, [], %{}, %{}}, fn event, {batches, txs_requests, blocks_requests, existing_commitment_txs} -> {batch_num, before_acc, after_acc} = sequencer_batch_delivered_event_parse(event) tx_hash_raw = event["transactionHash"] tx_hash = Rpc.string_hash_to_bytes_hash(tx_hash_raw) blk_num = quantity_to_integer(event["blockNumber"]) - if batch_num in existing_batches do - {batches, txs_requests, blocks_requests} - else - updated_batches = - Map.put( - batches, - batch_num, - %{ - number: batch_num, - before_acc: before_acc, - after_acc: after_acc, - tx_hash: tx_hash - } - ) - - updated_txs_requests = [ - Rpc.transaction_by_hash_request(%{id: 0, hash: tx_hash_raw}) - | txs_requests - ] - - updated_blocks_requests = - Map.put( - blocks_requests, - blk_num, - BlockByNumber.request(%{id: 0, number: blk_num}, false, true) - ) - - log_info("New batch #{batch_num} found in #{tx_hash_raw}") + {updated_batches, updated_txs_requests, updated_existing_commitment_txs} = + if batch_num in existing_batches do + {batches, txs_requests, Map.put(existing_commitment_txs, tx_hash, blk_num)} + else + log_info("New batch #{batch_num} found in #{tx_hash_raw}") + + updated_batches = + Map.put( + batches, + batch_num, + %{ + number: batch_num, + before_acc: before_acc, + after_acc: after_acc, + tx_hash: tx_hash + } + ) + + updated_txs_requests = [ + Rpc.transaction_by_hash_request(%{id: 0, hash: tx_hash_raw}) + | txs_requests + ] + + {updated_batches, updated_txs_requests, existing_commitment_txs} + end + + # In order to have an ability to update commitment transaction for the existing batches + # in case of reorgs, we need to re-execute the block requests + updated_blocks_requests = + Map.put( + blocks_requests, + blk_num, + BlockByNumber.request(%{id: 0, number: blk_num}, false, true) + ) - {updated_batches, updated_txs_requests, updated_blocks_requests} - end + {updated_batches, updated_txs_requests, updated_blocks_requests, updated_existing_commitment_txs} end) - {batches, txs_requests, Map.values(blocks_requests)} + {batches, txs_requests, Map.values(blocks_requests), existing_commitment_txs} end # Parses SequencerBatchDelivered event to get batch sequence number and associated accumulators @@ -693,6 +724,51 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do end end + # Updates lifecycle transactions for new blocks by setting the block number and + # timestamp for each transaction. + # + # The function checks if a transaction's block number and timestamp match the + # new values. If they do not, the transaction is updated with the new block + # number and timestamp. + # + # Parameters: + # - `existing_commitment_txs`: A map where keys are transaction hashes and + # values are block numbers. + # - `block_to_ts`: A map where keys are block numbers and values are timestamps. + # + # Returns: + # - A map where keys are transaction hashes and values are updated lifecycle + # transactions with the block number and timestamp set, compatible with the + # database import operation. + @spec update_lifecycle_txs_for_new_blocks(%{binary() => non_neg_integer()}, %{non_neg_integer() => non_neg_integer()}) :: + %{binary() => Arbitrum.LifecycleTransaction.to_import()} + defp update_lifecycle_txs_for_new_blocks(existing_commitment_txs, block_to_ts) do + existing_commitment_txs + |> Map.keys() + |> Db.lifecycle_transactions() + |> Enum.reduce(%{}, fn tx, txs -> + block_num = existing_commitment_txs[tx.hash] + ts = block_to_ts[block_num] + + if tx.block_number == block_num and DateTime.compare(tx.timestamp, ts) == :eq do + txs + else + log_info( + "The commitment transaction 0x#{tx.hash |> Base.encode16(case: :lower)} will be updated with the new block number and timestamp" + ) + + Map.put( + txs, + tx.hash, + Map.merge(tx, %{ + block_number: block_num, + timestamp: ts + }) + ) + end + end) + end + # Retrieves rollup blocks and transactions for a list of batches. # # This function extracts rollup block ranges from each batch's data to determine @@ -706,8 +782,24 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do # # ## Returns # - A tuple containing: - # - A map of rollup blocks, ready for database import. + # - A map of rollup blocks, where each block is ready for database import. # - A list of rollup transactions, ready for database import. + @spec get_rollup_blocks_and_transactions( + %{ + non_neg_integer() => %{ + :number => non_neg_integer(), + :start_block => non_neg_integer(), + :end_block => non_neg_integer(), + optional(any()) => any() + } + }, + %{ + :json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(), + :chunk_size => non_neg_integer(), + optional(any()) => any() + } + ) :: + {%{non_neg_integer() => Arbitrum.BatchBlock.to_import()}, [Arbitrum.BatchTransaction.to_import()]} defp get_rollup_blocks_and_transactions( batches, rollup_rpc_config @@ -715,31 +807,36 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do blocks_to_batches = unwrap_rollup_block_ranges(batches) required_blocks_numbers = Map.keys(blocks_to_batches) - log_debug("Identified #{length(required_blocks_numbers)} rollup blocks") - - {blocks_to_import_map, txs_to_import_list} = - get_rollup_blocks_and_txs_from_db(required_blocks_numbers, blocks_to_batches) - - # While it's not entirely aligned with data integrity principles to recover - # rollup blocks and transactions from RPC that are not yet indexed, it's - # a practical compromise to facilitate the progress of batch discovery. Given - # the potential high frequency of new batch appearances and the substantial - # volume of blocks and transactions, prioritizing discovery process advancement - # is deemed reasonable. - {blocks_to_import, txs_to_import} = - recover_data_if_necessary( - blocks_to_import_map, - txs_to_import_list, - required_blocks_numbers, - blocks_to_batches, - rollup_rpc_config - ) - log_info( - "Found #{length(Map.keys(blocks_to_import))} rollup blocks and #{length(txs_to_import)} rollup transactions in DB" - ) + if required_blocks_numbers == [] do + {%{}, []} + else + log_debug("Identified #{length(required_blocks_numbers)} rollup blocks") + + {blocks_to_import_map, txs_to_import_list} = + get_rollup_blocks_and_txs_from_db(required_blocks_numbers, blocks_to_batches) + + # While it's not entirely aligned with data integrity principles to recover + # rollup blocks and transactions from RPC that are not yet indexed, it's + # a practical compromise to facilitate the progress of batch discovery. Given + # the potential high frequency of new batch appearances and the substantial + # volume of blocks and transactions, prioritizing discovery process advancement + # is deemed reasonable. + {blocks_to_import, txs_to_import} = + recover_data_if_necessary( + blocks_to_import_map, + txs_to_import_list, + required_blocks_numbers, + blocks_to_batches, + rollup_rpc_config + ) - {blocks_to_import, txs_to_import} + log_info( + "Found #{length(Map.keys(blocks_to_import))} rollup blocks and #{length(txs_to_import)} rollup transactions in DB" + ) + + {blocks_to_import, txs_to_import} + end end # Unwraps rollup block ranges from batch data to create a block-to-batch number map.