Relayer retry endpoints (#3476)

### Description

Adds a `/message_retry` relayer endpoint, which supports retrying
OpQueue operations either by message ID or by destination domain. Each
retry request is sent to the receiving end of an `mpmc` channel held by
the `OpQueue`, which drains the channel whenever `OpQueue::pop` is
called.

Example calls:
```
GET http://127.0.0.1:60843/message_retry?destination_domain=42
GET http://127.0.0.1:60843/message_retry?message_id=0x46910b1329ee53c86a023b322e9ca1c17e5f9f0bee789c77b0abced0a173d714
```

If the endpoint is called with both filters specified, **two** requests
will be sent across the channel, one for each condition.
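
For example (reusing the values from the calls above), a request that
sets both filters:
```
GET http://127.0.0.1:60843/message_retry?destination_domain=42&message_id=0x46910b1329ee53c86a023b322e9ca1c17e5f9f0bee789c77b0abced0a173d714
```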


Efficiency note: the entire queue is iterated over whenever there is at
least one retry request in the channel. The upside is that rebuilding
the heap is `O(n)`, so this is not too costly.
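
As a minimal sketch of that drain-and-rebuild pattern (plain `u64`
priorities stand in for `QueueOperation`s here; this is illustrative,
not the actual `OpQueue` code):

```rust
use std::{cmp::Reverse, collections::BinaryHeap};

/// Reset the priority of matching elements, then rebuild the heap.
/// `drain` visits all n elements and `collect` re-heapifies in O(n),
/// which is cheaper than n individual pushes at O(n log n) total.
fn reprioritize(queue: &mut BinaryHeap<Reverse<u64>>, matches: impl Fn(u64) -> bool) {
    let mut rebuilt: BinaryHeap<_> = queue
        .drain()
        .map(|Reverse(p)| if matches(p) { Reverse(0) } else { Reverse(p) })
        .collect();
    queue.append(&mut rebuilt);
}

fn main() {
    let mut queue: BinaryHeap<_> = [3u64, 5, 8].into_iter().map(Reverse).collect();
    reprioritize(&mut queue, |p| p == 8); // "retry" the element with priority 8
    assert_eq!(queue.pop(), Some(Reverse(0))); // the retried element pops first
}
```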

### Drive-by changes

- Removed the usage of `enum_dispatch`, because it would have made unit
tests messy. The drawback is that we have to work around object-safety
restrictions in `PendingOperation` - for instance it can't be cloned
(but can be stored behind an `Arc` if we ever need that; see the first
sketch after this list).
- Added new methods to `PendingOperation`, either to match operations by
ID or to implement heap element prioritization without depending on the
concrete `PendingMessage` type (as was done before). Additions: `id()`,
`origin_domain_id()`, `priority()`. I don't think these conflict with any
operations we may add in the future (e.g. gas oracle updates).
- Although `tokio`'s broadcast channel is multi-producer, multi-consumer
by default, the consumer end isn't `Clone` - you instead need a
producer to call `.subscribe()` on to get a new consumer. I added an
`MpmcChannel` struct that encapsulates keeping a producer around to hand
out new consumers (see the second sketch after this list).
- OpQueue is moved into its own file and has unit tests added for
retries (covering broadcasting to multiple queues, and both retry types).
- Did a bit of agent server cleanup, including renaming `EigenNodeAPI`
to `EigenNodeApi`.
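
Two sketches for the drive-by changes above. First, the object-safety
trade-off: a reduced, hypothetical `Op` trait stands in for
`PendingOperation` here, just to show that a boxed trait object can't
be `Clone` but can still be shared via `Arc`:

```rust
use std::sync::Arc;

// Hypothetical stand-in for `PendingOperation`; not the real trait.
trait Op: std::fmt::Debug + Send + Sync {
    fn id(&self) -> u32;
}

#[derive(Debug)]
struct Noop(u32);

impl Op for Noop {
    fn id(&self) -> u32 {
        self.0
    }
}

fn main() {
    // `Box<dyn Op>` works fine as a queue element, but `Clone` can't be
    // a supertrait of an object-safe trait, so the box isn't clonable...
    let op: Box<dyn Op> = Box::new(Noop(1));
    // ...while `Arc` gives shared ownership without requiring `Clone`.
    let shared: Arc<dyn Op> = Arc::from(op);
    assert_eq!(Arc::clone(&shared).id(), 1);
}
```

Second, a usage sketch of the new `MpmcChannel` (assuming
`hyperlane-core` with the `async` feature and `tokio` are available):

```rust
use hyperlane_core::MpmcChannel;

#[tokio::main]
async fn main() {
    let mpmc = MpmcChannel::<u32>::new(16);
    let tx = mpmc.sender();
    let mut rx1 = mpmc.receiver();
    // Cloning a receiver calls `.subscribe()` on the sender it carries,
    // producing a fresh broadcast consumer.
    let mut rx2 = rx1.clone();

    tx.send(42).unwrap();
    // Every consumer sees its own copy of the broadcast message.
    assert_eq!(rx1.receiver.try_recv().unwrap(), 42);
    assert_eq!(rx2.receiver.try_recv().unwrap(), 42);
}
```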


### Related issues

- Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3098

### Backward compatibility

Yes

### Testing

Unit tests for the server route (channel transmitter logic) and for the
OpQueue retries (channel receiver logic).

20 changed files (lines changed in parentheses):

1. rust/Cargo.lock (2)
2. rust/Cargo.toml (1)
3. rust/agents/relayer/Cargo.toml (2)
4. rust/agents/relayer/src/main.rs (1)
5. rust/agents/relayer/src/msg/mod.rs (1)
6. rust/agents/relayer/src/msg/op_queue.rs (303)
7. rust/agents/relayer/src/msg/pending_message.rs (20)
8. rust/agents/relayer/src/msg/pending_operation.rs (76)
9. rust/agents/relayer/src/msg/processor.rs (16)
10. rust/agents/relayer/src/msg/serial_submitter.rs (77)
11. rust/agents/relayer/src/relayer.rs (32)
12. rust/agents/relayer/src/server.rs (179)
13. rust/agents/scraper/src/agent.rs (2)
14. rust/agents/validator/src/server/eigen_node.rs (12)
15. rust/agents/validator/src/server/mod.rs (20)
16. rust/agents/validator/src/server/validator_server.rs (20)
17. rust/agents/validator/src/validator.rs (9)
18. rust/hyperlane-base/src/server/base_server.rs (18)
19. rust/hyperlane-core/src/types/channel.rs (50)
20. rust/hyperlane-core/src/types/mod.rs (4)

rust/Cargo.lock (generated)
@ -6940,11 +6940,11 @@ name = "relayer"
version = "0.1.0"
dependencies = [
"async-trait",
"axum",
"config",
"convert_case 0.6.0",
"derive-new",
"derive_more",
"enum_dispatch",
"ethers",
"ethers-contract",
"eyre",

rust/Cargo.toml
@ -80,7 +80,6 @@ derive-new = "0.5"
derive_builder = "0.12"
derive_more = "0.99"
ed25519-dalek = "~1.0"
enum_dispatch = "0.3"
eyre = "=0.6.8"
fixed-hash = "0.8.0"
fuels = "0.38"

rust/agents/relayer/Cargo.toml
@ -11,11 +11,11 @@ version.workspace = true
[dependencies]
async-trait.workspace = true
axum.workspace = true
config.workspace = true
convert_case.workspace = true
derive-new.workspace = true
derive_more.workspace = true
enum_dispatch.workspace = true
ethers-contract.workspace = true
ethers.workspace = true
eyre.workspace = true

rust/agents/relayer/src/main.rs
@ -18,6 +18,7 @@ mod msg;
mod processor;
mod prover;
mod relayer;
mod server;
mod settings;
#[tokio::main(flavor = "current_thread")]

rust/agents/relayer/src/msg/mod.rs
@ -27,6 +27,7 @@
pub(crate) mod gas_payment;
pub(crate) mod metadata;
pub(crate) mod op_queue;
pub(crate) mod pending_message;
pub(crate) mod pending_operation;
pub(crate) mod processor;

rust/agents/relayer/src/msg/op_queue.rs (new file)
@ -0,0 +1,303 @@
use std::{cmp::Reverse, collections::BinaryHeap, sync::Arc};
use derive_new::new;
use hyperlane_core::MpmcReceiver;
use prometheus::{IntGauge, IntGaugeVec};
use tokio::sync::Mutex;
use tracing::info;
use crate::server::MessageRetryRequest;
use super::pending_operation::PendingOperation;
pub type QueueOperation = Box<dyn PendingOperation>;
/// Queue of generic operations that can be submitted to a destination chain.
/// Includes logic for maintaining queue metrics by the destination and `app_context` of an operation
#[derive(Debug, Clone, new)]
pub struct OpQueue {
metrics: IntGaugeVec,
queue_metrics_label: String,
retry_rx: MpmcReceiver<MessageRetryRequest>,
#[new(default)]
queue: Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>>,
}
impl OpQueue {
/// Push an element onto the queue and update metrics
pub async fn push(&self, op: QueueOperation) {
// increment the metric before pushing onto the queue, because we lose ownership afterwards
self.get_operation_metric(op.as_ref()).inc();
self.queue.lock().await.push(Reverse(op));
}
/// Pop an element from the queue and update metrics
pub async fn pop(&mut self) -> Option<Reverse<QueueOperation>> {
self.process_retry_requests().await;
let op = self.queue.lock().await.pop();
op.map(|op| {
// even if the metric is decremented here, the operation may fail to process and be re-added to the queue.
// in those cases, the queue length will decrease to zero until the operation is re-added.
self.get_operation_metric(op.0.as_ref()).dec();
op
})
}
pub async fn process_retry_requests(&mut self) {
// TODO: could rate-limit ourselves here, but we expect the volume of messages over this channel to
// be very low.
// The other consideration is whether to put the channel receiver in the OpQueue or in a dedicated task
// that also holds an Arc to the Mutex. For simplicity, we'll put it in the OpQueue for now.
let mut message_retry_requests = vec![];
while let Ok(message_id) = self.retry_rx.receiver.try_recv() {
message_retry_requests.push(message_id);
}
if message_retry_requests.is_empty() {
return;
}
let mut queue = self.queue.lock().await;
let mut reprioritized_queue: BinaryHeap<_> = queue
.drain()
.map(|Reverse(mut e)| {
// Can check for equality here because of the PartialEq implementation for MessageRetryRequest,
// but can't use `contains` because the types are different
if message_retry_requests.iter().any(|r| r == e) {
let destination_domain = e.destination_domain().to_string();
info!(
id = ?e.id(),
destination_domain, "Retrying OpQueue operation"
);
e.reset_attempts()
}
Reverse(e)
})
.collect();
queue.append(&mut reprioritized_queue);
}
/// Get the metric associated with this operation
fn get_operation_metric(&self, operation: &dyn PendingOperation) -> IntGauge {
let (destination, app_context) = operation.get_operation_labels();
self.metrics
.with_label_values(&[&destination, &self.queue_metrics_label, &app_context])
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::msg::pending_operation::PendingOperationResult;
use hyperlane_core::{HyperlaneDomain, KnownHyperlaneDomain, MpmcChannel, H256};
use std::{
collections::VecDeque,
time::{Duration, Instant},
};
#[derive(Debug, Clone)]
struct MockPendingOperation {
id: H256,
seconds_to_next_attempt: u64,
destination_domain: HyperlaneDomain,
}
impl MockPendingOperation {
fn new(seconds_to_next_attempt: u64, destination_domain: HyperlaneDomain) -> Self {
Self {
id: H256::random(),
seconds_to_next_attempt,
destination_domain,
}
}
}
#[async_trait::async_trait]
impl PendingOperation for MockPendingOperation {
fn id(&self) -> H256 {
self.id
}
fn reset_attempts(&mut self) {
self.seconds_to_next_attempt = 0;
}
fn priority(&self) -> u32 {
todo!()
}
fn get_operation_labels(&self) -> (String, String) {
Default::default()
}
fn origin_domain_id(&self) -> u32 {
todo!()
}
fn destination_domain(&self) -> &HyperlaneDomain {
&self.destination_domain
}
fn app_context(&self) -> Option<String> {
todo!()
}
async fn prepare(&mut self) -> PendingOperationResult {
todo!()
}
/// Submit this operation to the blockchain and report if it was successful
/// or not.
async fn submit(&mut self) -> PendingOperationResult {
todo!()
}
/// This will be called after the operation has been submitted and is
/// responsible for checking if the operation has reached a point at
/// which we consider it safe from reorgs.
async fn confirm(&mut self) -> PendingOperationResult {
todo!()
}
fn next_attempt_after(&self) -> Option<Instant> {
Some(
Instant::now()
.checked_add(Duration::from_secs(self.seconds_to_next_attempt))
.unwrap(),
)
}
fn set_retries(&mut self, _retries: u32) {
todo!()
}
}
fn dummy_metrics_and_label() -> (IntGaugeVec, String) {
(
IntGaugeVec::new(
prometheus::Opts::new("op_queue", "OpQueue metrics"),
&["destination", "queue_metrics_label", "app_context"],
)
.unwrap(),
"queue_metrics_label".to_string(),
)
}
#[tokio::test]
async fn test_multiple_op_queues_message_id() {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let mpmc_channel = MpmcChannel::new(100);
let mut op_queue_1 = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
mpmc_channel.receiver(),
);
let mut op_queue_2 = OpQueue::new(metrics, queue_metrics_label, mpmc_channel.receiver());
// Add some operations to the queue with increasing `next_attempt_after` values
let destination_domain: HyperlaneDomain = KnownHyperlaneDomain::Injective.into();
let messages_to_send = 5;
let mut ops: VecDeque<_> = (1..=messages_to_send)
.into_iter()
.map(|seconds_to_next_attempt| {
Box::new(MockPendingOperation::new(
seconds_to_next_attempt,
destination_domain.clone(),
)) as QueueOperation
})
.collect();
let op_ids: Vec<_> = ops.iter().map(|op| op.id()).collect();
// push to queue 1
for _ in 0..=2 {
op_queue_1.push(ops.pop_front().unwrap()).await;
}
// push to queue 2
for _ in 3..messages_to_send {
op_queue_2.push(ops.pop_front().unwrap()).await;
}
// Retry by message ids
let mpmc_tx = mpmc_channel.sender();
mpmc_tx
.send(MessageRetryRequest::MessageId(op_ids[1]))
.unwrap();
mpmc_tx
.send(MessageRetryRequest::MessageId(op_ids[2]))
.unwrap();
// Pop elements from queue 1
let mut queue_1_popped = vec![];
while let Some(op) = op_queue_1.pop().await {
queue_1_popped.push(op.0);
}
// The elements sent over the channel should be the first ones popped,
// regardless of their initial `next_attempt_after`
assert_eq!(queue_1_popped[0].id(), op_ids[2]);
assert_eq!(queue_1_popped[1].id(), op_ids[1]);
assert_eq!(queue_1_popped[2].id(), op_ids[0]);
// Pop elements from queue 2
let mut queue_2_popped = vec![];
while let Some(op) = op_queue_2.pop().await {
queue_2_popped.push(op.0);
}
// The elements should be popped in the order they were pushed, because there was no retry request for them
assert_eq!(queue_2_popped[0].id(), op_ids[3]);
assert_eq!(queue_2_popped[1].id(), op_ids[4]);
}
#[tokio::test]
async fn test_destination_domain() {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let mpmc_channel = MpmcChannel::new(100);
let mut op_queue = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
mpmc_channel.receiver(),
);
// Add some operations to the queue with increasing `next_attempt_after` values
let destination_domain_1: HyperlaneDomain = KnownHyperlaneDomain::Injective.into();
let destination_domain_2: HyperlaneDomain = KnownHyperlaneDomain::Ethereum.into();
let ops = vec![
Box::new(MockPendingOperation::new(1, destination_domain_1.clone())) as QueueOperation,
Box::new(MockPendingOperation::new(2, destination_domain_1.clone())) as QueueOperation,
Box::new(MockPendingOperation::new(3, destination_domain_2.clone())) as QueueOperation,
Box::new(MockPendingOperation::new(4, destination_domain_2.clone())) as QueueOperation,
Box::new(MockPendingOperation::new(5, destination_domain_2.clone())) as QueueOperation,
];
let op_ids: Vec<_> = ops.iter().map(|op| op.id()).collect();
// push to queue
for op in ops {
op_queue.push(op).await;
}
// Retry by domain
let mpmc_tx = mpmc_channel.sender();
mpmc_tx
.send(MessageRetryRequest::DestinationDomain(
destination_domain_2.id(),
))
.unwrap();
// Pop elements from queue
let mut popped = vec![];
while let Some(op) = op_queue.pop().await {
popped.push(op.0.id());
}
// First messages should be those to `destination_domain_2` - their exact order depends on
// how they were stored in the heap
assert_eq!(popped[0], op_ids[2]);
assert_eq!(popped[1], op_ids[4]);
assert_eq!(popped[2], op_ids[3]);
// Non-retried messages should be at the end
assert_eq!(popped[3], op_ids[0]);
assert_eq!(popped[4], op_ids[1]);
}
}

rust/agents/relayer/src/msg/pending_message.rs
@ -8,7 +8,7 @@ use async_trait::async_trait;
use derive_new::new;
use eyre::Result;
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{HyperlaneChain, HyperlaneDomain, HyperlaneMessage, Mailbox, U256};
use hyperlane_core::{HyperlaneChain, HyperlaneDomain, HyperlaneMessage, Mailbox, H256, U256};
use prometheus::{IntCounter, IntGauge};
use tracing::{debug, error, info, instrument, trace, warn};
@ -101,7 +101,19 @@ impl Eq for PendingMessage {}
#[async_trait]
impl PendingOperation for PendingMessage {
fn domain(&self) -> &HyperlaneDomain {
fn id(&self) -> H256 {
self.message.id()
}
fn priority(&self) -> u32 {
self.message.nonce
}
fn origin_domain_id(&self) -> u32 {
self.message.origin
}
fn destination_domain(&self) -> &HyperlaneDomain {
self.ctx.destination_mailbox.domain()
}
@ -311,6 +323,10 @@ impl PendingOperation for PendingMessage {
self.next_attempt_after
}
fn reset_attempts(&mut self) {
self.reset_attempts();
}
#[cfg(test)]
fn set_retries(&mut self, retries: u32) {
self.set_retries(retries);

rust/agents/relayer/src/msg/pending_operation.rs
@ -1,11 +1,9 @@
use std::{cmp::Ordering, time::Instant};
use std::{cmp::Ordering, fmt::Debug, time::Instant};
use async_trait::async_trait;
use enum_dispatch::enum_dispatch;
use hyperlane_core::HyperlaneDomain;
use hyperlane_core::{HyperlaneDomain, H256};
#[allow(unused_imports)] // required for enum_dispatch
use super::pending_message::PendingMessage;
use super::op_queue::QueueOperation;
/// A pending operation that will be run by the submitter and cause a
/// transaction to be sent.
@ -27,10 +25,22 @@ use super::pending_message::PendingMessage;
/// responsible for checking if the operation has reached a point at which we
/// consider it safe from reorgs.
#[async_trait]
#[enum_dispatch]
pub trait PendingOperation {
pub trait PendingOperation: Send + Sync + Debug {
/// Get the unique identifier for this operation.
fn id(&self) -> H256;
/// A lower value means a higher priority, such as the message nonce
/// As new types of PendingOperations are added, an idea is to just use the
/// current length of the queue as this item's priority.
/// Overall this method isn't critical, since it's only used to compare
/// operations when neither of them have a `next_attempt_after`
fn priority(&self) -> u32;
/// The domain this originates from.
fn origin_domain_id(&self) -> u32;
/// The domain this operation will take place on.
fn domain(&self) -> &HyperlaneDomain;
fn destination_domain(&self) -> &HyperlaneDomain;
/// Label to use for metrics granularity.
fn app_context(&self) -> Option<String>;
@ -38,7 +48,7 @@ pub trait PendingOperation {
/// Get tuple of labels for metrics.
fn get_operation_labels(&self) -> (String, String) {
let app_context = self.app_context().unwrap_or("Unknown".to_string());
let destination = self.domain().to_string();
let destination = self.destination_domain().to_string();
(destination, app_context)
}
@ -62,50 +72,46 @@ pub trait PendingOperation {
/// returning `NotReady` if it is too early and matters.
fn next_attempt_after(&self) -> Option<Instant>;
/// Reset the number of attempts this operation has made, causing it to be
/// retried immediately.
fn reset_attempts(&mut self);
#[cfg(test)]
/// Set the number of times this operation has been retried.
fn set_retries(&mut self, retries: u32);
}
/// A "dynamic" pending operation implementation which knows about the
/// different sub types and can properly implement PartialEq and
/// PartialOrd for them.
#[enum_dispatch(PendingOperation)]
#[derive(Debug, PartialEq, Eq)]
pub enum DynPendingOperation {
PendingMessage,
}
impl PartialOrd for DynPendingOperation {
impl PartialOrd for QueueOperation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// Sort by their next allowed attempt time and if no allowed time is set,
/// then put it in front of those with a time (they have been tried
/// before) and break ties between ones that have not been tried with
/// the nonce.
impl Ord for DynPendingOperation {
impl PartialEq for QueueOperation {
fn eq(&self, other: &Self) -> bool {
self.id().eq(&other.id())
}
}
impl Eq for QueueOperation {}
impl Ord for QueueOperation {
fn cmp(&self, other: &Self) -> Ordering {
use DynPendingOperation::*;
use Ordering::*;
match (self.next_attempt_after(), other.next_attempt_after()) {
(Some(a), Some(b)) => a.cmp(&b),
// No time means it should come before
(None, Some(_)) => Less,
(Some(_), None) => Greater,
(None, None) => match (self, other) {
(PendingMessage(a), PendingMessage(b)) => {
if a.message.origin == b.message.origin {
// Should execute in order of nonce for the same origin
a.message.nonce.cmp(&b.message.nonce)
} else {
// There is no priority between these messages, so arbitrarily use the id
a.message.id().cmp(&b.message.id())
}
(None, None) => {
if self.origin_domain_id() == other.origin_domain_id() {
// Should execute in order of nonce for the same origin
self.priority().cmp(&other.priority())
} else {
// There is no priority between these messages, so arbitrarily use the id
self.id().cmp(&other.id())
}
},
}
}
}
}

rust/agents/relayer/src/msg/processor.rs
@ -14,8 +14,7 @@ use prometheus::IntGauge;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, trace};
use super::{metadata::AppContextClassifier, pending_message::*};
use crate::msg::pending_operation::DynPendingOperation;
use super::{metadata::AppContextClassifier, op_queue::QueueOperation, pending_message::*};
use crate::{processor::ProcessorExt, settings::matching_list::MatchingList};
/// Finds unprocessed messages from an origin and submits them through a channel
@ -29,7 +28,7 @@ pub struct MessageProcessor {
metrics: MessageProcessorMetrics,
/// channel for each destination chain to send operations (i.e. message
/// submissions) to
send_channels: HashMap<u32, UnboundedSender<Box<DynPendingOperation>>>,
send_channels: HashMap<u32, UnboundedSender<QueueOperation>>,
/// Needed context to send a message for each destination chain
destination_ctxs: HashMap<u32, Arc<MessageContext>>,
metric_app_contexts: Vec<(MatchingList, String)>,
@ -106,7 +105,7 @@ impl ProcessorExt for MessageProcessor {
self.destination_ctxs[&destination].clone(),
app_context,
);
self.send_channels[&destination].send(Box::new(pending_msg.into()))?;
self.send_channels[&destination].send(Box::new(pending_msg) as QueueOperation)?;
self.message_nonce += 1;
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
@ -280,10 +279,7 @@ mod test {
origin_domain: &HyperlaneDomain,
destination_domain: &HyperlaneDomain,
db: &HyperlaneRocksDB,
) -> (
MessageProcessor,
UnboundedReceiver<Box<DynPendingOperation>>,
) {
) -> (MessageProcessor, UnboundedReceiver<QueueOperation>) {
let base_metadata_builder = dummy_metadata_builder(origin_domain, destination_domain, db);
let message_context = Arc::new(MessageContext {
destination_mailbox: Arc::new(MockMailboxContract::default()),
@ -294,7 +290,7 @@ mod test {
metrics: dummy_submission_metrics(),
});
let (send_channel, receive_channel) = mpsc::unbounded_channel::<Box<DynPendingOperation>>();
let (send_channel, receive_channel) = mpsc::unbounded_channel::<QueueOperation>();
(
MessageProcessor::new(
db.clone(),
@ -364,7 +360,7 @@ mod test {
destination_domain: &HyperlaneDomain,
db: &HyperlaneRocksDB,
num_operations: usize,
) -> Vec<Box<DynPendingOperation>> {
) -> Vec<QueueOperation> {
let (message_processor, mut receive_channel) =
dummy_message_processor(origin_domain, destination_domain, db);

rust/agents/relayer/src/msg/serial_submitter.rs
@ -1,62 +1,22 @@
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::time::Duration;
use derive_new::new;
use futures_util::future::try_join_all;
use prometheus::{IntCounter, IntGauge, IntGaugeVec};
use prometheus::{IntCounter, IntGaugeVec};
use tokio::spawn;
use tokio::sync::{
mpsc::{self},
Mutex,
};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::{debug, info_span, instrument, instrument::Instrumented, trace, Instrument};
use hyperlane_base::CoreMetrics;
use hyperlane_core::HyperlaneDomain;
use hyperlane_core::{HyperlaneDomain, MpmcReceiver};
use super::pending_operation::*;
/// Queue of generic operations that can be submitted to a destination chain.
/// Includes logic for maintaining queue metrics by the destination and `app_context` of an operation
#[derive(Debug, Clone, new)]
struct OpQueue {
metrics: IntGaugeVec,
queue_metrics_label: String,
#[new(default)]
queue: Arc<Mutex<BinaryHeap<Reverse<Box<DynPendingOperation>>>>>,
}
impl OpQueue {
/// Push an element onto the queue and update metrics
async fn push(&self, op: Box<DynPendingOperation>) {
// increment the metric before pushing onto the queue, because we lose ownership afterwards
self.get_operation_metric(&op).inc();
self.queue.lock().await.push(Reverse(op));
}
/// Pop an element from the queue and update metrics
async fn pop(&self) -> Option<Reverse<Box<DynPendingOperation>>> {
let op = self.queue.lock().await.pop();
op.map(|op| {
// even if the metric is decremented here, the operation may fail to process and be re-added to the queue.
// in those cases, the queue length will decrease to zero until the operation is re-added.
self.get_operation_metric(&op.0).dec();
op
})
}
/// Get the metric associated with this operation
fn get_operation_metric(&self, operation: &DynPendingOperation) -> IntGauge {
let (destination, app_context) = operation.get_operation_labels();
self.metrics
.with_label_values(&[&destination, &self.queue_metrics_label, &app_context])
}
}
use crate::server::MessageRetryRequest;
use super::op_queue::{OpQueue, QueueOperation};
use super::pending_operation::*;
/// SerialSubmitter accepts operations over a channel. It is responsible for
/// executing the right strategy to deliver those messages to the destination
@ -110,7 +70,9 @@ pub struct SerialSubmitter {
/// Domain this submitter delivers to.
domain: HyperlaneDomain,
/// Receiver for new messages to submit.
rx: mpsc::UnboundedReceiver<Box<DynPendingOperation>>,
rx: mpsc::UnboundedReceiver<QueueOperation>,
/// Receiver for retry requests.
retry_rx: MpmcReceiver<MessageRetryRequest>,
/// Metrics for serial submitter.
metrics: SerialSubmitterMetrics,
}
@ -126,14 +88,17 @@ impl SerialSubmitter {
domain,
metrics,
rx: rx_prepare,
retry_rx,
} = self;
let prepare_queue = OpQueue::new(
metrics.submitter_queue_length.clone(),
"prepare_queue".to_string(),
retry_rx.clone(),
);
let confirm_queue = OpQueue::new(
metrics.submitter_queue_length.clone(),
"confirm_queue".to_string(),
retry_rx,
);
// This is a channel because we want to only have a small number of messages
@ -182,7 +147,7 @@ impl SerialSubmitter {
#[instrument(skip_all, fields(%domain))]
async fn receive_task(
domain: HyperlaneDomain,
mut rx: mpsc::UnboundedReceiver<Box<DynPendingOperation>>,
mut rx: mpsc::UnboundedReceiver<QueueOperation>,
prepare_queue: OpQueue,
) {
// Pull any messages sent to this submitter
@ -190,7 +155,7 @@ async fn receive_task(
trace!(?op, "Received new operation");
// make sure things are getting wired up correctly; if this works in testing it
// should also be valid in production.
debug_assert_eq!(*op.domain(), domain);
debug_assert_eq!(*op.destination_domain(), domain);
prepare_queue.push(op).await;
}
}
@ -198,8 +163,8 @@ async fn receive_task(
#[instrument(skip_all, fields(%domain))]
async fn prepare_task(
domain: HyperlaneDomain,
prepare_queue: OpQueue,
tx_submit: mpsc::Sender<Box<DynPendingOperation>>,
mut prepare_queue: OpQueue,
tx_submit: mpsc::Sender<QueueOperation>,
metrics: SerialSubmitterMetrics,
) {
loop {
@ -213,7 +178,7 @@ async fn prepare_task(
};
trace!(?op, "Preparing operation");
debug_assert_eq!(*op.domain(), domain);
debug_assert_eq!(*op.destination_domain(), domain);
match op.prepare().await {
PendingOperationResult::Success => {
@ -243,14 +208,14 @@ async fn prepare_task(
#[instrument(skip_all, fields(%domain))]
async fn submit_task(
domain: HyperlaneDomain,
mut rx_submit: mpsc::Receiver<Box<DynPendingOperation>>,
mut rx_submit: mpsc::Receiver<QueueOperation>,
prepare_queue: OpQueue,
confirm_queue: OpQueue,
metrics: SerialSubmitterMetrics,
) {
while let Some(mut op) = rx_submit.recv().await {
trace!(?op, "Submitting operation");
debug_assert_eq!(*op.domain(), domain);
debug_assert_eq!(*op.destination_domain(), domain);
match op.submit().await {
PendingOperationResult::Success => {
@ -276,7 +241,7 @@ async fn submit_task(
async fn confirm_task(
domain: HyperlaneDomain,
prepare_queue: OpQueue,
confirm_queue: OpQueue,
mut confirm_queue: OpQueue,
metrics: SerialSubmitterMetrics,
) {
loop {
@ -287,7 +252,7 @@ async fn confirm_task(
};
trace!(?op, "Confirming operation");
debug_assert_eq!(*op.domain(), domain);
debug_assert_eq!(*op.destination_domain(), domain);
match op.confirm().await {
PendingOperationResult::Success => {

rust/agents/relayer/src/relayer.rs
@ -16,7 +16,8 @@ use hyperlane_base::{
SequencedDataContractSync, WatermarkContractSync,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, MpmcChannel,
MpmcReceiver, U256,
};
use tokio::{
sync::{
@ -27,23 +28,24 @@ use tokio::{
};
use tracing::{info, info_span, instrument::Instrumented, warn, Instrument};
use crate::processor::Processor;
use crate::{
merkle_tree::builder::MerkleTreeBuilder,
msg::{
gas_payment::GasPaymentEnforcer,
metadata::{BaseMetadataBuilder, IsmAwareAppContextClassifier},
op_queue::QueueOperation,
pending_message::{MessageContext, MessageSubmissionMetrics},
pending_operation::DynPendingOperation,
processor::{MessageProcessor, MessageProcessorMetrics},
serial_submitter::{SerialSubmitter, SerialSubmitterMetrics},
},
server::{self as relayer_server, MessageRetryRequest},
settings::{matching_list::MatchingList, RelayerSettings},
};
use crate::{
merkle_tree::processor::{MerkleTreeProcessor, MerkleTreeProcessorMetrics},
processor::ProcessorExt,
};
use crate::{processor::Processor, server::ENDPOINT_MESSAGES_QUEUE_SIZE};
#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)]
struct ContextKey {
@ -276,23 +278,31 @@ impl BaseAgent for Relayer {
async fn run(self) {
let mut tasks = vec![];
// running http server
// run server
let mpmc_channel = MpmcChannel::<MessageRetryRequest>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
let custom_routes = relayer_server::routes(mpmc_channel.sender());
let server = self
.core
.settings
.server(self.core_metrics.clone())
.expect("Failed to create server");
let server_task = server.run(vec![]).instrument(info_span!("Relayer server"));
let server_task = server
.run_with_custom_routes(custom_routes)
.instrument(info_span!("Relayer server"));
tasks.push(server_task);
// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
for (dest_domain, dest_conf) in &self.destination_chains {
let (send_channel, receive_channel) =
mpsc::unbounded_channel::<Box<DynPendingOperation>>();
let (send_channel, receive_channel) = mpsc::unbounded_channel::<QueueOperation>();
send_channels.insert(dest_domain.id(), send_channel);
tasks.push(self.run_destination_submitter(dest_domain, receive_channel));
tasks.push(self.run_destination_submitter(
dest_domain,
receive_channel,
mpmc_channel.receiver(),
));
let metrics_updater = MetricsUpdater::new(
dest_conf,
@ -374,7 +384,7 @@ impl Relayer {
fn run_message_processor(
&self,
origin: &HyperlaneDomain,
send_channels: HashMap<u32, UnboundedSender<Box<DynPendingOperation>>>,
send_channels: HashMap<u32, UnboundedSender<QueueOperation>>,
) -> Instrumented<JoinHandle<()>> {
let metrics = MessageProcessorMetrics::new(
&self.core.metrics,
@ -431,11 +441,13 @@ impl Relayer {
fn run_destination_submitter(
&self,
destination: &HyperlaneDomain,
receiver: UnboundedReceiver<Box<DynPendingOperation>>,
receiver: UnboundedReceiver<QueueOperation>,
retry_receiver_channel: MpmcReceiver<MessageRetryRequest>,
) -> Instrumented<JoinHandle<()>> {
let serial_submitter = SerialSubmitter::new(
destination.clone(),
receiver,
retry_receiver_channel,
SerialSubmitterMetrics::new(&self.core.metrics, destination),
);
let span = info_span!("SerialSubmitter", destination=%destination);

rust/agents/relayer/src/server.rs (new file)
@ -0,0 +1,179 @@
use axum::{
extract::{Query, State},
routing, Router,
};
use derive_new::new;
use hyperlane_core::{ChainCommunicationError, H256};
use serde::Deserialize;
use std::str::FromStr;
use tokio::sync::broadcast::Sender;
use crate::msg::op_queue::QueueOperation;
const MESSAGE_RETRY_API_BASE: &str = "/message_retry";
pub const ENDPOINT_MESSAGES_QUEUE_SIZE: usize = 1_000;
/// Returns a vector of agent-specific endpoint routes to be served.
/// Can be extended with additional routes and feature flags to enable/disable individually.
pub fn routes(tx: Sender<MessageRetryRequest>) -> Vec<(&'static str, Router)> {
let message_retry_api = MessageRetryApi::new(tx);
vec![message_retry_api.get_route()]
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MessageRetryRequest {
MessageId(H256),
DestinationDomain(u32),
}
impl PartialEq<QueueOperation> for &MessageRetryRequest {
fn eq(&self, other: &QueueOperation) -> bool {
match self {
MessageRetryRequest::MessageId(message_id) => message_id == &other.id(),
MessageRetryRequest::DestinationDomain(destination_domain) => {
destination_domain == &other.destination_domain().id()
}
}
}
}
#[derive(new, Clone)]
pub struct MessageRetryApi {
tx: Sender<MessageRetryRequest>,
}
#[derive(Deserialize)]
struct RawMessageRetryRequest {
message_id: Option<String>,
destination_domain: Option<u32>,
}
impl TryFrom<RawMessageRetryRequest> for Vec<MessageRetryRequest> {
type Error = ChainCommunicationError;
fn try_from(request: RawMessageRetryRequest) -> Result<Self, Self::Error> {
let mut retry_requests = Vec::new();
if let Some(message_id) = request.message_id {
retry_requests.push(MessageRetryRequest::MessageId(H256::from_str(&message_id)?));
}
if let Some(destination_domain) = request.destination_domain {
retry_requests.push(MessageRetryRequest::DestinationDomain(destination_domain));
}
Ok(retry_requests)
}
}
async fn retry_message(
State(tx): State<Sender<MessageRetryRequest>>,
Query(request): Query<RawMessageRetryRequest>,
) -> String {
let retry_requests: Vec<MessageRetryRequest> = match request.try_into() {
Ok(retry_requests) => retry_requests,
// Technically it's bad practice to print the error message to the user, but
// this endpoint is for debugging purposes only.
Err(err) => {
return format!("Failed to parse retry request: {}", err);
}
};
if retry_requests.is_empty() {
return "No retry requests found. Please provide either a message_id or destination_domain.".to_string();
}
if let Err(err) = retry_requests
.into_iter()
.map(|req| tx.send(req))
.collect::<Result<Vec<_>, _>>()
{
return format!("Failed to send retry request to the queue: {}", err);
}
"Moved message(s) to the front of the queue".to_string()
}
impl MessageRetryApi {
pub fn router(&self) -> Router {
Router::new()
.route("/", routing::get(retry_message))
.with_state(self.tx.clone())
}
pub fn get_route(&self) -> (&'static str, Router) {
(MESSAGE_RETRY_API_BASE, self.router())
}
}
#[cfg(test)]
mod tests {
use super::*;
use axum::http::StatusCode;
use ethers::utils::hex::ToHex;
use hyperlane_core::{MpmcChannel, MpmcReceiver};
use std::net::SocketAddr;
fn setup_test_server() -> (SocketAddr, MpmcReceiver<MessageRetryRequest>) {
let mpmc_channel = MpmcChannel::<MessageRetryRequest>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
let message_retry_api = MessageRetryApi::new(mpmc_channel.sender());
let (path, retry_router) = message_retry_api.get_route();
let app = Router::new().nest(path, retry_router);
// Running the app in the background using a test server
let server =
axum::Server::bind(&"127.0.0.1:0".parse().unwrap()).serve(app.into_make_service());
let addr = server.local_addr();
tokio::spawn(server);
(addr, mpmc_channel.receiver())
}
#[tokio::test]
async fn test_message_id_retry() {
let (addr, mut rx) = setup_test_server();
// Create a random message ID
let message_id = H256::random();
// Send a GET request to the server
let response = reqwest::get(format!(
"http://{}{}?message_id={}",
addr,
MESSAGE_RETRY_API_BASE,
message_id.encode_hex::<String>()
))
.await
.unwrap();
// Check that the response status code is OK
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
rx.receiver.try_recv().unwrap(),
MessageRetryRequest::MessageId(message_id)
);
}
#[tokio::test]
async fn test_destination_domain_retry() {
let (addr, mut rx) = setup_test_server();
// Create a random destination domain
let destination_domain = 42;
// Send a GET request to the server
let response = reqwest::get(format!(
"http://{}{}?destination_domain={}",
addr, MESSAGE_RETRY_API_BASE, destination_domain
))
.await
.unwrap();
// Check that the response status code is OK
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
rx.receiver.try_recv().unwrap(),
MessageRetryRequest::DestinationDomain(destination_domain)
);
}
}

rust/agents/scraper/src/agent.rs
@ -100,7 +100,7 @@ impl BaseAgent for Scraper {
.settings
.server(self.core_metrics.clone())
.expect("Failed to create server");
let server_task = server.run(vec![]).instrument(info_span!("Relayer server"));
let server_task = server.run().instrument(info_span!("Relayer server"));
tasks.push(server_task);
for (domain, scraper) in self.scrapers.iter() {

rust/agents/validator/src/server/eigen_node.rs
@ -24,6 +24,8 @@ use hyperlane_core::HyperlaneDomain;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
const EIGEN_NODE_API_BASE: &str = "/eigen";
#[derive(Serialize, Deserialize, PartialEq, Debug)]
enum ServiceStatus {
Up,
@ -47,12 +49,16 @@ struct Service {
}
#[derive(new)]
pub struct EigenNodeAPI {
pub struct EigenNodeApi {
origin_chain: HyperlaneDomain,
core_metrics: Arc<CoreMetrics>,
}
impl EigenNodeAPI {
impl EigenNodeApi {
pub fn get_route(&self) -> (&'static str, Router) {
(EIGEN_NODE_API_BASE, self.router())
}
pub fn router(&self) -> Router {
let core_metrics_clone = self.core_metrics.clone();
let origin_chain = self.origin_chain.clone();
@ -154,7 +160,7 @@ mod tests {
.with_label_values(&["validator_observed", "ethereum"])
.set(HEALTHY_OBSERVED_CHECKPOINT);
let node_api = EigenNodeAPI::new(
let node_api = EigenNodeApi::new(
HyperlaneDomain::new_test_domain("ethereum"),
Arc::clone(&core_metrics),
);

rust/agents/validator/src/server/mod.rs
@ -1,5 +1,19 @@
pub mod eigen_node;
pub use eigen_node::EigenNodeAPI;
use std::{sync::Arc, vec};
pub mod validator_server;
pub use validator_server::ValidatorServer;
use axum::Router;
pub use eigen_node::EigenNodeApi;
use hyperlane_base::CoreMetrics;
use hyperlane_core::HyperlaneDomain;
/// Returns a vector of validator-specific endpoint routes to be served.
/// Can be extended with additional routes and feature flags to enable/disable individually.
pub fn routes(
origin_chain: HyperlaneDomain,
metrics: Arc<CoreMetrics>,
) -> Vec<(&'static str, Router)> {
let eigen_node_api = EigenNodeApi::new(origin_chain, metrics);
vec![eigen_node_api.get_route()]
}

rust/agents/validator/src/server/validator_server.rs (deleted)
@ -1,20 +0,0 @@
use crate::server::eigen_node::EigenNodeAPI;
use axum::routing::Router;
use hyperlane_base::CoreMetrics; // Add missing import statement
use hyperlane_core::HyperlaneDomain;
use std::sync::Arc;
pub struct ValidatorServer {
pub routes: Vec<(&'static str, Router)>,
}
impl ValidatorServer {
// add routes for servering EigenLayer specific routes compliant with the spec here https://eigen.nethermind.io/docs/spec/api/
pub fn new(origin_chain: HyperlaneDomain, metrics: Arc<CoreMetrics>) -> Self {
let mut routes = vec![];
let eigen_node_api = EigenNodeAPI::new(origin_chain, metrics);
routes.push(("/eigen", eigen_node_api.router()));
Self { routes }
}
}

rust/agents/validator/src/validator.rs
@ -1,6 +1,6 @@
use std::{num::NonZeroU64, sync::Arc, time::Duration};
use crate::server::validator_server::ValidatorServer;
use crate::server as validator_server;
use async_trait::async_trait;
use derive_more::AsRef;
use eyre::Result;
@ -130,17 +130,16 @@ impl BaseAgent for Validator {
async fn run(mut self) {
let mut tasks = vec![];
let routes =
ValidatorServer::new(self.origin_chain.clone(), self.core.metrics.clone()).routes;
// run server
let custom_routes =
validator_server::routes(self.origin_chain.clone(), self.core.metrics.clone());
let server = self
.core
.settings
.server(self.core_metrics.clone())
.expect("Failed to create server");
let server_task = tokio::spawn(async move {
server.run(routes);
server.run_with_custom_routes(custom_routes);
})
.instrument(info_span!("Validator server"));
tasks.push(server_task);

rust/hyperlane-base/src/server/base_server.rs
@ -12,13 +12,21 @@ pub struct Server {
}
impl Server {
/// Run an HTTP server
pub fn run(self: Arc<Self>) -> JoinHandle<()> {
self.run_with_custom_routes(vec![])
}
/// Run an HTTP server serving agent-specific different routes
///
/// routes:
/// - metrics - serving OpenMetrics format reports on `/metrics`
/// - metrics - serving OpenMetrics format reports on `/metrics`
/// (this is compatible with Prometheus, which ought to be configured to scrape this endpoint)
/// - additional_routes - additional routes to be served by the server as per the specific agent
pub fn run(self: Arc<Self>, additional_routes: Vec<(&str, Router)>) -> JoinHandle<()> {
/// - custom_routes - additional routes to be served by the server as per the specific agent
pub fn run_with_custom_routes(
self: Arc<Self>,
custom_routes: Vec<(&str, Router)>,
) -> JoinHandle<()> {
let port = self.listen_port;
tracing::info!(port, "starting server on 0.0.0.0");
@ -29,7 +37,7 @@ impl Server {
get(move || Self::gather_metrics(core_metrics_clone)),
);
for (route, router) in additional_routes {
for (route, router) in custom_routes {
app = app.nest(route, router);
}
@ -88,7 +96,7 @@ mod tests {
let server = Arc::new(server);
// Run the server in the background
let _server_task = tokio::spawn(async move {
server.run(vec![]).await.unwrap();
server.run().await.unwrap();
});
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;

rust/hyperlane-core/src/types/channel.rs (new file)
@ -0,0 +1,50 @@
use derive_new::new;
use tokio::sync::broadcast::{Receiver, Sender};
/// Multi-producer, multi-consumer channel
pub struct MpmcChannel<T> {
sender: Sender<T>,
receiver: MpmcReceiver<T>,
}
impl<T: Clone> MpmcChannel<T> {
/// Creates a new `MpmcChannel` with the specified capacity.
///
/// # Arguments
///
/// * `capacity` - The maximum number of messages that can be buffered in the channel.
pub fn new(capacity: usize) -> Self {
let (sender, receiver) = tokio::sync::broadcast::channel(capacity);
Self {
sender: sender.clone(),
receiver: MpmcReceiver::new(sender, receiver),
}
}
/// Returns a clone of the sender end of the channel.
pub fn sender(&self) -> Sender<T> {
self.sender.clone()
}
/// Returns a clone of the receiver end of the channel.
pub fn receiver(&self) -> MpmcReceiver<T> {
self.receiver.clone()
}
}
/// Clonable receiving end of a multi-producer, multi-consumer channel
#[derive(Debug, new)]
pub struct MpmcReceiver<T> {
sender: Sender<T>,
/// The receiving end of the channel.
pub receiver: Receiver<T>,
}
impl<T> Clone for MpmcReceiver<T> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
receiver: self.sender.subscribe(),
}
}
}

rust/hyperlane-core/src/types/mod.rs
@ -8,6 +8,8 @@ pub use self::primitive_types::*;
pub use ::primitive_types as ethers_core_types;
pub use announcement::*;
pub use chain_data::*;
#[cfg(feature = "async")]
pub use channel::*;
pub use checkpoint::*;
pub use log_metadata::*;
pub use merkle_tree::*;
@ -17,6 +19,8 @@ use crate::{Decode, Encode, HyperlaneProtocolError};
mod announcement;
mod chain_data;
#[cfg(feature = "async")]
mod channel;
mod checkpoint;
mod log_metadata;
mod merkle_tree;
