@ -9,7 +9,7 @@ use async_trait::async_trait;
use derive_new ::new ;
use derive_new ::new ;
use eyre ::Result ;
use eyre ::Result ;
use tokio ::time ::sleep ;
use tokio ::time ::sleep ;
use tracing ::warn ;
use tracing ::{ debug , warn } ;
use hyperlane_core ::{
use hyperlane_core ::{
ChainResult , ContractSyncCursor , HyperlaneMessage , HyperlaneMessageStore ,
ChainResult , ContractSyncCursor , HyperlaneMessage , HyperlaneMessageStore ,
@ -61,10 +61,12 @@ impl MessageSyncCursor {
// If we found messages, but did *not* find the message we were looking for,
// If we found messages, but did *not* find the message we were looking for,
// we need to rewind to the block at which we found the last message.
// we need to rewind to the block at which we found the last message.
if ! logs . is_empty ( ) & & ! logs . iter ( ) . any ( | m | m . 0. nonce = = self . next_nonce ) {
if ! logs . is_empty ( ) & & ! logs . iter ( ) . any ( | m | m . 0. nonce = = self . next_nonce ) {
debug ! ( next_nonce = ? self . next_nonce , "Target nonce not found, rewinding" ) ;
// If the previous nonce has been synced, rewind to the block number
// If the previous nonce has been synced, rewind to the block number
// at which it was dispatched. Otherwise, rewind all the way back to the start block.
// at which it was dispatched. Otherwise, rewind all the way back to the start block.
if let Some ( block_number ) = self . retrieve_dispatched_block_number ( prev_nonce ) . await {
if let Some ( block_number ) = self . retrieve_dispatched_block_number ( prev_nonce ) . await {
self . next_block = block_number ;
self . next_block = block_number ;
debug ! ( block_number , "Rewound to previous known message" ) ;
} else {
} else {
self . next_block = self . start_block ;
self . next_block = self . start_block ;
}
}
@ -94,6 +96,7 @@ impl ForwardMessageSyncCursor {
. retrieve_dispatched_block_number ( self . 0. next_nonce )
. retrieve_dispatched_block_number ( self . 0. next_nonce )
. await
. await
{
{
// It's possible that eth_getLogs dropped logs from this block, therefore we cannot do block_number + 1.
self . 0. next_block = block_number ;
self . 0. next_block = block_number ;
}
}
self . 0. next_nonce + = 1 ;
self . 0. next_nonce + = 1 ;
@ -142,7 +145,13 @@ impl ContractSyncCursor<HyperlaneMessage> for ForwardMessageSyncCursor {
/// Otherwise, rewind all the way back to the start block.
/// Otherwise, rewind all the way back to the start block.
async fn update ( & mut self , logs : Vec < ( HyperlaneMessage , LogMeta ) > ) -> eyre ::Result < ( ) > {
async fn update ( & mut self , logs : Vec < ( HyperlaneMessage , LogMeta ) > ) -> eyre ::Result < ( ) > {
let prev_nonce = self . 0. next_nonce . saturating_sub ( 1 ) ;
let prev_nonce = self . 0. next_nonce . saturating_sub ( 1 ) ;
self . 0. update ( logs , prev_nonce ) . await
// We may wind up having re-indexed messages that are previous to the nonce that we are looking for.
// We should not consider these messages when checking for continuity errors.
let filtered_logs = logs
. into_iter ( )
. filter ( | m | m . 0. nonce > = self . 0. next_nonce )
. collect ( ) ;
self . 0. update ( filtered_logs , prev_nonce ) . await
}
}
}
}
@ -177,6 +186,7 @@ impl BackwardMessageSyncCursor {
. retrieve_dispatched_block_number ( self . cursor . next_nonce )
. retrieve_dispatched_block_number ( self . cursor . next_nonce )
. await
. await
{
{
// It's possible that eth_getLogs dropped logs from this block, therefore we cannot do block_number - 1.
self . cursor . next_block = block_number ;
self . cursor . next_block = block_number ;
}
}
@ -199,7 +209,13 @@ impl BackwardMessageSyncCursor {
/// Otherwise, rewind all the way back to the start block.
/// Otherwise, rewind all the way back to the start block.
async fn update ( & mut self , logs : Vec < ( HyperlaneMessage , LogMeta ) > ) -> eyre ::Result < ( ) > {
async fn update ( & mut self , logs : Vec < ( HyperlaneMessage , LogMeta ) > ) -> eyre ::Result < ( ) > {
let prev_nonce = self . cursor . next_nonce . saturating_add ( 1 ) ;
let prev_nonce = self . cursor . next_nonce . saturating_add ( 1 ) ;
self . cursor . update ( logs , prev_nonce ) . await
// We may wind up having re-indexed messages that are previous to the nonce that we are looking for.
// We should not consider these messages when checking for continuity errors.
let filtered_logs = logs
. into_iter ( )
. filter ( | m | m . 0. nonce < = self . cursor . next_nonce )
. collect ( ) ;
self . cursor . update ( filtered_logs , prev_nonce ) . await
}
}
}
}