feat: Refactor Relayer Retry Server (#4693)

### Description

Refactor of the Relayer Retry Server to utilize matching lists.
This allows for a single POST request to be able to retry a wide range
of messages.

Since a matching list is used it is now possible to retry messages
depending on:
- Message Id
- Origin Domain
- Destination Domain
- Recipient Address
- Sender Address

## Example

Instead of the old request format
```
GET http://127.0.0.1:60843/message_retry?destination_domain=42
GET http://127.0.0.1:60843/message_retry?message_id=0x46910b1329ee53c86a023b322e9ca1c17e5f9f0bee789c77b0abced0a173d714
```
The new one should be used
```
POST http://127.0.0.1:60843/message_retry
```
With a JSON body like so:
```
[{"messageid": "*", "origindomain": "*", "senderaddress": "*", "destinationdomain": "*", "recipientaddress": "*"}]
```
Retry params can be both specific values and wildcards.

### Drive-by changes

- Moved `MockPendingOperation` to `hyperlane-test`
- Moved certain test utils for the relayer server to `hyperlane-test`
- Added functions to retrieve recipient and sender address from the
`PendingOperation`
- Added a `message_id` to the `MatchingList` struct to allow retries by
message id
- Removed `MessageRetryRequest` enum since it was no longer in use

### Related issues

Fixes #3571 #4079

### Backward compatibility

Yes

### Testing

Unit tests
pull/4789/head
Mantas-M 3 weeks ago committed by GitHub
parent cca032fb18
commit fc82b30a2e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 87
      rust/main/agents/relayer/src/msg/op_queue.rs
  2. 4
      rust/main/agents/relayer/src/msg/op_submitter.rs
  3. 8
      rust/main/agents/relayer/src/msg/pending_message.rs
  4. 4
      rust/main/agents/relayer/src/relayer.rs
  5. 28
      rust/main/agents/relayer/src/server/list_messages.rs
  6. 280
      rust/main/agents/relayer/src/server/message_retry.rs
  7. 6
      rust/main/agents/relayer/src/server/mod.rs
  8. 65
      rust/main/agents/relayer/src/settings/matching_list.rs
  9. 6
      rust/main/hyperlane-core/src/traits/pending_operation.rs

@ -6,7 +6,7 @@ use prometheus::{IntGauge, IntGaugeVec};
use tokio::sync::{broadcast::Receiver, Mutex};
use tracing::{debug, info, instrument};
use crate::server::MessageRetryRequest;
use crate::settings::matching_list::MatchingList;
pub type OperationPriorityQueue = Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>>;
@ -16,7 +16,7 @@ pub type OperationPriorityQueue = Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>
pub struct OpQueue {
metrics: IntGaugeVec,
queue_metrics_label: String,
retry_rx: Arc<Mutex<Receiver<MessageRetryRequest>>>,
retry_rx: Arc<Mutex<Receiver<MatchingList>>>,
#[new(default)]
pub queue: OperationPriorityQueue,
}
@ -84,9 +84,7 @@ impl OpQueue {
let mut reprioritized_queue: BinaryHeap<_> = queue
.drain()
.map(|Reverse(mut op)| {
// Can check for equality here because of the PartialEq implementation for MessageRetryRequest,
// but can't use `contains` because the types are different
if message_retry_requests.iter().any(|r| r == op) {
if message_retry_requests.iter().any(|r| r.op_matches(&op)) {
info!(
operation = %op,
queue_label = %self.queue_metrics_label,
@ -116,12 +114,14 @@ impl OpQueue {
pub mod test {
use super::*;
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, KnownHyperlaneDomain, PendingOperationResult,
HyperlaneDomain, HyperlaneDomainProtocol, HyperlaneDomainTechnicalStack,
HyperlaneDomainType, HyperlaneMessage, KnownHyperlaneDomain, PendingOperationResult,
TryBatchAs, TxOutcome, H256, U256,
};
use serde::Serialize;
use std::{
collections::VecDeque,
str::FromStr,
time::{Duration, Instant},
};
use tokio::sync;
@ -129,6 +129,10 @@ pub mod test {
#[derive(Debug, Clone, Serialize)]
pub struct MockPendingOperation {
id: H256,
sender_address: H256,
origin_domain_id: u32,
destination_domain_id: u32,
recipient_address: H256,
seconds_to_next_attempt: u64,
destination_domain: HyperlaneDomain,
}
@ -138,12 +142,51 @@ pub mod test {
Self {
id: H256::random(),
seconds_to_next_attempt,
destination_domain_id: destination_domain.id(),
destination_domain,
sender_address: H256::random(),
recipient_address: H256::random(),
origin_domain_id: 0,
}
}
pub fn with_id(self, id: H256) -> Self {
Self { id, ..self }
pub fn with_message_data(message: HyperlaneMessage) -> Self {
Self {
id: message.id(),
sender_address: message.sender,
recipient_address: message.recipient,
origin_domain_id: message.origin,
destination_domain_id: message.destination,
seconds_to_next_attempt: 0,
destination_domain: HyperlaneDomain::Unknown {
domain_id: message.destination,
domain_name: "test".to_string(),
domain_type: HyperlaneDomainType::Unknown,
domain_protocol: HyperlaneDomainProtocol::Ethereum,
domain_technical_stack: HyperlaneDomainTechnicalStack::Other,
},
}
}
pub fn with_id(self, id: &str) -> Self {
Self {
id: H256::from_str(id).unwrap(),
..self
}
}
pub fn with_sender_address(self, sender_address: &str) -> Self {
Self {
sender_address: H256::from_str(sender_address).unwrap(),
..self
}
}
pub fn with_recipient_address(self, recipient_address: &str) -> Self {
Self {
recipient_address: H256::from_str(recipient_address).unwrap(),
..self
}
}
}
@ -166,6 +209,20 @@ pub mod test {
self.seconds_to_next_attempt = 0;
}
fn sender_address(&self) -> &H256 {
&self.sender_address
}
fn recipient_address(&self) -> &H256 {
&self.recipient_address
}
fn get_metric(&self) -> Option<Arc<IntGauge>> {
None
}
fn set_metric(&mut self, _metric: Arc<IntGauge>) {}
fn priority(&self) -> u32 {
todo!()
}
@ -179,7 +236,7 @@ pub mod test {
}
fn origin_domain_id(&self) -> u32 {
todo!()
self.origin_domain_id
}
fn destination_domain(&self) -> &HyperlaneDomain {
@ -238,12 +295,6 @@ pub mod test {
fn set_retries(&mut self, _retries: u32) {
todo!()
}
fn get_metric(&self) -> Option<Arc<IntGauge>> {
None
}
fn set_metric(&mut self, _metric: Arc<IntGauge>) {}
}
pub fn dummy_metrics_and_label() -> (IntGaugeVec, String) {
@ -312,10 +363,10 @@ pub mod test {
// Retry by message ids
broadcaster
.send(MessageRetryRequest::MessageId(op_ids[1]))
.send(MatchingList::with_message_id(op_ids[1]))
.unwrap();
broadcaster
.send(MessageRetryRequest::MessageId(op_ids[2]))
.send(MatchingList::with_message_id(op_ids[2]))
.unwrap();
// Pop elements from queue 1
@ -373,7 +424,7 @@ pub mod test {
// Retry by domain
broadcaster
.send(MessageRetryRequest::DestinationDomain(
.send(MatchingList::with_destination_domain(
destination_domain_2.id(),
))
.unwrap();

@ -32,7 +32,7 @@ use hyperlane_core::{
};
use crate::msg::pending_message::CONFIRM_DELAY;
use crate::server::MessageRetryRequest;
use crate::settings::matching_list::MatchingList;
use super::op_queue::OpQueue;
use super::op_queue::OperationPriorityQueue;
@ -105,7 +105,7 @@ impl SerialSubmitter {
pub fn new(
domain: HyperlaneDomain,
rx: mpsc::UnboundedReceiver<QueueOperation>,
retry_op_transmitter: Sender<MessageRetryRequest>,
retry_op_transmitter: Sender<MatchingList>,
metrics: SerialSubmitterMetrics,
max_batch_size: u32,
task_monitor: TaskMonitor,

@ -167,6 +167,14 @@ impl PendingOperation for PendingMessage {
self.ctx.destination_mailbox.domain()
}
fn sender_address(&self) -> &H256 {
&self.message.sender
}
fn recipient_address(&self) -> &H256 {
&self.message.recipient
}
fn retrieve_status_from_db(&self) -> Option<PendingOperationStatus> {
match self.ctx.origin_db.retrieve_status_by_message_id(&self.id()) {
Ok(status) => status,

@ -41,7 +41,7 @@ use crate::{
pending_message::{MessageContext, MessageSubmissionMetrics},
processor::{MessageProcessor, MessageProcessorMetrics},
},
server::{self as relayer_server, MessageRetryRequest},
server::{self as relayer_server},
settings::{matching_list::MatchingList, RelayerSettings},
};
use crate::{
@ -311,7 +311,7 @@ impl BaseAgent for Relayer {
}));
tasks.push(console_server.instrument(info_span!("Tokio console server")));
}
let sender = BroadcastSender::<MessageRetryRequest>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
let sender = BroadcastSender::<MatchingList>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
let mut prep_queues = HashMap::with_capacity(self.destination_chains.len());

@ -79,8 +79,6 @@ impl ListOperationsApi {
}
}
// TODO: there's some duplication between the setup for these tests and the one in `message_retry.rs`,
// which should be refactored into a common test setup.
#[cfg(test)]
mod tests {
use crate::msg::op_queue::{
@ -91,7 +89,7 @@ mod tests {
use super::*;
use axum::http::StatusCode;
use hyperlane_core::KnownHyperlaneDomain;
use std::{cmp::Reverse, net::SocketAddr, str::FromStr, sync::Arc};
use std::{cmp::Reverse, net::SocketAddr, sync::Arc};
use tokio::sync::{self, Mutex};
const DUMMY_DOMAIN: KnownHyperlaneDomain = KnownHyperlaneDomain::Arbitrum;
@ -109,6 +107,7 @@ mod tests {
let list_operations_api = ListOperationsApi::new(op_queues_map);
let (path, router) = list_operations_api.get_route();
let app = Router::new().nest(path, router);
// Running the app in the background using a test server
@ -125,13 +124,20 @@ mod tests {
let (addr, op_queue) = setup_test_server();
let id_1 = "0x1acbee9798118b11ebef0d94b0a2936eafd58e3bfab91b05da875825c4a1c39b";
let id_2 = "0x51e7be221ce90a49dee46ca0d0270c48d338a7b9d85c2a89d83fac0816571914";
let sender_address = "0x586d41b02fb35df0f84ecb2b73e076b40c929ee3e1ceeada9a078aa7b46d3b08";
let recipient_address =
"0x586d41b02fb35df0f84ecb2b73e076b40c929ee3e1ceeada9a078aa7b46d3b08";
let dummy_operation_1 = Box::new(
MockPendingOperation::new(1, DUMMY_DOMAIN.into())
.with_id(H256::from_str(id_1).unwrap()),
.with_id(id_1)
.with_sender_address(sender_address)
.with_recipient_address(recipient_address),
) as QueueOperation;
let dummy_operation_2 = Box::new(
MockPendingOperation::new(2, DUMMY_DOMAIN.into())
.with_id(H256::from_str(id_2).unwrap()),
.with_id(id_2)
.with_sender_address(sender_address)
.with_recipient_address(recipient_address),
) as QueueOperation;
// The reason there already is an id inside `operation` here is because it's a field on `MockPendingOperation` - that field is
@ -143,8 +149,12 @@ mod tests {
"destination_domain": {
"Known": "Arbitrum"
},
"destination_domain_id": 42161,
"id": "0x1acbee9798118b11ebef0d94b0a2936eafd58e3bfab91b05da875825c4a1c39b",
"origin_domain_id": 0,
"recipient_address": "0x586d41b02fb35df0f84ecb2b73e076b40c929ee3e1ceeada9a078aa7b46d3b08",
"seconds_to_next_attempt": 1,
"sender_address": "0x586d41b02fb35df0f84ecb2b73e076b40c929ee3e1ceeada9a078aa7b46d3b08",
"type": "MockPendingOperation"
}
},
@ -154,8 +164,12 @@ mod tests {
"destination_domain": {
"Known": "Arbitrum"
},
"destination_domain_id": 42161,
"id": "0x51e7be221ce90a49dee46ca0d0270c48d338a7b9d85c2a89d83fac0816571914",
"origin_domain_id": 0,
"recipient_address": "0x586d41b02fb35df0f84ecb2b73e076b40c929ee3e1ceeada9a078aa7b46d3b08",
"seconds_to_next_attempt": 2,
"sender_address": "0x586d41b02fb35df0f84ecb2b73e076b40c929ee3e1ceeada9a078aa7b46d3b08",
"type": "MockPendingOperation"
}
}
@ -173,6 +187,8 @@ mod tests {
// Check that the response status code is OK
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(response.text().await.unwrap(), expected_response);
let response_text = response.text().await.unwrap();
assert_eq!(response_text, expected_response);
}
}

@ -1,90 +1,31 @@
use axum::{
extract::{Query, State},
routing, Router,
};
use crate::settings::matching_list::MatchingList;
use axum::{extract::State, routing, Json, Router};
use derive_new::new;
use hyperlane_core::{ChainCommunicationError, QueueOperation, H256};
use serde::Deserialize;
use std::str::FromStr;
use tokio::sync::broadcast::Sender;
const MESSAGE_RETRY_API_BASE: &str = "/message_retry";
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MessageRetryRequest {
MessageId(H256),
DestinationDomain(u32),
}
impl PartialEq<QueueOperation> for &MessageRetryRequest {
fn eq(&self, other: &QueueOperation) -> bool {
match self {
MessageRetryRequest::MessageId(message_id) => message_id == &other.id(),
MessageRetryRequest::DestinationDomain(destination_domain) => {
destination_domain == &other.destination_domain().id()
}
}
}
}
#[derive(new, Clone)]
pub struct MessageRetryApi {
tx: Sender<MessageRetryRequest>,
}
#[derive(Deserialize)]
struct RawMessageRetryRequest {
message_id: Option<String>,
destination_domain: Option<u32>,
}
impl TryFrom<RawMessageRetryRequest> for Vec<MessageRetryRequest> {
type Error = ChainCommunicationError;
fn try_from(request: RawMessageRetryRequest) -> Result<Self, Self::Error> {
let mut retry_requests = Vec::new();
if let Some(message_id) = request.message_id {
retry_requests.push(MessageRetryRequest::MessageId(H256::from_str(&message_id)?));
}
if let Some(destination_domain) = request.destination_domain {
retry_requests.push(MessageRetryRequest::DestinationDomain(destination_domain));
}
Ok(retry_requests)
}
tx: Sender<MatchingList>,
}
async fn retry_message(
State(tx): State<Sender<MessageRetryRequest>>,
Query(request): Query<RawMessageRetryRequest>,
State(tx): State<Sender<MatchingList>>,
Json(retry_req_payload): Json<MatchingList>,
) -> String {
let retry_requests: Vec<MessageRetryRequest> = match request.try_into() {
Ok(retry_requests) => retry_requests,
match tx.send(retry_req_payload) {
Ok(_) => "Moved message(s) to the front of the queue".to_string(),
// Technically it's bad practice to print the error message to the user, but
// this endpoint is for debugging purposes only.
Err(err) => {
return format!("Failed to parse retry request: {}", err);
}
};
if retry_requests.is_empty() {
return "No retry requests found. Please provide either a message_id or destination_domain.".to_string();
Err(err) => format!("Failed to send retry request to the queue: {}", err),
}
if let Err(err) = retry_requests
.into_iter()
.map(|req| tx.send(req))
.collect::<Result<Vec<_>, _>>()
{
return format!("Failed to send retry request to the queue: {}", err);
}
"Moved message(s) to the front of the queue".to_string()
}
impl MessageRetryApi {
pub fn router(&self) -> Router {
Router::new()
.route("/", routing::get(retry_message))
.route("/", routing::post(retry_message))
.with_state(self.tx.clone())
}
@ -95,18 +36,20 @@ impl MessageRetryApi {
#[cfg(test)]
mod tests {
use crate::server::ENDPOINT_MESSAGES_QUEUE_SIZE;
use crate::{msg::op_queue::test::MockPendingOperation, server::ENDPOINT_MESSAGES_QUEUE_SIZE};
use super::*;
use axum::http::StatusCode;
use ethers::utils::hex::ToHex;
use hyperlane_core::{HyperlaneMessage, QueueOperation};
use serde_json::json;
use std::net::SocketAddr;
use tokio::sync::broadcast::{Receiver, Sender};
fn setup_test_server() -> (SocketAddr, Receiver<MessageRetryRequest>) {
let broadcast_tx = Sender::<MessageRetryRequest>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
fn setup_test_server() -> (SocketAddr, Receiver<MatchingList>) {
let broadcast_tx = Sender::<MatchingList>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
let message_retry_api = MessageRetryApi::new(broadcast_tx.clone());
let (path, retry_router) = message_retry_api.get_route();
let app = Router::new().nest(path, retry_router);
// Running the app in the background using a test server
@ -122,49 +65,186 @@ mod tests {
async fn test_message_id_retry() {
let (addr, mut rx) = setup_test_server();
// Create a random message ID
let message_id = H256::random();
let client = reqwest::Client::new();
// Create a random message with a random message ID
let message = HyperlaneMessage::default();
let pending_operation = MockPendingOperation::with_message_data(message.clone());
let matching_list_body = json!([
{
"messageid": message.id()
}
]);
// Send a GET request to the server
let response = reqwest::get(format!(
"http://{}{}?message_id={}",
addr,
MESSAGE_RETRY_API_BASE,
message_id.encode_hex::<String>()
))
.await
.unwrap();
// Send a POST request to the server
let response = client
.post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE))
.json(&matching_list_body) // Set the request body
.send()
.await
.unwrap();
// Check that the response status code is OK
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
rx.try_recv().unwrap(),
MessageRetryRequest::MessageId(message_id)
);
let list = rx.try_recv().unwrap();
// Check that the list received by the server matches the pending operation
assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation)));
}
#[tokio::test]
async fn test_destination_domain_retry() {
let (addr, mut rx) = setup_test_server();
// Create a random destination domain
let destination_domain = 42;
let client = reqwest::Client::new();
let mut message = HyperlaneMessage::default();
// Use a random destination domain
message.destination = 42;
let pending_operation = MockPendingOperation::with_message_data(message.clone());
let matching_list_body = json!([
{
"destinationdomain": message.destination
}
]);
// Send a POST request to the server
let response = client
.post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE))
.json(&matching_list_body) // Set the request body
.send()
.await
.unwrap();
// Check that the response status code is OK
assert_eq!(response.status(), StatusCode::OK);
let list = rx.try_recv().unwrap();
// Check that the list received by the server matches the pending operation
assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation)));
}
#[tokio::test]
async fn test_origin_domain_retry() {
let (addr, mut rx) = setup_test_server();
let client = reqwest::Client::new();
let mut message = HyperlaneMessage::default();
// Use a random origin domain
message.origin = 42;
let pending_operation = MockPendingOperation::with_message_data(message.clone());
let matching_list_body = json!([
{
"origindomain": message.origin
}
]);
// Send a POST request to the server
let response = client
.post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE))
.json(&matching_list_body) // Set the request body
.send()
.await
.unwrap();
// Check that the response status code is OK
assert_eq!(response.status(), StatusCode::OK);
let list = rx.try_recv().unwrap();
// Check that the list received by the server matches the pending operation
assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation)));
}
#[tokio::test]
async fn test_sender_address_retry() {
let (addr, mut rx) = setup_test_server();
let client = reqwest::Client::new();
let message = HyperlaneMessage::default();
let pending_operation = MockPendingOperation::with_message_data(message.clone());
let matching_list_body = json!([
{
"senderaddress": message.sender
}
]);
// Send a POST request to the server
let response = client
.post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE))
.json(&matching_list_body) // Set the request body
.send()
.await
.unwrap();
// Check that the response status code is OK
assert_eq!(response.status(), StatusCode::OK);
let list = rx.try_recv().unwrap();
// Check that the list received by the server matches the pending operation
assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation)));
}
#[tokio::test]
async fn test_recipient_address_retry() {
let (addr, mut rx) = setup_test_server();
let client = reqwest::Client::new();
let message = HyperlaneMessage::default();
let pending_operation = MockPendingOperation::with_message_data(message.clone());
let matching_list_body = json!([
{
"recipientaddress": message.recipient
}
]);
// Send a POST request to the server
let response = client
.post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE))
.json(&matching_list_body) // Set the request body
.send()
.await
.unwrap();
// Check that the response status code is OK
assert_eq!(response.status(), StatusCode::OK);
let list = rx.try_recv().unwrap();
// Check that the list received by the server matches the pending operation
assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation)));
}
#[tokio::test]
async fn test_multiple_retry() {
let (addr, mut rx) = setup_test_server();
let client = reqwest::Client::new();
let mut message = HyperlaneMessage::default();
// Use a random origin domain
message.origin = 42;
let pending_operation = MockPendingOperation::with_message_data(message.clone());
let matching_list_body = json!([
{
"origindomain": message.origin
},
{
"destinationdomain": message.destination
},
{
"messageid": message.id()
}
]);
// Send a GET request to the server
let response = reqwest::get(format!(
"http://{}{}?destination_domain={}",
addr, MESSAGE_RETRY_API_BASE, destination_domain
))
.await
.unwrap();
// Send a POST request to the server
let response = client
.post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE))
.json(&matching_list_body) // Set the request body
.send()
.await
.unwrap();
// Check that the response status code is OK
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
rx.try_recv().unwrap(),
MessageRetryRequest::DestinationDomain(destination_domain)
);
let list = rx.try_recv().unwrap();
// Check that the list received by the server matches the pending operation
assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation)));
}
}

