|
|
@ -107,26 +107,26 @@ impl MessageProcessor { |
|
|
|
/// testing purposes.
|
|
|
|
/// testing purposes.
|
|
|
|
async fn tick(&mut self) -> Result<()> { |
|
|
|
async fn tick(&mut self) -> Result<()> { |
|
|
|
// Scan until we find next nonce without delivery confirmation.
|
|
|
|
// Scan until we find next nonce without delivery confirmation.
|
|
|
|
if let Some(message) = self.try_get_unprocessed_message()? { |
|
|
|
if let Some(msg) = self.try_get_unprocessed_message()? { |
|
|
|
debug!(?message, "Processor working on message"); |
|
|
|
debug!(?msg, "Processor working on message"); |
|
|
|
|
|
|
|
|
|
|
|
// Skip if not whitelisted.
|
|
|
|
// Skip if not whitelisted.
|
|
|
|
if !self.whitelist.msg_matches(&message, true) { |
|
|
|
if !self.whitelist.msg_matches(&msg, true) { |
|
|
|
debug!(?message, whitelist=?self.whitelist, "Message not whitelisted, skipping"); |
|
|
|
debug!(?msg, whitelist=?self.whitelist, "Message not whitelisted, skipping"); |
|
|
|
self.message_nonce += 1; |
|
|
|
self.message_nonce += 1; |
|
|
|
return Ok(()); |
|
|
|
return Ok(()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Skip if the message is blacklisted
|
|
|
|
// Skip if the message is blacklisted
|
|
|
|
if self.blacklist.msg_matches(&message, false) { |
|
|
|
if self.blacklist.msg_matches(&msg, false) { |
|
|
|
debug!(?message, blacklist=?self.blacklist, "Message blacklisted, skipping"); |
|
|
|
debug!(?msg, blacklist=?self.blacklist, "Message blacklisted, skipping"); |
|
|
|
self.message_nonce += 1; |
|
|
|
self.message_nonce += 1; |
|
|
|
return Ok(()); |
|
|
|
return Ok(()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Skip if the message is intended for a destination we do not service
|
|
|
|
// Skip if the message is intended for a destination we do not service
|
|
|
|
if self.send_channels.get(&message.destination).is_none() { |
|
|
|
if self.send_channels.get(&msg.destination).is_none() { |
|
|
|
debug!(?message, "Message destined for unknown domain, skipping"); |
|
|
|
debug!(?msg, "Message destined for unknown domain, skipping"); |
|
|
|
self.message_nonce += 1; |
|
|
|
self.message_nonce += 1; |
|
|
|
return Ok(()); |
|
|
|
return Ok(()); |
|
|
|
} |
|
|
|
} |
|
|
@ -135,15 +135,15 @@ impl MessageProcessor { |
|
|
|
self.prover_sync |
|
|
|
self.prover_sync |
|
|
|
.write() |
|
|
|
.write() |
|
|
|
.await |
|
|
|
.await |
|
|
|
.update_to_index(message.nonce) |
|
|
|
.update_to_index(msg.nonce) |
|
|
|
.await?; |
|
|
|
.await?; |
|
|
|
|
|
|
|
|
|
|
|
debug!(%message, "Sending message to submitter"); |
|
|
|
debug!(%msg, "Sending message to submitter"); |
|
|
|
|
|
|
|
|
|
|
|
// Finally, build the submit arg and dispatch it to the submitter.
|
|
|
|
// Finally, build the submit arg and dispatch it to the submitter.
|
|
|
|
let submit_args = SubmitMessageArgs::new(message.clone()); |
|
|
|
let submit_args = SubmitMessageArgs::new(msg.clone()); |
|
|
|
// Guaranteed to exist as we return early above if it does not.
|
|
|
|
// Guaranteed to exist as we return early above if it does not.
|
|
|
|
let send_channel = self.send_channels.get(&message.destination).unwrap(); |
|
|
|
let send_channel = self.send_channels.get(&msg.destination).unwrap(); |
|
|
|
send_channel.send(submit_args)?; |
|
|
|
send_channel.send(submit_args)?; |
|
|
|
self.message_nonce += 1; |
|
|
|
self.message_nonce += 1; |
|
|
|
} else { |
|
|
|
} else { |
|
|
|