Update message processor gauge first (#1512)

trevor/deploy-v2
Asa Oines 2 years ago committed by GitHub
parent eb3550ad39
commit 389eec56ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 167
      rust/agents/relayer/src/msg/processor.rs

@ -63,92 +63,107 @@ impl MessageProcessor {
}
}
/// Tries to get the next message to process.
///
/// If no message with self.message_nonce is found, returns None.
/// If the message with self.message_nonce is found and has previously
/// been marked as processed, increments self.message_nonce and returns
/// None.
fn try_get_unprocessed_message(&mut self) -> Result<Option<HyperlaneMessage>> {
// First, see if we can find the message so we can update the gauge.
if let Some(message) = self.db.message_by_nonce(self.message_nonce)? {
// Update the latest nonce gauge if the message is destined for one
// of the domains we service.
if let Some(metrics) = self.metrics.get(message.destination) {
metrics.set(message.nonce as i64);
}
// If this message has already been processed, on to the next one.
if self
.db
.retrieve_message_processed(self.message_nonce)?
.is_none()
{
Ok(Some(message))
} else {
debug!(
msg_nonce=?self.message_nonce,
"Message already marked as processed in DB");
self.message_nonce += 1;
Ok(None)
}
} else {
debug!(
msg_nonce=?self.message_nonce,
"No message found in DB for nonce");
Ok(None)
}
}
/// One round of processing, extracted from infinite work loop for
/// testing purposes.
async fn tick(&mut self) -> Result<()> {
// Scan until we find next nonce without delivery confirmation.
if self
.db
.retrieve_message_processed(self.message_nonce)?
.is_some()
{
debug!(
nonce=?self.message_nonce,
"Skipping since message_nonce already in DB");
self.message_nonce += 1;
return Ok(());
}
let message = if let Some(msg) = self
.db
.message_by_nonce(self.message_nonce)?
.map(HyperlaneMessage::from)
{
debug!(msg=?msg, "Working on msg");
msg
} else {
debug!("Leaf in db without message nonce: {}", self.message_nonce);
// Not clear what the best thing to do here is, but there is seemingly an
// existing race wherein an indexer might non-atomically write leaf
// info to rocksdb across a few records, so we might see the leaf
// status above, but not the message contents here. For now,
// optimistically yield and then re-enter the loop in hopes that the
// DB is now coherent. TODO(webbhorn): Why can't we yield here
// instead of sleep?
tokio::time::sleep(Duration::from_secs(1)).await;
return Ok(());
};
if let Some(metrics) = self.metrics.get(message.destination) {
metrics.set(self.message_nonce as i64);
}
if let Some(message) = self.try_get_unprocessed_message()? {
debug!(msg=?message, "Working on message");
// Skip if not whitelisted.
if !self.whitelist.msg_matches(&message, true) {
debug!(
msg_id=?message.id(),
msg_destination=message.destination,
msg_nonce=message.nonce,
whitelist=?self.whitelist,
"Message not whitelisted, skipping");
self.message_nonce += 1;
return Ok(());
}
// Skip if the message is blacklisted
if self.blacklist.msg_matches(&message, false) {
debug!(
msg_id=?message.id(),
msg_destination=message.destination,
msg_nonce=message.nonce,
blacklist=?self.blacklist,
"Message blacklisted, skipping");
self.message_nonce += 1;
return Ok(());
}
// Skip if the message is intended for a destination we do not service
if self.send_channels.get(&message.destination).is_none() {
debug!(
msg_id=?message.id(),
msg_destination=message.destination,
msg_nonce=message.nonce,
"Message destined for unknown domain, skipping");
self.message_nonce += 1;
return Ok(());
}
// Feed the message to the prover sync
self.prover_sync
.write()
.await
.update_to_index(message.nonce)
.await?;
// Skip if not whitelisted.
if !self.whitelist.msg_matches(&message, true) {
debug!(
id=?message.id(),
destination=message.destination,
nonce=message.nonce,
whitelist=?self.whitelist,
"Message not whitelisted, skipping");
self.message_nonce += 1;
return Ok(());
}
// Skip if the message is blacklisted
if self.blacklist.msg_matches(&message, false) {
debug!(
id=?message.id(),
destination=message.destination,
nonce=message.nonce,
blacklist=?self.blacklist,
"Message blacklisted, skipping");
self.message_nonce += 1;
return Ok(());
}
msg_id=?message.id(),
msg_nonce=message.nonce,
"Sending message to submitter"
);
// Feed the message to the prover sync
self.prover_sync
.write()
.await
.update_to_index(message.nonce)
.await?;
debug!(
msg_id=?message.id(),
msg_nonce=message.nonce,
"Sending message to submitter"
);
// Finally, build the submit arg and dispatch it to the submitter.
let submit_args = SubmitMessageArgs::new(message.clone());
if let Some(send_channel) = self.send_channels.get(&message.destination) {
// Finally, build the submit arg and dispatch it to the submitter.
let submit_args = SubmitMessageArgs::new(message.clone());
// Guaranteed to exist as we return early above if it does not.
let send_channel = self.send_channels.get(&message.destination).unwrap();
send_channel.send(submit_args)?;
self.message_nonce += 1;
} else {
debug!(
id=?message.id(),
destination=message.destination,
nonce=message.nonce,
"Message destined for unknown domain, skipping");
tokio::time::sleep(Duration::from_secs(1)).await;
}
self.message_nonce += 1;
Ok(())
}
}

Loading…
Cancel
Save