@ -3,7 +3,7 @@ use derive_new::new;
use std::collections::HashMap;
use tokio::sync::broadcast::Sender;
use crate::msg::op_queue::OperationPriorityQueue;
use crate::{msg::op_queue::OperationPriorityQueue, settings::matching_list::MatchingList};
pub const ENDPOINT_MESSAGES_QUEUE_SIZE: usize = 100;
@ -16,13 +16,13 @@ mod message_retry;
#[derive(new)]
pub struct Server {
#[new(default)]
retry_transmitter: Option<Sender<MessageRetryRequest>>,
retry_transmitter: Option<Sender<MatchingList>>,
#[new(default)]
op_queues: Option<HashMap<u32, OperationPriorityQueue>>,
}
impl Server {
pub fn with_op_retry(mut self, transmitter: Sender<MessageRetryRequest>) -> Self {
pub fn with_op_retry(mut self, transmitter: Sender<MatchingList>) -> Self {
self.retry_transmitter = Some(transmitter);
self
}

@ -8,7 +8,9 @@ use std::{
marker::PhantomData,
};
use hyperlane_core::{config::StrOrInt, utils::hex_or_base58_to_h256, HyperlaneMessage, H256};
use hyperlane_core::{
config::StrOrInt, utils::hex_or_base58_to_h256, HyperlaneMessage, QueueOperation, H256,
};
use serde::{
de::{Error, SeqAccess, Visitor},
Deserialize, Deserializer,
@ -223,6 +225,8 @@ impl<'de> Deserialize<'de> for Filter<H256> {
#[derive(Debug, Deserialize, Clone)]
#[serde(tag = "type")]
struct ListElement {
#[serde(default, rename = "messageid")]
message_id: Filter<H256>,
#[serde(default, rename = "origindomain")]
origin_domain: Filter<u32>,
#[serde(default, rename = "senderaddress")]
@ -237,7 +241,8 @@ impl Display for ListElement {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"{{originDomain: {}, senderAddress: {}, destinationDomain: {}, recipientAddress: {}}}",
"{{messageId: {}, originDomain: {}, senderAddress: {}, destinationDomain: {}, recipientAddress: {}}}",
self.message_id,
self.origin_domain,
self.sender_address,
self.destination_domain,
@ -248,6 +253,7 @@ impl Display for ListElement {
#[derive(Copy, Clone, Debug)]
struct MatchInfo<'a> {
src_msg_id: H256,
src_domain: u32,
src_addr: &'a H256,
dst_domain: u32,
@ -257,6 +263,7 @@ struct MatchInfo<'a> {
impl<'a> From<&'a HyperlaneMessage> for MatchInfo<'a> {
fn from(msg: &'a HyperlaneMessage) -> Self {
Self {
src_msg_id: msg.id(),
src_domain: msg.origin,
src_addr: &msg.sender,
dst_domain: msg.destination,
@ -265,13 +272,51 @@ impl<'a> From<&'a HyperlaneMessage> for MatchInfo<'a> {
}
}
impl<'a> From<&'a QueueOperation> for MatchInfo<'a> {
fn from(op: &'a QueueOperation) -> Self {
Self {
src_msg_id: op.id(),
src_domain: op.origin_domain_id(),
src_addr: op.sender_address(),
dst_domain: op.destination_domain().id(),
dst_addr: op.recipient_address(),
}
}
}
impl MatchingList {
pub fn with_message_id(message_id: H256) -> Self {
Self(Some(vec![ListElement {
message_id: Filter::Enumerated(vec![message_id]),
origin_domain: Default::default(),
sender_address: Default::default(),
destination_domain: Default::default(),
recipient_address: Default::default(),
}]))
}
pub fn with_destination_domain(destination_domain: u32) -> Self {
Self(Some(vec![ListElement {
message_id: Default::default(),
origin_domain: Default::default(),
sender_address: Default::default(),
destination_domain: Filter::Enumerated(vec![destination_domain]),
recipient_address: Default::default(),
}]))
}
/// Check if a message matches any of the rules.
/// - `default`: What to return if the matching list is empty.
pub fn msg_matches(&self, msg: &HyperlaneMessage, default: bool) -> bool {
self.matches(msg.into(), default)
}
/// Check if queue operation matches any of the rules.
/// If the matching list is empty, we assume the queue operation does not match.
pub fn op_matches(&self, op: &QueueOperation) -> bool {
self.matches(op.into(), false)
}
/// Check if a message matches any of the rules.
/// - `default`: What to return if the matching list is empty.
fn matches(&self, info: MatchInfo, default: bool) -> bool {
@ -285,7 +330,8 @@ impl MatchingList {
fn matches_any_rule<'a>(mut rules: impl Iterator<Item = &'a ListElement>, info: MatchInfo) -> bool {
rules.any(|rule| {
rule.origin_domain.matches(&info.src_domain)
rule.message_id.matches(&info.src_msg_id)
&& rule.origin_domain.matches(&info.src_domain)
&& rule.sender_address.matches(info.src_addr)
&& rule.destination_domain.matches(&info.dst_domain)
&& rule.recipient_address.matches(info.dst_addr)
@ -323,23 +369,26 @@ mod test {
#[test]
fn basic_config() {
let list: MatchingList = serde_json::from_str(r#"[{"origindomain": "*", "senderaddress": "*", "destinationdomain": "*", "recipientaddress": "*"}, {}]"#).unwrap();
let list: MatchingList = serde_json::from_str(r#"[{"messageid": "*", "origindomain": "*", "senderaddress": "*", "destinationdomain": "*", "recipientaddress": "*"}, {}]"#).unwrap();
assert!(list.0.is_some());
assert_eq!(list.0.as_ref().unwrap().len(), 2);
let elem = &list.0.as_ref().unwrap()[0];
assert_eq!(elem.destination_domain, Wildcard);
assert_eq!(elem.message_id, Wildcard);
assert_eq!(elem.recipient_address, Wildcard);
assert_eq!(elem.origin_domain, Wildcard);
assert_eq!(elem.sender_address, Wildcard);
let elem = &list.0.as_ref().unwrap()[1];
assert_eq!(elem.destination_domain, Wildcard);
assert_eq!(elem.message_id, Wildcard);
assert_eq!(elem.recipient_address, Wildcard);
assert_eq!(elem.origin_domain, Wildcard);
assert_eq!(elem.sender_address, Wildcard);
assert!(list.matches(
MatchInfo {
src_msg_id: H256::random(),
src_domain: 0,
src_addr: &H256::default(),
dst_domain: 0,
@ -350,6 +399,7 @@ mod test {
assert!(list.matches(
MatchInfo {
src_msg_id: H256::random(),
src_domain: 34,
src_addr: &"0x9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>()
@ -369,6 +419,7 @@ mod test {
assert_eq!(list.0.as_ref().unwrap().len(), 1);
let elem = &list.0.as_ref().unwrap()[0];
assert_eq!(elem.destination_domain, Wildcard);
assert_eq!(elem.message_id, Wildcard);
assert_eq!(
elem.recipient_address,
Enumerated(vec!["0x9d4454B023096f34B160D6B654540c56A1F81688"
@ -387,6 +438,7 @@ mod test {
assert!(list.matches(
MatchInfo {
src_msg_id: H256::default(),
src_domain: 34,
src_addr: &"0x9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>()
@ -403,6 +455,7 @@ mod test {
assert!(!list.matches(
MatchInfo {
src_msg_id: H256::default(),
src_domain: 34,
src_addr: &"0x9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>()
@ -423,6 +476,7 @@ mod test {
assert_eq!(whitelist.0.as_ref().unwrap().len(), 1);
let elem = &whitelist.0.as_ref().unwrap()[0];
assert_eq!(elem.destination_domain, Enumerated(vec![9913372, 9913373]));
assert_eq!(elem.message_id, Wildcard);
assert_eq!(elem.recipient_address, Wildcard);
assert_eq!(elem.origin_domain, Wildcard);
assert_eq!(elem.sender_address, Wildcard);
@ -437,6 +491,7 @@ mod test {
#[test]
fn matches_empty_list() {
let info = MatchInfo {
src_msg_id: H256::default(),
src_domain: 0,
src_addr: &H256::default(),
dst_domain: 0,
@ -451,7 +506,7 @@ mod test {
#[test]
fn supports_base58() {
serde_json::from_str::<MatchingList>(
r#"[{"origindomain":1399811151,"senderaddress":"DdTMkk9nuqH5LnD56HLkPiKMV3yB3BNEYSQfgmJHa5i7","destinationdomain":11155111,"recipientaddress":"0x6AD4DEBA8A147d000C09de6465267a9047d1c217"}]"#,
r#"[{"messageid": "*", "origindomain":1399811151,"senderaddress":"DdTMkk9nuqH5LnD56HLkPiKMV3yB3BNEYSQfgmJHa5i7","destinationdomain":11155111,"recipientaddress":"0x6AD4DEBA8A147d000C09de6465267a9047d1c217"}]"#,
).unwrap();
}

@ -61,6 +61,12 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs<HyperlaneMessage> {
/// The domain this operation will take place on.
fn destination_domain(&self) -> &HyperlaneDomain;
/// The sender address of this operation.
fn sender_address(&self) -> &H256;
/// The recipient address of this operation.
fn recipient_address(&self) -> &H256;
/// Label to use for metrics granularity.
fn app_context(&self) -> Option<String>;

Loading…
Cancel
Save