diff --git a/rust/agents/relayer/src/msg/processor.rs b/rust/agents/relayer/src/msg/processor.rs index f386cb8e5..f0d002234 100644 --- a/rust/agents/relayer/src/msg/processor.rs +++ b/rust/agents/relayer/src/msg/processor.rs @@ -63,92 +63,107 @@ impl MessageProcessor { } } + /// Tries to get the next message to process. + /// + /// If no message with self.message_nonce is found, returns None. + /// If the message with self.message_nonce is found and has previously + /// been marked as processed, increments self.message_nonce and returns + /// None. + fn try_get_unprocessed_message(&mut self) -> Result> { + // First, see if we can find the message so we can update the gauge. + if let Some(message) = self.db.message_by_nonce(self.message_nonce)? { + // Update the latest nonce gauge if the message is destined for one + // of the domains we service. + if let Some(metrics) = self.metrics.get(message.destination) { + metrics.set(message.nonce as i64); + } + + // If this message has already been processed, on to the next one. + if self + .db + .retrieve_message_processed(self.message_nonce)? + .is_none() + { + Ok(Some(message)) + } else { + debug!( + msg_nonce=?self.message_nonce, + "Message already marked as processed in DB"); + self.message_nonce += 1; + Ok(None) + } + } else { + debug!( + msg_nonce=?self.message_nonce, + "No message found in DB for nonce"); + Ok(None) + } + } + /// One round of processing, extracted from infinite work loop for /// testing purposes. async fn tick(&mut self) -> Result<()> { // Scan until we find next nonce without delivery confirmation. - if self - .db - .retrieve_message_processed(self.message_nonce)? - .is_some() - { - debug!( - nonce=?self.message_nonce, - "Skipping since message_nonce already in DB"); - self.message_nonce += 1; - return Ok(()); - } - let message = if let Some(msg) = self - .db - .message_by_nonce(self.message_nonce)? - .map(HyperlaneMessage::from) - { - debug!(msg=?msg, "Working on msg"); - msg - } else { - debug!("Leaf in db without message nonce: {}", self.message_nonce); - // Not clear what the best thing to do here is, but there is seemingly an - // existing race wherein an indexer might non-atomically write leaf - // info to rocksdb across a few records, so we might see the leaf - // status above, but not the message contents here. For now, - // optimistically yield and then re-enter the loop in hopes that the - // DB is now coherent. TODO(webbhorn): Why can't we yield here - // instead of sleep? - tokio::time::sleep(Duration::from_secs(1)).await; - return Ok(()); - }; - if let Some(metrics) = self.metrics.get(message.destination) { - metrics.set(self.message_nonce as i64); - } + if let Some(message) = self.try_get_unprocessed_message()? { + debug!(msg=?message, "Working on message"); + + // Skip if not whitelisted. + if !self.whitelist.msg_matches(&message, true) { + debug!( + msg_id=?message.id(), + msg_destination=message.destination, + msg_nonce=message.nonce, + whitelist=?self.whitelist, + "Message not whitelisted, skipping"); + self.message_nonce += 1; + return Ok(()); + } + + // Skip if the message is blacklisted + if self.blacklist.msg_matches(&message, false) { + debug!( + msg_id=?message.id(), + msg_destination=message.destination, + msg_nonce=message.nonce, + blacklist=?self.blacklist, + "Message blacklisted, skipping"); + self.message_nonce += 1; + return Ok(()); + } + + // Skip if the message is intended for a destination we do not service + if self.send_channels.get(&message.destination).is_none() { + debug!( + msg_id=?message.id(), + msg_destination=message.destination, + msg_nonce=message.nonce, + "Message destined for unknown domain, skipping"); + self.message_nonce += 1; + return Ok(()); + } + + // Feed the message to the prover sync + self.prover_sync + .write() + .await + .update_to_index(message.nonce) + .await?; - // Skip if not whitelisted. - if !self.whitelist.msg_matches(&message, true) { debug!( - id=?message.id(), - destination=message.destination, - nonce=message.nonce, - whitelist=?self.whitelist, - "Message not whitelisted, skipping"); - self.message_nonce += 1; - return Ok(()); - } - - // Skip if the message is blacklisted - if self.blacklist.msg_matches(&message, false) { - debug!( - id=?message.id(), - destination=message.destination, - nonce=message.nonce, - blacklist=?self.blacklist, - "Message blacklisted, skipping"); - self.message_nonce += 1; - return Ok(()); - } + msg_id=?message.id(), + msg_nonce=message.nonce, + "Sending message to submitter" + ); - // Feed the message to the prover sync - self.prover_sync - .write() - .await - .update_to_index(message.nonce) - .await?; - - debug!( - msg_id=?message.id(), - msg_nonce=message.nonce, - "Sending message to submitter" - ); - // Finally, build the submit arg and dispatch it to the submitter. - let submit_args = SubmitMessageArgs::new(message.clone()); - if let Some(send_channel) = self.send_channels.get(&message.destination) { + // Finally, build the submit arg and dispatch it to the submitter. + let submit_args = SubmitMessageArgs::new(message.clone()); + // Guaranteed to exist as we return early above if it does not. + let send_channel = self.send_channels.get(&message.destination).unwrap(); send_channel.send(submit_args)?; + self.message_nonce += 1; } else { - debug!( - id=?message.id(), - destination=message.destination, - nonce=message.nonce, - "Message destined for unknown domain, skipping"); + tokio::time::sleep(Duration::from_secs(1)).await; } - self.message_nonce += 1; Ok(()) } }