@ -1,4 +1,5 @@
use std ::{
cmp ::max ,
collections ::HashMap ,
fmt ::{ Debug , Formatter } ,
sync ::Arc ,
@ -8,11 +9,14 @@ use std::{
use async_trait ::async_trait ;
use derive_new ::new ;
use eyre ::Result ;
use hyperlane_base ::{ db ::HyperlaneRocksDB , CoreMetrics } ;
use hyperlane_base ::{
db ::{ HyperlaneRocksDB , ProcessMessage } ,
CoreMetrics ,
} ;
use hyperlane_core ::{ HyperlaneDomain , HyperlaneMessage } ;
use prometheus ::IntGauge ;
use tokio ::sync ::mpsc ::UnboundedSender ;
use tracing ::{ debug , trace } ;
use tracing ::{ debug , instrument , trace } ;
use super ::{ metadata ::AppContextClassifier , op_queue ::QueueOperation , pending_message ::* } ;
use crate ::{ processor ::ProcessorExt , settings ::matching_list ::MatchingList } ;
@ -20,9 +24,7 @@ use crate::{processor::ProcessorExt, settings::matching_list::MatchingList};
/// Finds unprocessed messages from an origin and submits then through a channel
/// for to the appropriate destination.
#[ allow(clippy::too_many_arguments) ]
#[ derive(new) ]
pub struct MessageProcessor {
db : HyperlaneRocksDB ,
whitelist : Arc < MatchingList > ,
blacklist : Arc < MatchingList > ,
metrics : MessageProcessorMetrics ,
@ -32,16 +34,187 @@ pub struct MessageProcessor {
/// Needed context to send a message for each destination chain
destination_ctxs : HashMap < u32 , Arc < MessageContext > > ,
metric_app_contexts : Vec < ( MatchingList , String ) > ,
#[ new(default) ]
message_nonce : u32 ,
nonce_iterator : ForwardBackwardIterator ,
}
#[ derive(Debug) ]
struct ForwardBackwardIterator {
low_nonce_iter : DirectionalNonceIterator ,
high_nonce_iter : DirectionalNonceIterator ,
// here for debugging purposes
_domain : String ,
}
impl ForwardBackwardIterator {
#[ instrument(skip(db), ret) ]
fn new ( db : Arc < dyn ProcessMessage > ) -> Self {
let high_nonce = db . retrieve_highest_seen_message_nonce ( ) . ok ( ) . flatten ( ) ;
let domain = db . domain ( ) . name ( ) . to_owned ( ) ;
let high_nonce_iter = DirectionalNonceIterator ::new (
// If the high nonce is None, we start from the beginning
high_nonce . unwrap_or_default ( ) . into ( ) ,
NonceDirection ::High ,
db . clone ( ) ,
domain . clone ( ) ,
) ;
let mut low_nonce_iter =
DirectionalNonceIterator ::new ( high_nonce , NonceDirection ::Low , db , domain . clone ( ) ) ;
// Decrement the low nonce to avoid processing the same message twice, which causes double counts in metrics
low_nonce_iter . iterate ( ) ;
debug ! (
? low_nonce_iter ,
? high_nonce_iter ,
? domain ,
"Initialized ForwardBackwardIterator"
) ;
Self {
low_nonce_iter ,
high_nonce_iter ,
_domain : domain ,
}
}
async fn try_get_next_message (
& mut self ,
metrics : & MessageProcessorMetrics ,
) -> Result < Option < HyperlaneMessage > > {
loop {
let high_nonce_message_status = self . high_nonce_iter . try_get_next_nonce ( metrics ) ? ;
let low_nonce_message_status = self . low_nonce_iter . try_get_next_nonce ( metrics ) ? ;
// Always prioritize the high nonce message
match ( high_nonce_message_status , low_nonce_message_status ) {
// Keep iterating if only processed messages are found
( MessageStatus ::Processed , _ ) = > {
self . high_nonce_iter . iterate ( ) ;
}
( _ , MessageStatus ::Processed ) = > {
self . low_nonce_iter . iterate ( ) ;
}
// Otherwise return - either a processable message or nothing to process
( MessageStatus ::Processable ( high_nonce_message ) , _ ) = > {
self . high_nonce_iter . iterate ( ) ;
return Ok ( Some ( high_nonce_message ) ) ;
}
( _ , MessageStatus ::Processable ( low_nonce_message ) ) = > {
self . low_nonce_iter . iterate ( ) ;
return Ok ( Some ( low_nonce_message ) ) ;
}
( MessageStatus ::Unindexed , MessageStatus ::Unindexed ) = > return Ok ( None ) ,
}
// This loop may iterate through millions of processed messages, blocking the runtime.
// So, to avoid starving other futures in this task, yield to the runtime
// on each iteration
tokio ::task ::yield_now ( ) . await ;
}
}
}
#[ derive(Debug, Clone, Copy, Default, PartialEq) ]
enum NonceDirection {
#[ default ]
High ,
Low ,
}
#[ derive(new) ]
struct DirectionalNonceIterator {
nonce : Option < u32 > ,
direction : NonceDirection ,
db : Arc < dyn ProcessMessage > ,
domain_name : String ,
}
impl Debug for DirectionalNonceIterator {
fn fmt ( & self , f : & mut Formatter < ' _ > ) -> std ::fmt ::Result {
write! (
f ,
"DirectionalNonceIterator {{ nonce: {:?}, direction: {:?}, domain: {:?} }}" ,
self . nonce , self . direction , self . domain_name
)
}
}
impl DirectionalNonceIterator {
#[ instrument ]
fn iterate ( & mut self ) {
match self . direction {
NonceDirection ::High = > self . nonce = self . nonce . map ( | n | n . saturating_add ( 1 ) ) ,
NonceDirection ::Low = > {
if let Some ( nonce ) = self . nonce {
// once the message with nonce zero is processed, we should stop going backwards
self . nonce = nonce . checked_sub ( 1 ) ;
}
}
}
}
fn try_get_next_nonce (
& mut self ,
metrics : & MessageProcessorMetrics ,
) -> Result < MessageStatus < HyperlaneMessage > > {
if let Some ( message ) = self . indexed_message_with_nonce ( ) ? {
Self ::update_max_nonce_gauge ( & message , metrics ) ;
if ! self . is_message_processed ( ) ? {
return Ok ( MessageStatus ::Processable ( message ) ) ;
} else {
return Ok ( MessageStatus ::Processed ) ;
}
}
Ok ( MessageStatus ::Unindexed )
}
fn update_max_nonce_gauge ( message : & HyperlaneMessage , metrics : & MessageProcessorMetrics ) {
let current_max = metrics . max_last_known_message_nonce_gauge . get ( ) ;
metrics
. max_last_known_message_nonce_gauge
. set ( max ( current_max , message . nonce as i64 ) ) ;
if let Some ( metrics ) = metrics . get ( message . destination ) {
metrics . set ( message . nonce as i64 ) ;
}
}
fn indexed_message_with_nonce ( & self ) -> Result < Option < HyperlaneMessage > > {
match self . nonce {
Some ( nonce ) = > {
let msg = self . db . retrieve_message_by_nonce ( nonce ) ? ;
Ok ( msg )
}
None = > Ok ( None ) ,
}
}
fn is_message_processed ( & self ) -> Result < bool > {
let Some ( nonce ) = self . nonce else {
return Ok ( false ) ;
} ;
let processed = self . db . retrieve_processed_by_nonce ( nonce ) ? . unwrap_or ( false ) ;
if processed {
trace ! (
nonce ,
domain = self . db . domain ( ) . name ( ) ,
"Message already marked as processed in DB"
) ;
}
Ok ( processed )
}
}
#[ derive(Debug) ]
enum MessageStatus < T > {
/// The message wasn't indexed yet so can't be processed.
Unindexed ,
// The message was indexed and is ready to be processed.
Processable ( T ) ,
// The message was indexed and already processed.
Processed ,
}
impl Debug for MessageProcessor {
fn fmt ( & self , f : & mut Formatter < ' _ > ) -> std ::fmt ::Result {
write! (
f ,
"MessageProcessor {{ whitelist: {:?}, blacklist: {:?}, message_nonce: {:?} }}" ,
self . whitelist , self . blacklist , self . message_nonce
"MessageProcessor {{ whitelist: {:?}, blacklist: {:?}, nonce_iterator : {:?}}}" ,
self . whitelist , self . blacklist , self . nonce_iterator
)
}
}
@ -50,7 +223,7 @@ impl Debug for MessageProcessor {
impl ProcessorExt for MessageProcessor {
/// The domain this processor is getting messages from.
fn domain ( & self ) -> & HyperlaneDomain {
self . db . domain ( )
self . nonce_iterator . high_nonce_iter . db . domain ( )
}
/// One round of processing, extracted from infinite work loop for
@ -61,35 +234,31 @@ impl ProcessorExt for MessageProcessor {
// self.tx_msg and then continue the scan at the next highest
// nonce.
// Scan until we find next nonce without delivery confirmation.
if let Some ( msg ) = self . try_get_unprocessed_message ( ) ? {
if let Some ( msg ) = self . try_get_unprocessed_message ( ) . await ? {
debug ! ( ? msg , "Processor working on message" ) ;
let destination = msg . destination ;
// Skip if not whitelisted.
if ! self . whitelist . msg_matches ( & msg , true ) {
debug ! ( ? msg , whitelist = ? self . whitelist , "Message not whitelisted, skipping" ) ;
self . message_nonce + = 1 ;
return Ok ( ( ) ) ;
}
// Skip if the message is blacklisted
if self . blacklist . msg_matches ( & msg , false ) {
debug ! ( ? msg , blacklist = ? self . blacklist , "Message blacklisted, skipping" ) ;
self . message_nonce + = 1 ;
return Ok ( ( ) ) ;
}
// Skip if the message is intended for this origin
if destination = = self . domain ( ) . id ( ) {
debug ! ( ? msg , "Message destined for self, skipping" ) ;
self . message_nonce + = 1 ;
return Ok ( ( ) ) ;
}
// Skip if the message is intended for a destination we do not service
if ! self . send_channels . contains_key ( & destination ) {
debug ! ( ? msg , "Message destined for unknown domain, skipping" ) ;
self . message_nonce + = 1 ;
return Ok ( ( ) ) ;
}
@ -106,7 +275,6 @@ impl ProcessorExt for MessageProcessor {
app_context ,
) ;
self . send_channels [ & destination ] . send ( Box ::new ( pending_msg ) as QueueOperation ) ? ;
self . message_nonce + = 1 ;
} else {
tokio ::time ::sleep ( Duration ::from_secs ( 1 ) ) . await ;
}
@ -115,34 +283,36 @@ impl ProcessorExt for MessageProcessor {
}
impl MessageProcessor {
fn try_get_unprocessed_message ( & mut self ) -> Result < Option < HyperlaneMessage > > {
loop {
// First, see if we can find the message so we can update the gauge.
if let Some ( message ) = self . db . retrieve_message_by_nonce ( self . message_nonce ) ? {
// Update the latest nonce gauges
self . metrics
. max_last_known_message_nonce_gauge
. set ( message . nonce as i64 ) ;
if let Some ( metrics ) = self . metrics . get ( message . destination ) {
metrics . set ( message . nonce as i64 ) ;
}
pub fn new (
db : HyperlaneRocksDB ,
whitelist : Arc < MatchingList > ,
blacklist : Arc < MatchingList > ,
metrics : MessageProcessorMetrics ,
send_channels : HashMap < u32 , UnboundedSender < QueueOperation > > ,
destination_ctxs : HashMap < u32 , Arc < MessageContext > > ,
metric_app_contexts : Vec < ( MatchingList , String ) > ,
) -> Self {
Self {
whitelist ,
blacklist ,
metrics ,
send_channels ,
destination_ctxs ,
metric_app_contexts ,
nonce_iterator : ForwardBackwardIterator ::new ( Arc ::new ( db ) as Arc < dyn ProcessMessage > ) ,
}
}
// If this message has already been processed, on to the next one.
if ! self
. db
. retrieve_processed_by_nonce ( & self . message_nonce ) ?
. unwrap_or ( false )
{
return Ok ( Some ( message ) ) ;
} else {
debug ! ( nonce = ? self . message_nonce , "Message already marked as processed in DB" ) ;
self . message_nonce + = 1 ;
}
} else {
trace ! ( nonce = ? self . message_nonce , "No message found in DB for nonce" ) ;
return Ok ( None ) ;
}
async fn try_get_unprocessed_message ( & mut self ) -> Result < Option < HyperlaneMessage > > {
trace ! ( nonce_iterator = ? self . nonce_iterator , "Trying to get the next processor message" ) ;
let next_message = self
. nonce_iterator
. try_get_next_message ( & self . metrics )
. await ? ;
if next_message . is_none ( ) {
trace ! ( nonce_iterator = ? self . nonce_iterator , "No message found in DB for nonce" ) ;
}
Ok ( next_message )
}
}
@ -197,7 +367,7 @@ mod test {
use super ::* ;
use hyperlane_base ::{
db ::{ test_utils , HyperlaneRocksDB } ,
db ::{ test_utils , DbResult , HyperlaneRocksDB } ,
settings ::{ ChainConf , ChainConnectionConf , Settings } ,
} ;
use hyperlane_test ::mocks ::{ MockMailboxContract , MockValidatorAnnounceContract } ;
@ -387,6 +557,21 @@ mod test {
pending_messages
}
mockall ::mock ! {
pub Db { }
impl Debug for Db {
fn fmt < ' a > ( & self , f : & mut std ::fmt ::Formatter < ' a > ) -> std ::fmt ::Result ;
}
impl ProcessMessage for Db {
fn retrieve_highest_seen_message_nonce ( & self ) -> DbResult < Option < u32 > > ;
fn retrieve_message_by_nonce ( & self , nonce : u32 ) -> DbResult < Option < HyperlaneMessage > > ;
fn retrieve_processed_by_nonce ( & self , nonce : u32 ) -> DbResult < Option < bool > > ;
fn domain ( & self ) -> & HyperlaneDomain ;
}
}
#[ tokio::test ]
async fn test_full_pending_message_persistence_flow ( ) {
test_utils ::run_test_db ( | db | async move {
@ -441,4 +626,77 @@ mod test {
} )
. await ;
}
#[ tokio::test ]
async fn test_forward_backward_iterator ( ) {
let mut mock_db = MockDb ::new ( ) ;
const MAX_ONCHAIN_NONCE : u32 = 4 ;
const MOCK_HIGHEST_SEEN_NONCE : u32 = 2 ;
// How many times the db was queried for the max onchain nonce message
let mut retrieve_calls_for_max_onchain_nonce = 0 ;
mock_db
. expect_domain ( )
. return_const ( dummy_domain ( 0 , "dummy_domain" ) ) ;
mock_db
. expect_retrieve_highest_seen_message_nonce ( )
. returning ( | | Ok ( Some ( MOCK_HIGHEST_SEEN_NONCE ) ) ) ;
mock_db
. expect_retrieve_message_by_nonce ( )
. returning ( move | nonce | {
// return `None` the first time we get a query for the last message
// (the `MAX_ONCHAIN_NONCE`th one), to simulate an ongoing indexing that hasn't finished
if nonce = = MAX_ONCHAIN_NONCE & & retrieve_calls_for_max_onchain_nonce = = 0 {
retrieve_calls_for_max_onchain_nonce + = 1 ;
return Ok ( None ) ;
}
// otherwise return a message for every nonce in the closed
// interval [0, MAX_ONCHAIN_NONCE]
if nonce > MAX_ONCHAIN_NONCE {
Ok ( None )
} else {
Ok ( Some ( dummy_hyperlane_message (
& dummy_domain ( 1 , "dummy_domain" ) ,
nonce ,
) ) )
}
} ) ;
// The messages must be marked as "not processed" in the db for them to be returned
// when the iterator queries them
mock_db
. expect_retrieve_processed_by_nonce ( )
. returning ( | _ | Ok ( Some ( false ) ) ) ;
let dummy_metrics = dummy_processor_metrics ( 0 ) ;
let db = Arc ::new ( mock_db ) ;
let mut forward_backward_iterator = ForwardBackwardIterator ::new ( db . clone ( ) ) ;
let mut messages = vec! [ ] ;
while let Some ( msg ) = forward_backward_iterator
. try_get_next_message ( & dummy_metrics )
. await
. unwrap ( )
{
messages . push ( msg . nonce ) ;
}
// we start with 2 (MOCK_HIGHEST_SEEN_NONCE) as the highest seen nonce,
// so we go forward and get 3.
// then we try going forward again but get a `None` (not indexed yet), for nonce 4 (MAX_ONCHAIN_NONCE).
// then we go backwards once and get 1.
// then retry the forward iteration, which should return a message the second time, for nonce 4.
// finally, going forward again returns None so we go backward and get 0.
assert_eq! ( messages , vec! [ 2 , 3 , 1 , 4 , 0 ] ) ;
// the final bounds of the iterator are (None, MAX_ONCHAIN_NONCE + 1), where None means
// the backward iterator has reached the beginning (iterated past nonce 0)
assert_eq! ( forward_backward_iterator . low_nonce_iter . nonce , None ) ;
assert_eq! (
forward_backward_iterator . high_nonce_iter . nonce ,
Some ( MAX_ONCHAIN_NONCE + 1 )
) ;
}
}