@ -1,5 +1,6 @@
use std ::cmp ::Ordering ;
use std ::fmt ::Debug ;
use std ::ops ::RangeInclusive ;
use std ::{
sync ::Arc ,
time ::{ Duration , Instant } ,
@ -12,9 +13,9 @@ use tokio::time::sleep;
use tracing ::{ debug , warn } ;
use hyperlane_core ::{
BlockRange , ChainResult , ContractSyncCursor , CursorAction , HyperlaneMessage ,
HyperlaneMessageStore , HyperlaneWatermarkedLogStore , IndexMode , IndexRange , Index er , LogMeta ,
MessageIndexer , SequenceRange ,
ChainCommunicationError , ChainResult , ContractSyncCursor , CursorAction , HyperlaneMessage ,
HyperlaneMessageStore , HyperlaneWatermarkedLogStore , IndexMode , Indexer , LogMeta ,
SequenceIndexer ,
} ;
use crate ::contract_sync ::eta_calculator ::SyncerEtaCalculator ;
@ -28,15 +29,110 @@ const MAX_SEQUENCE_RANGE: u32 = 100;
/// message sync cursors.
#[ derive(Debug, new) ]
pub ( crate ) struct MessageSyncCursor {
indexer : Arc < dyn MessageIndexer > ,
indexer : Arc < dyn SequenceIndexer < HyperlaneMessage > > ,
db : Arc < dyn HyperlaneMessageStore > ,
sync_state : SyncState ,
}
#[ derive(Debug, new) ]
pub ( crate ) struct SyncState {
chunk_size : u32 ,
/// The starting block for the cursor
start_block : u32 ,
/// The next block that should be indexed.
next_block : u32 ,
/// The next nonce that the cursor is looking for.
next_nonce : u32 ,
mode : IndexMode ,
/// The next sequence index that the cursor is looking for.
/// In the EVM, this is used for optimizing indexing,
/// because it's cheaper to make read calls for the sequence index than
/// to call `eth_getLogs` with a block range.
/// In Sealevel, historic queries aren't supported, so the sequence field
/// is used to query storage in sequence.
next_sequence : u32 ,
direction : SyncDirection ,
}
impl SyncState {
async fn get_next_range (
& mut self ,
max_sequence : Option < u32 > ,
tip : u32 ,
) -> ChainResult < Option < RangeInclusive < u32 > > > {
// We attempt to index a range of blocks that is as large as possible.
let range = match self . mode {
IndexMode ::Block = > self . block_range ( tip ) ,
IndexMode ::Sequence = > {
let max_sequence = max_sequence . ok_or_else ( | | {
ChainCommunicationError ::from_other_str (
"Sequence indexing requires a max sequence" ,
)
} ) ? ;
if let Some ( range ) = self . sequence_range ( tip , max_sequence ) ? {
range
} else {
return Ok ( None ) ;
}
}
} ;
if range . is_empty ( ) {
return Ok ( None ) ;
}
Ok ( Some ( range ) )
}
fn block_range ( & mut self , tip : u32 ) -> RangeInclusive < u32 > {
let ( from , to ) = match self . direction {
SyncDirection ::Forward = > {
let from = self . next_block ;
let mut to = from + self . chunk_size ;
to = u32 ::min ( to , tip ) ;
self . next_block = to + 1 ;
( from , to )
}
SyncDirection ::Backward = > {
let to = self . next_block ;
let from = to . saturating_sub ( self . chunk_size ) ;
self . next_block = from . saturating_sub ( 1 ) ;
( from , to )
}
} ;
from ..= to
}
/// Returns the next sequence range to index.
///
/// # Arguments
///
/// * `tip` - The current tip of the chain.
/// * `max_sequence` - The maximum sequence that should be indexed.
/// `max_sequence` is the exclusive upper bound of the range to be indexed.
/// (e.g. `0..max_sequence`)
fn sequence_range (
& mut self ,
tip : u32 ,
max_sequence : u32 ,
) -> ChainResult < Option < RangeInclusive < u32 > > > {
let ( from , to ) = match self . direction {
SyncDirection ::Forward = > {
let sequence_start = self . next_sequence ;
let mut sequence_end = sequence_start + MAX_SEQUENCE_RANGE ;
if self . next_sequence > = max_sequence {
return Ok ( None ) ;
}
sequence_end = u32 ::min ( sequence_end , max_sequence . saturating_sub ( 1 ) ) ;
self . next_block = tip ;
self . next_sequence = sequence_end + 1 ;
( sequence_start , sequence_end )
}
SyncDirection ::Backward = > {
let sequence_end = self . next_sequence ;
let sequence_start = sequence_end . saturating_sub ( MAX_SEQUENCE_RANGE ) ;
self . next_sequence = sequence_start . saturating_sub ( 1 ) ;
( sequence_start , sequence_end )
}
} ;
Ok ( Some ( from ..= to ) )
}
}
impl MessageSyncCursor {
@ -59,19 +155,23 @@ impl MessageSyncCursor {
async fn update (
& mut self ,
logs : Vec < ( HyperlaneMessage , LogMeta ) > ,
prev_no nce : u32 ,
prev_seque nce : u32 ,
) -> Result < ( ) > {
// If we found messages, but did *not* find the message we were looking for,
// we need to rewind to the block at which we found the last message.
if ! logs . is_empty ( ) & & ! logs . iter ( ) . any ( | m | m . 0. nonce = = self . next_nonce ) {
warn ! ( next_nonce = ? self . next_nonce , "Target nonce not found, rewinding" ) ;
if ! logs . is_empty ( )
& & ! logs
. iter ( )
. any ( | m | m . 0. nonce = = self . sync_state . next_sequence )
{
warn ! ( next_nonce = ? self . sync_state . next_sequence , "Target nonce not found, rewinding" ) ;
// If the previous nonce has been synced, rewind to the block number
// at which it was dispatched. Otherwise, rewind all the way back to the start block.
if let Some ( block_number ) = self . retrieve_dispatched_block_number ( prev_nonce ) . await {
self . next_block = block_number ;
if let Some ( block_number ) = self . retrieve_dispatched_block_number ( prev_seque nce ) . await {
self . sync_state . next_block = block_number ;
warn ! ( block_number , "Rewound to previous known message" ) ;
} else {
self . next_block = self . start_block ;
self . sync_state . next_block = self . sync_state . start_block ;
}
Ok ( ( ) )
} else {
@ -81,75 +181,87 @@ impl MessageSyncCursor {
}
/// A MessageSyncCursor that syncs forwards in perpetuity.
#[ derive(new) ]
pub ( crate ) struct ForwardMessageSyncCursor {
cursor : MessageSyncCursor ,
mode : IndexMode ,
}
impl ForwardMessageSyncCursor {
async fn get_next_range ( & mut self ) -> ChainResult < Option < IndexRange > > {
pub fn new (
indexer : Arc < dyn SequenceIndexer < HyperlaneMessage > > ,
db : Arc < dyn HyperlaneMessageStore > ,
chunk_size : u32 ,
start_block : u32 ,
next_block : u32 ,
mode : IndexMode ,
next_sequence : u32 ,
) -> Self {
Self {
cursor : MessageSyncCursor ::new (
indexer ,
db ,
SyncState ::new (
chunk_size ,
start_block ,
next_block ,
mode ,
next_sequence ,
SyncDirection ::Forward ,
) ,
) ,
}
}
async fn get_next_range ( & mut self ) -> ChainResult < Option < RangeInclusive < u32 > > > {
// Check if any new messages have been inserted into the DB,
// and update the cursor accordingly.
while self
. cursor
. retrieve_message_by_nonce ( self . cursor . next_nonce )
. retrieve_message_by_nonce ( self . cursor . sync_state . next_seque nce)
. await
. is_some ( )
{
if let Some ( block_number ) = self
. cursor
. retrieve_dispatched_block_number ( self . cursor . next_nonce )
. retrieve_dispatched_block_number ( self . cursor . sync_state . next_seque nce)
. await
{
debug ! ( next_block = block_number , "Fast forwarding next block" ) ;
// It's possible that eth_getLogs dropped logs from this block, therefore we cannot do block_number + 1.
self . cursor . next_block = block_number ;
self . cursor . sync_state . next_block = block_number ;
}
debug ! (
next_nonce = self . cursor . next_nonce + 1 ,
next_nonce = self . cursor . sync_state . next_seque nce + 1 ,
"Fast forwarding next nonce"
) ;
self . cursor . next_nonce + = 1 ;
self . cursor . sync_state . next_seque nce + = 1 ;
}
let ( mailbox_count , tip ) = self . cursor . indexer . fetch_count_at_tip ( ) . await ? ;
let cursor_count = self . cursor . next_nonce ;
let cmp = cursor_count . cmp ( & mailbox_count ) ;
match cmp {
let ( Some ( mailbox_count ) , tip ) = self . cursor . indexer . sequence_and_tip ( ) . await ?
else {
return Ok ( None ) ;
} ;
let cursor_count = self . cursor . sync_state . next_sequence ;
Ok ( match cursor_count . cmp ( & mailbox_count ) {
Ordering ::Equal = > {
// We are synced up to the latest nonce so we don't need to index anything.
// We update our next block number accordingly.
self . cursor . next_block = tip ;
Ok ( None )
self . cursor . sync_state . next_block = tip ;
None
}
Ordering ::Less = > {
// The cursor is behind the mailbox, so we need to index some blocks.
// We attempt to index a range of blocks that is as large as possible.
let from = self . cursor . next_block ;
let to = u32 ::min ( tip , from + self . cursor . chunk_size ) ;
self . cursor . next_block = to + 1 ;
let range = match self . mode {
IndexMode ::Block = > BlockRange ( from ..= to ) ,
IndexMode ::Sequence = > SequenceRange (
cursor_count
..= u32 ::min (
mailbox_count . saturating_sub ( 1 ) ,
cursor_count + MAX_SEQUENCE_RANGE ,
) ,
) ,
} ;
Ok ( Some ( range ) )
self . cursor
. sync_state
. get_next_range ( Some ( mailbox_count ) , tip )
. await ?
}
Ordering ::Greater = > {
// Providers may be internally inconsistent, e.g. RPC request A could hit a node
// whose tip is N and subsequent RPC request B could hit a node whose tip is < N.
debug ! ( "Cursor count is greater than Mailbox count" ) ;
Ok ( None )
None
}
}
} )
}
}
@ -167,99 +279,115 @@ impl ContractSyncCursor<HyperlaneMessage> for ForwardMessageSyncCursor {
}
fn latest_block ( & self ) -> u32 {
self . cursor . next_block . saturating_sub ( 1 )
self . cursor . sync_state . next_block . saturating_sub ( 1 )
}
/// If the previous block has been synced, rewind to the block number
/// at which it was dispatched.
/// Otherwise, rewind all the way back to the start block.
async fn update ( & mut self , logs : Vec < ( HyperlaneMessage , LogMeta ) > ) -> Result < ( ) > {
let prev_nonce = self . cursor . next_no nce. saturating_sub ( 1 ) ;
let prev_nonce = self . cursor . sync_state . next_seque nce. saturating_sub ( 1 ) ;
// We may wind up having re-indexed messages that are previous to the nonce that we are looking for.
// We should not consider these messages when checking for continuity errors.
let filtered_logs = logs
. into_iter ( )
. filter ( | m | m . 0. nonce > = self . cursor . next_no nce)
. filter ( | m | m . 0. nonce > = self . cursor . sync_state . next_seque nce)
. collect ( ) ;
self . cursor . update ( filtered_logs , prev_nonce ) . await
}
}
/// A MessageSyncCursor that syncs backwards to nonce zero.
#[ derive(new) ]
/// A MessageSyncCursor that syncs backwards to sequence (nonce) zero.
pub ( crate ) struct BackwardMessageSyncCursor {
cursor : MessageSyncCursor ,
synced : bool ,
mode : IndexMode ,
}
impl BackwardMessageSyncCursor {
async fn get_next_range ( & mut self ) -> Option < IndexRange > {
#[ allow(clippy::too_many_arguments) ]
pub fn new (
indexer : Arc < dyn SequenceIndexer < HyperlaneMessage > > ,
db : Arc < dyn HyperlaneMessageStore > ,
chunk_size : u32 ,
start_block : u32 ,
next_block : u32 ,
mode : IndexMode ,
next_sequence : u32 ,
synced : bool ,
) -> Self {
Self {
cursor : MessageSyncCursor ::new (
indexer ,
db ,
SyncState ::new (
chunk_size ,
start_block ,
next_block ,
mode ,
next_sequence ,
SyncDirection ::Backward ,
) ,
) ,
synced ,
}
}
async fn get_next_range ( & mut self ) -> ChainResult < Option < RangeInclusive < u32 > > > {
// Check if any new messages have been inserted into the DB,
// and update the cursor accordingly.
while ! self . synced {
if self
. cursor
. retrieve_message_by_nonce ( self . cursor . next_nonce )
. retrieve_message_by_nonce ( self . cursor . sync_state . next_seque nce)
. await
. is_none ( )
{
break ;
} ;
// If we found nonce zero or hit block zero, we are done rewinding.
if self . cursor . next_nonce = = 0 | | self . cursor . next_block = = 0 {
// If we found seque nce zero or hit block zero, we are done rewinding.
if self . cursor . sync_state . next_seque nce = = 0 | | self . cursor . sync_state . next_block = = 0 {
self . synced = true ;
break ;
}
if let Some ( block_number ) = self
. cursor
. retrieve_dispatched_block_number ( self . cursor . next_nonce )
. retrieve_dispatched_block_number ( self . cursor . sync_state . next_seque nce)
. await
{
// It's possible that eth_getLogs dropped logs from this block, therefore we cannot do block_number - 1.
self . cursor . next_block = block_number ;
self . cursor . sync_state . next_block = block_number ;
}
self . cursor . next_nonce = self . cursor . next_nonce . saturating_sub ( 1 ) ;
self . cursor . sync_state . next_sequence =
self . cursor . sync_state . next_sequence . saturating_sub ( 1 ) ;
}
if self . synced {
return None ;
return Ok ( None ) ;
}
// Just keep going backwards.
let to = self . cursor . next_block ;
let from = to . saturating_sub ( self . cursor . chunk_size ) ;
self . cursor . next_block = from . saturating_sub ( 1 ) ;
let next_nonce = self . cursor . next_nonce ;
let range = match self . mode {
IndexMode ::Block = > BlockRange ( from ..= to ) ,
IndexMode ::Sequence = > {
SequenceRange ( next_nonce . saturating_sub ( MAX_SEQUENCE_RANGE ) ..= next_nonce )
}
} ;
Some ( range )
let ( count , tip ) = self . cursor . indexer . sequence_and_tip ( ) . await ? ;
self . cursor . sync_state . get_next_range ( count , tip ) . await
}
/// If the previous block has been synced, rewind to the block number
/// at which it was dispatched.
/// Otherwise, rewind all the way back to the start block.
async fn update ( & mut self , logs : Vec < ( HyperlaneMessage , LogMeta ) > ) -> Result < ( ) > {
let prev_no nce = self . cursor . next_no nce. saturating_add ( 1 ) ;
// We may wind up having re-indexed messages that are previous to the nonce that we are looking for.
let prev_sequence = self . cursor . sync_state . next_sequence . saturating_add ( 1 ) ;
// We may wind up having re-indexed messages that are previous to the sequence (nonce) that we are looking for.
// We should not consider these messages when checking for continuity errors.
let filtered_logs = logs
. into_iter ( )
. filter ( | m | m . 0. nonce < = self . cursor . next_no nce)
. filter ( | m | m . 0. nonce < = self . cursor . sync_state . next_sequence )
. collect ( ) ;
self . cursor . update ( filtered_logs , prev_no nce ) . await
self . cursor . update ( filtered_logs , prev_seque nce ) . await
}
}
enum SyncDirection {
#[ derive(Debug) ]
pub enum SyncDirection {
Forward ,
Backward ,
}
@ -274,28 +402,33 @@ pub(crate) struct ForwardBackwardMessageSyncCursor {
impl ForwardBackwardMessageSyncCursor {
/// Construct a new contract sync helper.
pub async fn new (
indexer : Arc < dyn MessageIndexer > ,
indexer : Arc < dyn SequenceIndexer < HyperlaneMessage > > ,
db : Arc < dyn HyperlaneMessageStore > ,
chunk_size : u32 ,
mode : IndexMode ,
) -> Result < Self > {
let ( count , tip ) = indexer . fetch_count_at_tip ( ) . await ? ;
let ( count , tip ) = indexer . sequence_and_tip ( ) . await ? ;
let count = count . ok_or ( ChainCommunicationError ::from_other_str (
"Failed to query message count" ,
) ) ? ;
let forward_cursor = ForwardMessageSyncCursor ::new (
MessageSyncCursor ::new ( indexer . clone ( ) , db . clone ( ) , chunk_size , tip , tip , count ) ,
indexer . clone ( ) ,
db . clone ( ) ,
chunk_size ,
tip ,
tip ,
mode ,
count ,
) ;
let backward_cursor = BackwardMessageSyncCursor ::new (
MessageSyncCursor ::new (
indexer . clone ( ) ,
db . clone ( ) ,
chunk_size ,
tip ,
tip ,
count . saturating_sub ( 1 ) ,
) ,
count = = 0 ,
indexer . clone ( ) ,
db . clone ( ) ,
chunk_size ,
tip ,
tip ,
mode ,
count . saturating_sub ( 1 ) ,
count = = 0 ,
) ;
Ok ( Self {
forward : forward_cursor ,
@ -316,7 +449,7 @@ impl ContractSyncCursor<HyperlaneMessage> for ForwardBackwardMessageSyncCursor {
return Ok ( ( CursorAction ::Query ( forward_range ) , eta ) ) ;
}
if let Some ( backward_range ) = self . backward . get_next_range ( ) . await {
if let Some ( backward_range ) = self . backward . get_next_range ( ) . await ? {
self . direction = SyncDirection ::Backward ;
return Ok ( ( CursorAction ::Query ( backward_range ) , eta ) ) ;
}
@ -325,7 +458,7 @@ impl ContractSyncCursor<HyperlaneMessage> for ForwardBackwardMessageSyncCursor {
}
fn latest_block ( & self ) -> u32 {
self . forward . cursor . next_block . saturating_sub ( 1 )
self . forward . cursor . sync_state . next_block . saturating_sub ( 1 )
}
async fn update ( & mut self , logs : Vec < ( HyperlaneMessage , LogMeta ) > ) -> Result < ( ) > {
@ -340,41 +473,46 @@ impl ContractSyncCursor<HyperlaneMessage> for ForwardBackwardMessageSyncCursor {
/// queried is and also handling rate limiting. Rate limiting is automatically
/// performed by `next_action`.
pub ( crate ) struct RateLimitedContractSyncCursor < T > {
indexer : Arc < dyn Indexer < T > > ,
indexer : Arc < dyn Sequence Indexer< T > > ,
db : Arc < dyn HyperlaneWatermarkedLogStore < T > > ,
tip : u32 ,
last_tip_update : Instant ,
chunk_size : u32 ,
from : u32 ,
eta_calculator : SyncerEtaCalculator ,
initial_height : u32 ,
sync_state : SyncState ,
}
impl < T > RateLimitedContractSyncCursor < T > {
/// Construct a new contract sync helper.
pub async fn new (
indexer : Arc < dyn Indexer < T > > ,
indexer : Arc < dyn Sequence Indexer< T > > ,
db : Arc < dyn HyperlaneWatermarkedLogStore < T > > ,
chunk_size : u32 ,
initial_height : u32 ,
mode : IndexMode ,
) -> Result < Self > {
let tip = indexer . get_finalized_block_number ( ) . await ? ;
Ok ( Self {
indexer ,
db ,
tip ,
chunk_size ,
last_tip_update : Instant ::now ( ) ,
from : initial_height ,
initial_height ,
eta_calculator : SyncerEtaCalculator ::new ( initial_height , tip , ETA_TIME_WINDOW ) ,
sync_state : SyncState ::new (
chunk_size ,
initial_height ,
initial_height ,
mode ,
Default ::default ( ) ,
// The rate limited cursor currently only syncs in the forward direction.
SyncDirection ::Forward ,
) ,
} )
}
/// Wait based on how close we are to the tip and update the tip,
/// i.e. the highest block we may scrape.
async fn get_rate_limit ( & mut self ) -> ChainResult < Option < Duration > > {
if self . from + self . chunk_size < self . tip {
if self . sync_state . next_block + self . sync_state . chunk_size < self . tip {
// If doing the full chunk wouldn't exceed the already known tip we do not need to rate limit.
Ok ( None )
} else {
@ -409,8 +547,11 @@ where
T : Send + Debug + ' static ,
{
async fn next_action ( & mut self ) -> ChainResult < ( CursorAction , Duration ) > {
let to = u32 ::min ( self . tip , self . from + self . chunk_size ) ;
let from = to . saturating_sub ( self . chunk_size ) ;
let to = u32 ::min (
self . tip ,
self . sync_state . next_block + self . sync_state . chunk_size ,
) ;
let from = to . saturating_sub ( self . sync_state . chunk_size ) ;
let eta = if to < self . tip {
self . eta_calculator . calculate ( from , self . tip )
} else {
@ -418,21 +559,20 @@ where
} ;
let rate_limit = self . get_rate_limit ( ) . await ? ;
let action = if let Some ( rate_limit ) = rate_limit {
CursorAction ::Sleep ( rate_limit )
} else {
self . from = to + 1 ;
// TODO: note at the moment IndexModes are not considered here, and
// block-based indexing is always used.
// This should be changed when Sealevel IGP indexing is implemented,
// along with a refactor to better accommodate indexing modes.
CursorAction ::Query ( BlockRange ( from ..= to ) )
} ;
Ok ( ( action , eta ) )
if let Some ( rate_limit ) = rate_limit {
return Ok ( ( CursorAction ::Sleep ( rate_limit ) , eta ) ) ;
}
let ( count , tip ) = self . indexer . sequence_and_tip ( ) . await ? ;
if let Some ( range ) = self . sync_state . get_next_range ( count , tip ) . await ? {
return Ok ( ( CursorAction ::Query ( range ) , eta ) ) ;
}
// TODO: Define the sleep time from interval flag
Ok ( ( CursorAction ::Sleep ( Duration ::from_secs ( 5 ) ) , eta ) )
}
fn latest_block ( & self ) -> u32 {
self . from . saturating_sub ( 1 )
self . sync_state . next_block . saturating_sub ( 1 )
}
async fn update ( & mut self , _ : Vec < ( T , LogMeta ) > ) -> Result < ( ) > {
@ -440,8 +580,10 @@ where
// safely shared across multiple cursors, so long as they are running sufficiently in sync
self . db
. store_high_watermark ( u32 ::max (
self . initial_height ,
self . from . saturating_sub ( self . chunk_size ) ,
self . sync_state . start_block ,
self . sync_state
. next_block
. saturating_sub ( self . sync_state . chunk_size ) ,
) )
. await ? ;
Ok ( ( ) )