chore: fix queue metric juggedness (#4689)

### Description

See https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4068 for
the problem description.

In this fix, whenever an operation is moved from one queue to another,
its metric count is decremented from the old queue and incremented for
the new one.

My initial implementation approach was to update these metrics inside
`queue.push(op)`, but the metrics for the operation's previous queue
aren't accessible there. #4068 suggests updating them in
`op.set_status`, which can't be done for the same reason, even if `op`
has a pointer to the current queue's metric internally.

So the fix I went for does store a pointer to the current queue metric
internally in `op`, but also adds a new
`op.set_status_and_update_metrics(status, new_queue_metric)` method,
which **must** be used if the queue metrics are to be correctly
calculated.

This works well except for when ops are removed from the confirm queue,
because in the absence of a call to `set_status_and_update_metrics`, no
metric decrementing is done. I considered using the `Drop` trait to
decrement, but it'd have to be implemented individually for each
`PendingOperation` type, which isn't very maintainable. I ended up
decrementing the metric in `confirm_operation`, which is called for both
batches and single submissions and, of course, all implementations of
`PendingOperation`.

Here's a screenshot of my local grafana server showing no jaggedness in
the e2e run, with prometheus configured to scrape every 2s:
![Screenshot 2024-10-15 at 17 26
56](https://github.com/user-attachments/assets/26004e0e-2ccf-4cec-aa23-ee2d032df25a)


### Drive-by changes

Adds the `prepare_queue` arg of `submit_single_operation` to the
`instrument(skip(...))` list so it no longer pollutes logs.

### Related issues

- Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4068

### Backward compatibility

Yes

### Testing

Manually, by checking the queue length metric of an e2e run in grafana
pull/4686/head
Daniel Savu 2 weeks ago committed by GitHub
parent 9382658db3
commit efd438f9b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      rust/main/Cargo.lock
  2. 16
      rust/main/agents/relayer/src/msg/op_queue.rs
  3. 5
      rust/main/agents/relayer/src/msg/op_submitter.rs
  4. 11
      rust/main/agents/relayer/src/msg/pending_message.rs
  5. 1
      rust/main/hyperlane-core/Cargo.toml
  6. 21
      rust/main/hyperlane-core/src/traits/pending_operation.rs
  7. 33
      rust/sealevel/Cargo.lock

@ -4489,6 +4489,7 @@ dependencies = [
"num-derive 0.4.2",
"num-traits",
"primitive-types",
"prometheus",
"serde",
"serde_json",
"sha3 0.10.8",

@ -30,10 +30,11 @@ impl OpQueue {
#[instrument(skip(self), ret, fields(queue_label=%self.queue_metrics_label), level = "trace")]
pub async fn push(&self, mut op: QueueOperation, new_status: Option<PendingOperationStatus>) {
if let Some(new_status) = new_status {
op.set_status(new_status);
op.set_status_and_update_metrics(
new_status,
Arc::new(self.get_operation_metric(op.as_ref())),
);
}
// increment the metric before pushing onto the queue, because we lose ownership afterwards
self.get_operation_metric(op.as_ref()).inc();
self.queue.lock().await.push(Reverse(op));
}
@ -52,9 +53,6 @@ impl OpQueue {
let mut queue = self.queue.lock().await;
let mut popped = vec![];
while let Some(Reverse(op)) = queue.pop() {
// even if the metric is decremented here, the operation may fail to process and be re-added to the queue.
// in those cases, the queue length will look like it has spikes whose sizes are at most `limit`
self.get_operation_metric(op.as_ref()).dec();
popped.push(op);
if popped.len() >= limit {
break;
@ -242,6 +240,12 @@ pub mod test {
fn set_retries(&mut self, _retries: u32) {
todo!()
}
fn get_metric(&self) -> Option<Arc<IntGauge>> {
None
}
fn set_metric(&mut self, _metric: Arc<IntGauge>) {}
}
pub fn dummy_metrics_and_label() -> (IntGaugeVec, String) {

@ -341,7 +341,7 @@ async fn submit_task(
}
}
#[instrument(skip(confirm_queue, metrics), ret, level = "debug")]
#[instrument(skip(prepare_queue, confirm_queue, metrics), ret, level = "debug")]
async fn submit_single_operation(
mut op: QueueOperation,
prepare_queue: &mut OpQueue,
@ -457,6 +457,9 @@ async fn confirm_operation(
PendingOperationResult::Success => {
debug!(?op, "Operation confirmed");
metrics.ops_confirmed.inc();
if let Some(metric) = op.get_metric() {
metric.dec()
}
}
PendingOperationResult::NotReady => {
confirm_queue.push(op, None).await;

@ -82,6 +82,9 @@ pub struct PendingMessage {
#[new(default)]
#[serde(skip_serializing)]
metadata: Option<Vec<u8>>,
#[new(default)]
#[serde(skip_serializing)]
metric: Option<Arc<IntGauge>>,
}
impl Debug for PendingMessage {
@ -481,6 +484,14 @@ impl PendingOperation for PendingMessage {
fn try_get_mailbox(&self) -> Option<Arc<dyn Mailbox>> {
Some(self.ctx.destination_mailbox.clone())
}
fn get_metric(&self) -> Option<Arc<IntGauge>> {
self.metric.clone()
}
fn set_metric(&mut self, metric: Arc<IntGauge>) {
self.metric = Some(metric);
}
}
impl PendingMessage {

@ -33,6 +33,7 @@ itertools.workspace = true
num = { workspace = true, features = ["serde"] }
num-derive.workspace = true
num-traits.workspace = true
prometheus.workspace = true
serde = { workspace = true }
serde_json = { workspace = true }
sha3 = { workspace = true }

@ -13,6 +13,7 @@ use crate::{
};
use async_trait::async_trait;
use num::CheckedDiv;
use prometheus::IntGauge;
use strum::Display;
use tracing::warn;
@ -63,6 +64,12 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs<HyperlaneMessage> {
/// Label to use for metrics granularity.
fn app_context(&self) -> Option<String>;
/// Get the metric associated with this operation.
fn get_metric(&self) -> Option<Arc<IntGauge>>;
/// Set the metric associated with this operation.
fn set_metric(&mut self, metric: Arc<IntGauge>);
/// The status of the operation, which should explain why it is in the
/// queue.
fn status(&self) -> PendingOperationStatus;
@ -70,6 +77,20 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs<HyperlaneMessage> {
/// Set the status of the operation.
fn set_status(&mut self, status: PendingOperationStatus);
/// Set the status of the operation and update the metrics.
fn set_status_and_update_metrics(
&mut self,
status: PendingOperationStatus,
new_metric: Arc<IntGauge>,
) {
self.set_status(status);
if let Some(old_metric) = self.get_metric() {
old_metric.dec();
}
new_metric.inc();
self.set_metric(new_metric);
}
/// Get tuple of labels for metrics.
fn get_operation_labels(&self) -> (String, String) {
let app_context = self.app_context().unwrap_or("Unknown".to_string());

@ -2390,10 +2390,11 @@ dependencies = [
"fixed-hash 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"getrandom 0.2.15",
"hex",
"itertools 0.12.1",
"itertools 0.13.0",
"num 0.4.3",
"num-derive 0.4.2",
"num-traits",
"prometheus",
"serde",
"serde_json",
"sha3 0.10.8",
@ -2995,15 +2996,6 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.13.0"
@ -3993,6 +3985,27 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prometheus"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot 0.12.3",
"protobuf",
"thiserror",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "qstring"
version = "0.7.2"

Loading…
Cancel
Save