fix: queue length metric bug (#4691)

### Description

Fix for https://github.com/hyperlane-xyz/hyperlane-monorepo/pull/4689,
where `Dropped` status operations weren't decremented from the metric. I
noticed there's still a small difference on a handful of chains, such as
celo, where even if Dropped ones are considered, the prep queue metric
reports an extra 53 ops compared to the actual queue. Could be that I
manually miscounted, I expect releasing this will be a big improvement
anyway.

### Backward compatibility

Yes

### Testing

No, just checked all places where we `.pop_many` but don't `push`
dan/bump-relayer-image
Daniel Savu 1 month ago committed by GitHub
parent 95fdc5f793
commit 777ef084a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 10
      rust/main/agents/relayer/src/msg/op_queue.rs
  2. 7
      rust/main/agents/relayer/src/msg/op_submitter.rs
  3. 13
      rust/main/hyperlane-core/src/traits/pending_operation.rs

@ -29,12 +29,10 @@ impl OpQueue {
/// it's very likely that its status has just changed, so this forces the caller to consider the new status
#[instrument(skip(self), ret, fields(queue_label=%self.queue_metrics_label), level = "trace")]
pub async fn push(&self, mut op: QueueOperation, new_status: Option<PendingOperationStatus>) {
if let Some(new_status) = new_status {
op.set_status_and_update_metrics(
new_status,
Arc::new(self.get_operation_metric(op.as_ref())),
);
}
op.set_status_and_update_metrics(
new_status,
Arc::new(self.get_operation_metric(op.as_ref())),
);
self.queue.lock().await.push(Reverse(op));
}

@ -293,6 +293,7 @@ async fn prepare_task(
}
PendingOperationResult::Drop => {
metrics.ops_dropped.inc();
op.decrement_metric_if_exists();
}
PendingOperationResult::Confirm(reason) => {
debug!(?op, "Pushing operation to confirm queue");
@ -369,6 +370,7 @@ async fn submit_single_operation(
}
PendingOperationResult::Drop => {
// Not expected to hit this case in `submit`, but it's here for completeness
op.decrement_metric_if_exists();
}
PendingOperationResult::Success | PendingOperationResult::Confirm(_) => {
confirm_op(op, confirm_queue, metrics).await
@ -457,9 +459,7 @@ async fn confirm_operation(
PendingOperationResult::Success => {
debug!(?op, "Operation confirmed");
metrics.ops_confirmed.inc();
if let Some(metric) = op.get_metric() {
metric.dec()
}
op.decrement_metric_if_exists();
}
PendingOperationResult::NotReady => {
confirm_queue.push(op, None).await;
@ -478,6 +478,7 @@ async fn confirm_operation(
}
PendingOperationResult::Drop => {
metrics.ops_dropped.inc();
op.decrement_metric_if_exists();
}
}
operation_result

@ -67,6 +67,13 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs<HyperlaneMessage> {
/// Get the metric associated with this operation.
fn get_metric(&self) -> Option<Arc<IntGauge>>;
/// Decrement the metric associated with this operation if it exists.
fn decrement_metric_if_exists(&self) {
if let Some(metric) = self.get_metric() {
metric.dec();
}
}
/// Set the metric associated with this operation.
fn set_metric(&mut self, metric: Arc<IntGauge>);
@ -80,10 +87,12 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs<HyperlaneMessage> {
/// Set the status of the operation and update the metrics.
fn set_status_and_update_metrics(
&mut self,
status: PendingOperationStatus,
status: Option<PendingOperationStatus>,
new_metric: Arc<IntGauge>,
) {
self.set_status(status);
if let Some(status) = status {
self.set_status(status);
}
if let Some(old_metric) = self.get_metric() {
old_metric.dec();
}

Loading…
Cancel
Save