fix: proper handling for re-discovered Arbitrum batches (#10326)

* extended types documentation

* proper handling for re-discovered batches

* specification added

* Code review comment addressed
Branch: pull/10381/head
Alexander Kolotov authored 5 months ago · committed by GitHub
parent 98c5668765
commit d6080e04a1
4 changed files:
  1. apps/explorer/lib/explorer/chain/arbitrum/batch_block.ex (2 lines changed)
  2. apps/explorer/lib/explorer/chain/arbitrum/reader.ex (31 lines changed)
  3. apps/indexer/lib/indexer/fetcher/arbitrum/utils/db.ex (33 lines changed)
  4. apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex (231 lines changed)

apps/explorer/lib/explorer/chain/arbitrum/batch_block.ex

@@ -22,7 +22,7 @@ defmodule Explorer.Chain.Arbitrum.BatchBlock do
     * `batch_number` - The number of the Arbitrum batch.
     * `block_number` - The number of the rollup block.
     * `confirmation_id` - The ID of the confirmation L1 transaction from
-      `Explorer.Chain.LifecycleTransaction`, or `nil` if the
+      `Explorer.Chain.Arbitrum.LifecycleTransaction`, or `nil` if the
       block is not confirmed yet.
   """
   @type to_import :: %{
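For reference, a map conforming to the `to_import` type above would look like the following (illustrative values; only the fields documented in this hunk are shown, and `confirmation_id` stays `nil` until the block is confirmed):

    %{
      batch_number: 123,
      block_number: 4_567_890,
      confirmation_id: nil
    }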

apps/explorer/lib/explorer/chain/arbitrum/reader.ex

@@ -176,6 +176,29 @@ defmodule Explorer.Chain.Arbitrum.Reader do
     |> Repo.one()
   end

+  @doc """
+    Reads a list of L1 transactions by their hashes from the `arbitrum_lifecycle_l1_transactions` table and returns their IDs.
+
+    ## Parameters
+    - `l1_tx_hashes`: A list of hashes to retrieve L1 transactions for.
+
+    ## Returns
+    - A list of tuples containing transaction hashes and IDs for the transaction
+      hashes from the input list. The output list may be smaller than the input
+      list.
+  """
+  @spec lifecycle_transaction_ids([binary()]) :: [{Hash.t(), non_neg_integer}]
+  def lifecycle_transaction_ids(l1_tx_hashes) when is_list(l1_tx_hashes) do
+    query =
+      from(
+        lt in LifecycleTransaction,
+        select: {lt.hash, lt.id},
+        where: lt.hash in ^l1_tx_hashes
+      )
+
+    Repo.all(query, timeout: :infinity)
+  end
+
   @doc """
     Reads a list of L1 transactions by their hashes from the `arbitrum_lifecycle_l1_transactions` table.
@@ -183,15 +206,15 @@ defmodule Explorer.Chain.Arbitrum.Reader do
     - `l1_tx_hashes`: A list of hashes to retrieve L1 transactions for.

     ## Returns
-    - A list of `Explorer.Chain.Arbitrum.LifecycleTransaction` corresponding to the hashes from
-      the input list. The output list may be smaller than the input list.
+    - A list of `Explorer.Chain.Arbitrum.LifecycleTransaction` corresponding to the
+      hashes from the input list. The output list may be smaller than the input
+      list.
   """
-  @spec lifecycle_transactions(maybe_improper_list(Hash.t(), [])) :: [LifecycleTransaction]
+  @spec lifecycle_transactions([binary()]) :: [LifecycleTransaction.t()]
   def lifecycle_transactions(l1_tx_hashes) when is_list(l1_tx_hashes) do
     query =
       from(
         lt in LifecycleTransaction,
-        select: {lt.hash, lt.id},
         where: lt.hash in ^l1_tx_hashes
       )
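A minimal usage sketch contrasting the two readers after this change (the hashes and pattern matches are hypothetical): `lifecycle_transaction_ids/1` returns bare `{hash, id}` tuples for index bookkeeping, while `lifecycle_transactions/1`, with its `select` clause removed, now returns full structs:

    alias Explorer.Chain.Arbitrum.Reader

    hashes = [<<1::256>>, <<2::256>>]  # hypothetical L1 transaction hashes

    # Hash/ID pairs only, e.g. for assigning indices to new lifecycle transactions
    [{_hash, _id} | _] = Reader.lifecycle_transaction_ids(hashes)

    # Full records, needed when stored block numbers and timestamps must be
    # compared against freshly fetched L1 data for re-discovered batches
    [%Explorer.Chain.Arbitrum.LifecycleTransaction{} | _] = Reader.lifecycle_transactions(hashes)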

apps/indexer/lib/indexer/fetcher/arbitrum/utils/db.ex

@@ -34,16 +34,23 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
     the key `:id`, representing the index of the L1 transaction in the
     `arbitrum_lifecycle_l1_transactions` table.
   """
-  @spec get_indices_for_l1_transactions(map()) :: map()
+  @spec get_indices_for_l1_transactions(%{
+          binary() => %{
+            :hash => binary(),
+            :block_number => FullBlock.block_number(),
+            :timestamp => DateTime.t(),
+            :status => :unfinalized | :finalized,
+            optional(:id) => non_neg_integer()
+          }
+        }) :: %{binary() => Arbitrum.LifecycleTransaction.to_import()}
   # TODO: consider a way to remove duplicate with ZkSync.Utils.Db
-  # credo:disable-for-next-line Credo.Check.Design.DuplicatedCode
   def get_indices_for_l1_transactions(new_l1_txs)
       when is_map(new_l1_txs) do
     # Get indices for l1 transactions previously handled
     l1_txs =
       new_l1_txs
       |> Map.keys()
-      |> Reader.lifecycle_transactions()
+      |> Reader.lifecycle_transaction_ids()
       |> Enum.reduce(new_l1_txs, fn {hash, id}, txs ->
         {_, txs} =
           Map.get_and_update!(txs, hash.bytes, fn l1_tx ->
@@ -79,6 +86,25 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
     updated_l1_txs
   end

+  @doc """
+    Reads a list of L1 transactions by their hashes from the
+    `arbitrum_lifecycle_l1_transactions` table and converts them to maps.
+
+    ## Parameters
+    - `l1_tx_hashes`: A list of hashes to retrieve L1 transactions for.
+
+    ## Returns
+    - A list of maps representing the `Explorer.Chain.Arbitrum.LifecycleTransaction`
+      corresponding to the hashes from the input list. The output list is
+      compatible with the database import operation.
+  """
+  @spec lifecycle_transactions([binary()]) :: [Arbitrum.LifecycleTransaction.to_import()]
+  def lifecycle_transactions(l1_tx_hashes) do
+    l1_tx_hashes
+    |> Reader.lifecycle_transactions()
+    |> Enum.map(&lifecycle_transaction_to_map/1)
+  end
+
   @doc """
     Calculates the next L1 block number to search for the latest committed batch.
@@ -719,6 +745,7 @@ defmodule Indexer.Fetcher.Arbitrum.Utils.Db do
     Chain.timestamp_to_block_number(timestamp, :after, false)
   end

+  @spec lifecycle_transaction_to_map(Arbitrum.LifecycleTransaction.t()) :: Arbitrum.LifecycleTransaction.to_import()
   defp lifecycle_transaction_to_map(tx) do
     [:id, :hash, :block_number, :timestamp, :status]
     |> db_record_to_map(tx)
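As a sketch of what `get_indices_for_l1_transactions/1` does with the `{hash, id}` tuples from `Reader.lifecycle_transaction_ids/1` (hashes, timestamps, and IDs below are hypothetical): transactions already in the table keep their stored IDs, while new ones are assigned indices following the highest one in use:

    known_hash = <<1::256>>  # already stored in the DB under id 7 (hypothetical)
    fresh_hash = <<2::256>>  # seen for the first time (hypothetical)
    ts = DateTime.utc_now()

    new_l1_txs = %{
      known_hash => %{hash: known_hash, block_number: 100, timestamp: ts, status: :finalized},
      fresh_hash => %{hash: fresh_hash, block_number: 101, timestamp: ts, status: :unfinalized}
    }

    Db.get_indices_for_l1_transactions(new_l1_txs)
    # => known_hash keeps id: 7; fresh_hash is assigned the next free index, id: 8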

apps/indexer/lib/indexer/fetcher/arbitrum/workers/new_batches.ex

@@ -448,13 +448,20 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
       |> parse_logs_to_get_batch_numbers()
       |> Db.batches_exist()

-    {batches, txs_requests, blocks_requests} = parse_logs_for_new_batches(logs, existing_batches)
+    {batches, txs_requests, blocks_requests, existing_commitment_txs} =
+      parse_logs_for_new_batches(logs, existing_batches)

     blocks_to_ts = Rpc.execute_blocks_requests_and_get_ts(blocks_requests, json_rpc_named_arguments, chunk_size)

-    {lifecycle_txs_wo_indices, batches_to_import} =
+    {initial_lifecycle_txs, batches_to_import} =
       execute_tx_requests_parse_txs_calldata(txs_requests, msg_to_block_shift, blocks_to_ts, batches, l1_rpc_config)

+    # Check if the commitment transactions for the batches which are already in the database
+    # need to be updated in case of reorgs
+    lifecycle_txs_wo_indices =
+      initial_lifecycle_txs
+      |> Map.merge(update_lifecycle_txs_for_new_blocks(existing_commitment_txs, blocks_to_ts))
+
     {blocks_to_import, rollup_txs_to_import} = get_rollup_blocks_and_transactions(batches_to_import, rollup_rpc_config)

     lifecycle_txs =
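A note on the `Map.merge/2` above: for any hash present in both maps, the value from the second argument wins, so records rebuilt by `update_lifecycle_txs_for_new_blocks/2` override the freshly parsed ones. In isolation (with a stand-in key):

    h = "0xabc"  # stand-in for a transaction hash key

    Map.merge(%{h => %{block_number: 10}}, %{h => %{block_number: 12}})
    # => %{"0xabc" => %{block_number: 12}}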
@@ -482,14 +489,20 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
         ]
       end)

-    committed_txs =
-      blocks_to_import
-      |> Map.keys()
-      |> Enum.max()
-      |> get_committed_l2_to_l1_messages()
+    # It is safe to not re-mark messages as committed for the batches that are already in the database
+    committed_messages =
+      if Enum.empty?(blocks_to_import) do
+        []
+      else
+        # Without a check for an empty list of keys, `Enum.max()` would raise an error
+        blocks_to_import
+        |> Map.keys()
+        |> Enum.max()
+        |> get_committed_l2_to_l1_messages()
+      end

     {batches_list_to_import, Map.values(lifecycle_txs), Map.values(blocks_to_import), rollup_txs_to_import,
-     committed_txs}
+     committed_messages}
   end

   # Extracts batch numbers from logs of SequencerBatchDelivered events.
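The `Enum.empty?/1` guard above matters because `Enum.max/1` raises on an empty collection:

    Enum.max([])        # ** (Enum.EmptyError) empty error
    Enum.max([3, 1, 5]) # => 5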
@@ -506,8 +519,10 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
   # This function sifts through logs of SequencerBatchDelivered events, extracts the
   # necessary data, and assembles a map of new batch descriptions. Additionally, it
   # prepares RPC `eth_getTransactionByHash` and `eth_getBlockByNumber` requests to
-  # fetch details not present in the logs. To minimize subsequent RPC calls, only
-  # batches not previously known (i.e., absent in `existing_batches`) are processed.
+  # fetch details not present in the logs. To minimize subsequent RPC calls, requests to
+  # get the transaction details are only made for batches not previously known.
+  # For the existing batches, the function prepares a map of commitment transactions,
+  # assuming that they must be updated if reorgs occur.
   #
   # ## Parameters
   # - `logs`: A list of event logs to be processed.
@@ -520,50 +535,66 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
   #   the L1 transactions associated with these batches.
   # - A list of RPC requests to fetch details of the L1 blocks where these batches
   #   were included.
+  # - A map of commitment transactions for the existing batches where the value is
+  #   the block number of the transaction.
+  @spec parse_logs_for_new_batches(
+          [%{String.t() => any()}],
+          [non_neg_integer()]
+        ) :: {
+          %{:number => non_neg_integer(), :before_acc => binary(), :after_acc => binary(), :tx_hash => binary()},
+          [EthereumJSONRPC.Transport.request()],
+          [EthereumJSONRPC.Transport.request()],
+          %{binary() => non_neg_integer()}
+        }
   defp parse_logs_for_new_batches(logs, existing_batches) do
-    {batches, txs_requests, blocks_requests} =
+    {batches, txs_requests, blocks_requests, existing_commitment_txs} =
       logs
-      |> Enum.reduce({%{}, [], %{}}, fn event, {batches, txs_requests, blocks_requests} ->
+      |> Enum.reduce({%{}, [], %{}, %{}}, fn event, {batches, txs_requests, blocks_requests, existing_commitment_txs} ->
         {batch_num, before_acc, after_acc} = sequencer_batch_delivered_event_parse(event)

         tx_hash_raw = event["transactionHash"]
         tx_hash = Rpc.string_hash_to_bytes_hash(tx_hash_raw)
         blk_num = quantity_to_integer(event["blockNumber"])

-        if batch_num in existing_batches do
-          {batches, txs_requests, blocks_requests}
-        else
-          updated_batches =
-            Map.put(
-              batches,
-              batch_num,
-              %{
-                number: batch_num,
-                before_acc: before_acc,
-                after_acc: after_acc,
-                tx_hash: tx_hash
-              }
-            )
-
-          updated_txs_requests = [
-            Rpc.transaction_by_hash_request(%{id: 0, hash: tx_hash_raw})
-            | txs_requests
-          ]
-
-          updated_blocks_requests =
-            Map.put(
-              blocks_requests,
-              blk_num,
-              BlockByNumber.request(%{id: 0, number: blk_num}, false, true)
-            )
-
-          log_info("New batch #{batch_num} found in #{tx_hash_raw}")
-
-          {updated_batches, updated_txs_requests, updated_blocks_requests}
-        end
+        {updated_batches, updated_txs_requests, updated_existing_commitment_txs} =
+          if batch_num in existing_batches do
+            {batches, txs_requests, Map.put(existing_commitment_txs, tx_hash, blk_num)}
+          else
+            log_info("New batch #{batch_num} found in #{tx_hash_raw}")
+
+            updated_batches =
+              Map.put(
+                batches,
+                batch_num,
+                %{
+                  number: batch_num,
+                  before_acc: before_acc,
+                  after_acc: after_acc,
+                  tx_hash: tx_hash
+                }
+              )
+
+            updated_txs_requests = [
+              Rpc.transaction_by_hash_request(%{id: 0, hash: tx_hash_raw})
+              | txs_requests
+            ]
+
+            {updated_batches, updated_txs_requests, existing_commitment_txs}
+          end
+
+        # To be able to update the commitment transactions for the existing batches
+        # in case of reorgs, we need to re-execute the block requests
+        updated_blocks_requests =
+          Map.put(
+            blocks_requests,
+            blk_num,
+            BlockByNumber.request(%{id: 0, number: blk_num}, false, true)
+          )
+
+        {updated_batches, updated_txs_requests, updated_blocks_requests, updated_existing_commitment_txs}
       end)

-    {batches, txs_requests, Map.values(blocks_requests)}
+    {batches, txs_requests, Map.values(blocks_requests), existing_commitment_txs}
   end

   # Parses SequencerBatchDelivered event to get batch sequence number and associated accumulators
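To make the reworked reducer concrete, here is a sketch of its four-element accumulator after processing one new batch and one already known batch (all bindings hypothetical):

    {
      # batches: only the new batch gets a description
      %{42 => %{number: 42, before_acc: acc_a, after_acc: acc_b, tx_hash: tx_hash_new}},
      # txs_requests: transaction details are fetched only for the new batch
      [tx_request_new],
      # blocks_requests: block details are fetched for both, to catch reorgs
      %{19_000_000 => block_req_a, 19_000_100 => block_req_b},
      # existing_commitment_txs: commitment tx of the known batch, keyed by hash
      %{tx_hash_known => 19_000_100}
    }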
@@ -693,6 +724,51 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
       end
     end
   end

+  # Updates lifecycle transactions for new blocks by setting the block number and
+  # timestamp for each transaction.
+  #
+  # The function checks if a transaction's block number and timestamp match the
+  # new values. If they do not, the transaction is updated with the new block
+  # number and timestamp.
+  #
+  # Parameters:
+  # - `existing_commitment_txs`: A map where keys are transaction hashes and
+  #   values are block numbers.
+  # - `block_to_ts`: A map where keys are block numbers and values are timestamps.
+  #
+  # Returns:
+  # - A map where keys are transaction hashes and values are updated lifecycle
+  #   transactions with the block number and timestamp set, compatible with the
+  #   database import operation.
+  @spec update_lifecycle_txs_for_new_blocks(%{binary() => non_neg_integer()}, %{non_neg_integer() => non_neg_integer()}) ::
+          %{binary() => Arbitrum.LifecycleTransaction.to_import()}
+  defp update_lifecycle_txs_for_new_blocks(existing_commitment_txs, block_to_ts) do
+    existing_commitment_txs
+    |> Map.keys()
+    |> Db.lifecycle_transactions()
+    |> Enum.reduce(%{}, fn tx, txs ->
+      block_num = existing_commitment_txs[tx.hash]
+      ts = block_to_ts[block_num]
+
+      if tx.block_number == block_num and DateTime.compare(tx.timestamp, ts) == :eq do
+        txs
+      else
+        log_info(
+          "The commitment transaction 0x#{tx.hash |> Base.encode16(case: :lower)} will be updated with the new block number and timestamp"
+        )
+
+        Map.put(
+          txs,
+          tx.hash,
+          Map.merge(tx, %{
+            block_number: block_num,
+            timestamp: ts
+          })
+        )
+      end
+    end)
+  end
+
   # Retrieves rollup blocks and transactions for a list of batches.
   #
   # This function extracts rollup block ranges from each batch's data to determine
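The update check above hinges on `DateTime.compare/2`, which returns `:lt`, `:eq`, or `:gt`; a record is rewritten only when the stored block number or timestamp disagrees with the freshly fetched values. A compressed illustration (hypothetical data):

    stored = %{block_number: 100, timestamp: ~U[2024-06-01 00:00:00Z]}

    # Matching block number and timestamp -> the record is left untouched
    DateTime.compare(stored.timestamp, ~U[2024-06-01 00:00:00Z])
    # => :eq

    # After a reorg the commitment landed in block 102 -> the record is rewritten
    Map.merge(stored, %{block_number: 102, timestamp: ~U[2024-06-01 00:12:00Z]})
    # => %{block_number: 102, timestamp: ~U[2024-06-01 00:12:00Z]}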
@@ -706,8 +782,24 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
   #
   # ## Returns
   # - A tuple containing:
-  #   - A map of rollup blocks, ready for database import.
+  #   - A map of rollup blocks, where each block is ready for database import.
   #   - A list of rollup transactions, ready for database import.
+  @spec get_rollup_blocks_and_transactions(
+          %{
+            non_neg_integer() => %{
+              :number => non_neg_integer(),
+              :start_block => non_neg_integer(),
+              :end_block => non_neg_integer(),
+              optional(any()) => any()
+            }
+          },
+          %{
+            :json_rpc_named_arguments => EthereumJSONRPC.json_rpc_named_arguments(),
+            :chunk_size => non_neg_integer(),
+            optional(any()) => any()
+          }
+        ) ::
+          {%{non_neg_integer() => Arbitrum.BatchBlock.to_import()}, [Arbitrum.BatchTransaction.to_import()]}
   defp get_rollup_blocks_and_transactions(
          batches,
          rollup_rpc_config
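Per the new `@spec`, the `batches` argument maps batch numbers to descriptions that carry at least the rollup block range, e.g. (illustrative values):

    %{
      42 => %{number: 42, start_block: 1_000, end_block: 1_050},
      43 => %{number: 43, start_block: 1_051, end_block: 1_102}
    }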
@@ -715,31 +807,36 @@ defmodule Indexer.Fetcher.Arbitrum.Workers.NewBatches do
     blocks_to_batches = unwrap_rollup_block_ranges(batches)

     required_blocks_numbers = Map.keys(blocks_to_batches)
-    log_debug("Identified #{length(required_blocks_numbers)} rollup blocks")

-    {blocks_to_import_map, txs_to_import_list} =
-      get_rollup_blocks_and_txs_from_db(required_blocks_numbers, blocks_to_batches)
-
-    # While it's not entirely aligned with data integrity principles to recover
-    # rollup blocks and transactions from RPC that are not yet indexed, it's
-    # a practical compromise to facilitate the progress of batch discovery. Given
-    # the potential high frequency of new batch appearances and the substantial
-    # volume of blocks and transactions, prioritizing discovery process advancement
-    # is deemed reasonable.
-    {blocks_to_import, txs_to_import} =
-      recover_data_if_necessary(
-        blocks_to_import_map,
-        txs_to_import_list,
-        required_blocks_numbers,
-        blocks_to_batches,
-        rollup_rpc_config
-      )
-
-    log_info(
-      "Found #{length(Map.keys(blocks_to_import))} rollup blocks and #{length(txs_to_import)} rollup transactions in DB"
-    )
-
-    {blocks_to_import, txs_to_import}
+    if required_blocks_numbers == [] do
+      {%{}, []}
+    else
+      log_debug("Identified #{length(required_blocks_numbers)} rollup blocks")
+
+      {blocks_to_import_map, txs_to_import_list} =
+        get_rollup_blocks_and_txs_from_db(required_blocks_numbers, blocks_to_batches)
+
+      # While it's not entirely aligned with data integrity principles to recover
+      # rollup blocks and transactions from RPC that are not yet indexed, it's
+      # a practical compromise to facilitate the progress of batch discovery. Given
+      # the potential high frequency of new batch appearances and the substantial
+      # volume of blocks and transactions, prioritizing discovery process advancement
+      # is deemed reasonable.
+      {blocks_to_import, txs_to_import} =
+        recover_data_if_necessary(
+          blocks_to_import_map,
+          txs_to_import_list,
+          required_blocks_numbers,
+          blocks_to_batches,
+          rollup_rpc_config
+        )
+
+      log_info(
+        "Found #{length(Map.keys(blocks_to_import))} rollup blocks and #{length(txs_to_import)} rollup transactions in DB"
+      )
+
+      {blocks_to_import, txs_to_import}
+    end
   end

   # Unwraps rollup block ranges from batch data to create a block-to-batch number map.
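With the early return in place, a call where every discovered batch was already known short-circuits without log noise or further DB lookups (a sketch, assuming `unwrap_rollup_block_ranges/1` yields an empty map for an empty input, so `required_blocks_numbers` is `[]`):

    get_rollup_blocks_and_transactions(%{}, rollup_rpc_config)
    # => {%{}, []}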
