fix(relayer): don't submit reverting messages in batches (#4169)

### Description

Depends on https://github.com/hyperlane-xyz/hyperlane-monorepo/pull/4159

Some messages in a Multicall would revert because previous messages
would invalidate them temporarily. The IGP allowance of those messages
would be deducted, and if this happened more than once, those messages
would become stuck.

This PR fixes those unnecesary spends on reverting `process` txs by
using `.call()` to simulate the outcome of a Multicall tx. Reverting
messages are removed from the batch and submitted serially, and if the
batch is only left with one non-reverting message, even that one is
submitted serially (rather than through a batch of one message).

Messages that did get submitted have their igp allowance deducted and
status updated in `update_sent_ops_state`.

### Drive-by changes

<!--
Are there any minor or drive-by changes also included?
-->

### Related issues

- Fixes https://github.com/hyperlane-xyz/issues/issues/1294
### Backward compatibility

<!--
Are these changes backward compatible? Are there any infrastructure
implications, e.g. changes that would prohibit deploying older commits
using this infra tooling?

Yes/No
-->

### Testing

Tested by manually setting the result of `call` to failed for all
messages, to see if they are submitted serially. In prod, we can track
the `Submitted transaction batch` log, which now includes any excluded
messages from the batch.
pull/3149/merge
Daniel Savu 3 months ago committed by GitHub
parent bb16dfa728
commit 97588b6fc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      rust/Cargo.lock
  2. 94
      rust/agents/relayer/src/msg/op_submitter.rs
  3. 4
      rust/agents/relayer/src/msg/pending_message.rs
  4. 1
      rust/chains/hyperlane-ethereum/Cargo.toml
  5. 116
      rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs
  6. 35
      rust/hyperlane-core/src/traits/mailbox.rs
  7. 8
      rust/hyperlane-core/src/traits/pending_operation.rs
  8. 2
      rust/hyperlane-test/src/mocks/mailbox.rs

1
rust/Cargo.lock generated

@ -4345,6 +4345,7 @@ dependencies = [
"futures-util", "futures-util",
"hex 0.4.3", "hex 0.4.3",
"hyperlane-core", "hyperlane-core",
"itertools 0.10.5",
"num 0.4.1", "num 0.4.1",
"num-traits", "num-traits",
"reqwest", "reqwest",

@ -5,8 +5,12 @@ use derive_new::new;
use futures::future::join_all; use futures::future::join_all;
use futures_util::future::try_join_all; use futures_util::future::try_join_all;
use hyperlane_core::total_estimated_cost; use hyperlane_core::total_estimated_cost;
use hyperlane_core::BatchResult;
use hyperlane_core::ConfirmReason::*; use hyperlane_core::ConfirmReason::*;
use hyperlane_core::PendingOperation;
use hyperlane_core::PendingOperationStatus; use hyperlane_core::PendingOperationStatus;
use itertools::Either;
use itertools::Itertools;
use prometheus::{IntCounter, IntGaugeVec}; use prometheus::{IntCounter, IntGaugeVec};
use tokio::sync::broadcast::Sender; use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -19,8 +23,8 @@ use tracing::{info, warn};
use hyperlane_base::CoreMetrics; use hyperlane_base::CoreMetrics;
use hyperlane_core::{ use hyperlane_core::{
BatchItem, ChainCommunicationError, ChainResult, HyperlaneDomain, HyperlaneDomainProtocol, ChainCommunicationError, ChainResult, HyperlaneDomain, HyperlaneDomainProtocol,
HyperlaneMessage, PendingOperationResult, QueueOperation, TxOutcome, PendingOperationResult, QueueOperation, TxOutcome,
}; };
use crate::msg::pending_message::CONFIRM_DELAY; use crate::msg::pending_message::CONFIRM_DELAY;
@ -480,48 +484,84 @@ struct OperationBatch {
impl OperationBatch { impl OperationBatch {
async fn submit(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) { async fn submit(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) {
match self.try_submit_as_batch(metrics).await { let excluded_ops = match self.try_submit_as_batch(metrics).await {
Ok(outcome) => { Ok(batch_result) => {
info!(outcome=?outcome, batch_size=self.operations.len(), batch=?self.operations, "Submitted transaction batch"); Self::handle_batch_result(self.operations, batch_result, confirm_queue).await
let total_estimated_cost = total_estimated_cost(&self.operations);
for mut op in self.operations {
op.set_operation_outcome(outcome.clone(), total_estimated_cost);
op.set_next_attempt_after(CONFIRM_DELAY);
confirm_queue
.push(op, Some(PendingOperationStatus::Confirm(SubmittedBySelf)))
.await;
}
return;
} }
Err(e) => { Err(e) => {
warn!(error=?e, batch=?self.operations, "Error when submitting batch. Falling back to serial submission."); warn!(error=?e, batch=?self.operations, "Error when submitting batch");
self.operations
} }
};
if !excluded_ops.is_empty() {
warn!(excluded_ops=?excluded_ops, "Either the batch tx would revert, or the operations would revert in the batch. Falling back to serial submission.");
OperationBatch::new(excluded_ops, self.domain)
.submit_serially(confirm_queue, metrics)
.await;
} }
self.submit_serially(confirm_queue, metrics).await;
} }
#[instrument(skip(metrics), ret, level = "debug")] #[instrument(skip(metrics), ret, level = "debug")]
async fn try_submit_as_batch( async fn try_submit_as_batch(
&self, &self,
metrics: &SerialSubmitterMetrics, metrics: &SerialSubmitterMetrics,
) -> ChainResult<TxOutcome> { ) -> ChainResult<BatchResult> {
let batch = self
.operations
.iter()
.map(|op| op.try_batch())
.collect::<ChainResult<Vec<BatchItem<HyperlaneMessage>>>>()?;
// We already assume that the relayer submits to a single mailbox per destination. // We already assume that the relayer submits to a single mailbox per destination.
// So it's fine to use the first item in the batch to get the mailbox. // So it's fine to use the first item in the batch to get the mailbox.
let Some(first_item) = batch.first() else { let Some(first_item) = self.operations.first() else {
return Err(ChainCommunicationError::BatchIsEmpty); return Err(ChainCommunicationError::BatchIsEmpty);
}; };
let outcome = if let Some(mailbox) = first_item.try_get_mailbox() {
let outcome = first_item.mailbox.process_batch(&batch).await?; mailbox
metrics.ops_submitted.inc_by(self.operations.len() as u64); .try_process_batch(self.operations.iter().collect_vec())
.await?
} else {
BatchResult::failed(self.operations.len())
};
let ops_submitted = self.operations.len() - outcome.failed_indexes.len();
metrics.ops_submitted.inc_by(ops_submitted as u64);
Ok(outcome) Ok(outcome)
} }
/// Process the operations sent by a batch.
/// Returns the operations that were not sent
async fn handle_batch_result(
operations: Vec<QueueOperation>,
batch_result: BatchResult,
confirm_queue: &mut OpQueue,
) -> Vec<Box<dyn PendingOperation>> {
let (sent_ops, excluded_ops): (Vec<_>, Vec<_>) =
operations.into_iter().enumerate().partition_map(|(i, op)| {
if !batch_result.failed_indexes.contains(&i) {
Either::Left(op)
} else {
Either::Right(op)
}
});
if let Some(outcome) = batch_result.outcome {
info!(batch_size=sent_ops.len(), outcome=?outcome, batch=?sent_ops, ?excluded_ops, "Submitted transaction batch");
Self::update_sent_ops_state(sent_ops, outcome, confirm_queue).await;
}
excluded_ops
}
async fn update_sent_ops_state(
sent_ops: Vec<Box<dyn PendingOperation>>,
outcome: TxOutcome,
confirm_queue: &mut OpQueue,
) {
let total_estimated_cost = total_estimated_cost(sent_ops.as_slice());
for mut op in sent_ops {
op.set_operation_outcome(outcome.clone(), total_estimated_cost);
op.set_next_attempt_after(CONFIRM_DELAY);
confirm_queue
.push(op, Some(PendingOperationStatus::Confirm(SubmittedBySelf)))
.await;
}
}
async fn submit_serially(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) { async fn submit_serially(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) {
for op in self.operations.into_iter() { for op in self.operations.into_iter() {
submit_single_operation(op, confirm_queue, metrics).await; submit_single_operation(op, confirm_queue, metrics).await;

@ -453,6 +453,10 @@ impl PendingOperation for PendingMessage {
fn set_retries(&mut self, retries: u32) { fn set_retries(&mut self, retries: u32) {
self.set_retries(retries); self.set_retries(retries);
} }
fn try_get_mailbox(&self) -> Option<Arc<dyn Mailbox>> {
Some(self.ctx.destination_mailbox.clone())
}
} }
impl PendingMessage { impl PendingMessage {

@ -20,6 +20,7 @@ ethers.workspace = true
eyre.workspace = true eyre.workspace = true
futures-util.workspace = true futures-util.workspace = true
hex.workspace = true hex.workspace = true
itertools.workspace = true
num.workspace = true num.workspace = true
num-traits.workspace = true num-traits.workspace = true
reqwest.workspace = true reqwest.workspace = true

@ -7,11 +7,14 @@ use std::ops::RangeInclusive;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use derive_new::new;
use ethers::abi::{AbiEncode, Detokenize}; use ethers::abi::{AbiEncode, Detokenize};
use ethers::prelude::Middleware; use ethers::prelude::Middleware;
use ethers_contract::builders::ContractCall; use ethers_contract::builders::ContractCall;
use ethers_contract::{Multicall, MulticallResult};
use futures_util::future::join_all; use futures_util::future::join_all;
use hyperlane_core::H512; use hyperlane_core::{BatchResult, QueueOperation, H512};
use itertools::Itertools;
use tracing::instrument; use tracing::instrument;
use hyperlane_core::{ use hyperlane_core::{
@ -28,7 +31,7 @@ use crate::interfaces::i_mailbox::{
}; };
use crate::interfaces::mailbox::DispatchFilter; use crate::interfaces::mailbox::DispatchFilter;
use crate::tx::{call_with_lag, fill_tx_gas_params, report_tx}; use crate::tx::{call_with_lag, fill_tx_gas_params, report_tx};
use crate::{BuildableWithProvider, ConnectionConf, EthereumProvider}; use crate::{BuildableWithProvider, ConnectionConf, EthereumProvider, TransactionOverrides};
use super::multicall::{self, build_multicall}; use super::multicall::{self, build_multicall};
use super::utils::fetch_raw_logs_and_log_meta; use super::utils::fetch_raw_logs_and_log_meta;
@ -324,6 +327,96 @@ where
) )
.await .await
} }
async fn simulate_batch(
&self,
multicall: &mut Multicall<M>,
contract_calls: Vec<ContractCall<M, ()>>,
) -> ChainResult<BatchSimulation<M>> {
let batch = multicall::batch::<_, ()>(multicall, contract_calls.clone()).await?;
let call_results = batch.call().await?;
let failed_calls = contract_calls
.iter()
.zip(call_results.iter())
.enumerate()
.filter_map(
|(index, (_, result))| {
if !result.success {
Some(index)
} else {
None
}
},
)
.collect_vec();
// only send a batch if there are at least two successful calls
let call_count = contract_calls.len();
let successful_calls = call_count - failed_calls.len();
if successful_calls >= 2 {
Ok(BatchSimulation::new(
Some(self.submittable_batch(batch)),
failed_calls,
))
} else {
Ok(BatchSimulation::failed(call_count))
}
}
fn submittable_batch(
&self,
call: ContractCall<M, Vec<MulticallResult>>,
) -> SubmittableBatch<M> {
SubmittableBatch {
call,
provider: self.provider.clone(),
transaction_overrides: self.conn.transaction_overrides.clone(),
}
}
}
#[derive(new)]
pub struct BatchSimulation<M> {
pub call: Option<SubmittableBatch<M>>,
/// Indexes of excluded calls in the batch (because they either failed the simulation
/// or they were the only successful call)
pub excluded_call_indexes: Vec<usize>,
}
impl<M> BatchSimulation<M> {
pub fn failed(ops_count: usize) -> Self {
Self::new(None, (0..ops_count).collect())
}
}
impl<M: Middleware + 'static> BatchSimulation<M> {
pub async fn try_submit(self) -> ChainResult<BatchResult> {
if let Some(submittable_batch) = self.call {
let batch_outcome = submittable_batch.submit().await?;
Ok(BatchResult::new(
Some(batch_outcome),
self.excluded_call_indexes,
))
} else {
Ok(BatchResult::failed(self.excluded_call_indexes.len()))
}
}
}
pub struct SubmittableBatch<M> {
pub call: ContractCall<M, Vec<MulticallResult>>,
provider: Arc<M>,
transaction_overrides: TransactionOverrides,
}
impl<M: Middleware + 'static> SubmittableBatch<M> {
pub async fn submit(self) -> ChainResult<TxOutcome> {
let call_with_gas_overrides =
fill_tx_gas_params(self.call, self.provider, &self.transaction_overrides).await?;
let outcome = report_tx(call_with_gas_overrides).await?;
Ok(outcome.into())
}
} }
impl<M> HyperlaneChain for EthereumMailbox<M> impl<M> HyperlaneChain for EthereumMailbox<M>
@ -397,11 +490,15 @@ where
Ok(receipt.into()) Ok(receipt.into())
} }
#[instrument(skip(self, messages), fields(size=%messages.len()))] #[instrument(skip(self, ops), fields(size=%ops.len()))]
async fn process_batch( async fn try_process_batch<'a>(
&self, &self,
messages: &[BatchItem<HyperlaneMessage>], ops: Vec<&'a QueueOperation>,
) -> ChainResult<TxOutcome> { ) -> ChainResult<BatchResult> {
let messages = ops
.iter()
.map(|op| op.try_batch())
.collect::<ChainResult<Vec<BatchItem<HyperlaneMessage>>>>()?;
let mut multicall = build_multicall(self.provider.clone(), &self.conn, self.domain.clone()) let mut multicall = build_multicall(self.provider.clone(), &self.conn, self.domain.clone())
.await .await
.map_err(|e| HyperlaneEthereumError::MulticallError(e.to_string()))?; .map_err(|e| HyperlaneEthereumError::MulticallError(e.to_string()))?;
@ -421,11 +518,8 @@ where
.into_iter() .into_iter()
.collect::<ChainResult<Vec<_>>>()?; .collect::<ChainResult<Vec<_>>>()?;
let batch_call = multicall::batch::<_, ()>(&mut multicall, contract_calls).await?; let batch_simulation = self.simulate_batch(&mut multicall, contract_calls).await?;
let call = self.add_gas_overrides(batch_call).await?; batch_simulation.try_submit().await
let receipt = report_tx(call).await?;
Ok(receipt.into())
} }
#[instrument(skip(self), fields(msg=%message, metadata=%bytes_to_hex(metadata)))] #[instrument(skip(self), fields(msg=%message, metadata=%bytes_to_hex(metadata)))]

@ -2,10 +2,11 @@ use std::fmt::Debug;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use async_trait::async_trait; use async_trait::async_trait;
use derive_new::new;
use crate::{ use crate::{
traits::TxOutcome, utils::domain_hash, BatchItem, ChainCommunicationError, ChainResult, traits::TxOutcome, utils::domain_hash, BatchItem, ChainCommunicationError, ChainResult,
HyperlaneContract, HyperlaneMessage, TxCostEstimate, H256, U256, HyperlaneContract, HyperlaneMessage, QueueOperation, TxCostEstimate, H256, U256,
}; };
/// Interface for the Mailbox chain contract. Allows abstraction over different /// Interface for the Mailbox chain contract. Allows abstraction over different
@ -44,7 +45,17 @@ pub trait Mailbox: HyperlaneContract + Send + Sync + Debug {
async fn process_batch( async fn process_batch(
&self, &self,
_messages: &[BatchItem<HyperlaneMessage>], _messages: &[BatchItem<HyperlaneMessage>],
) -> ChainResult<TxOutcome> { ) -> ChainResult<BatchResult> {
// Batching is not supported by default
Err(ChainCommunicationError::BatchingFailed)
}
/// Try process the given operations as a batch. Returns the outcome of the
/// batch (if one was submitted) and the operations that were not submitted.
async fn try_process_batch<'a>(
&self,
_ops: Vec<&'a QueueOperation>,
) -> ChainResult<BatchResult> {
// Batching is not supported by default // Batching is not supported by default
Err(ChainCommunicationError::BatchingFailed) Err(ChainCommunicationError::BatchingFailed)
} }
@ -60,3 +71,23 @@ pub trait Mailbox: HyperlaneContract + Send + Sync + Debug {
/// against the provided signed checkpoint /// against the provided signed checkpoint
fn process_calldata(&self, message: &HyperlaneMessage, metadata: &[u8]) -> Vec<u8>; fn process_calldata(&self, message: &HyperlaneMessage, metadata: &[u8]) -> Vec<u8>;
} }
/// The result of processing a batch of messages
#[derive(new, Debug)]
pub struct BatchResult {
/// The outcome of executing the batch, if one was sent
pub outcome: Option<TxOutcome>,
/// Indexes of excluded calls from the batch (i.e. that were not executed)
pub failed_indexes: Vec<usize>,
}
impl BatchResult {
/// Create a BatchResult from a failed simulation, given the number of operations
/// in the simulated batch
pub fn failed(ops_count: usize) -> Self {
Self {
outcome: None,
failed_indexes: (0..ops_count).collect(),
}
}
}

@ -3,12 +3,13 @@ use std::{
cmp::Ordering, cmp::Ordering,
fmt::{Debug, Display}, fmt::{Debug, Display},
io::Write, io::Write,
sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use crate::{ use crate::{
ChainResult, Decode, Encode, FixedPointNumber, HyperlaneDomain, HyperlaneMessage, ChainResult, Decode, Encode, FixedPointNumber, HyperlaneDomain, HyperlaneMessage,
HyperlaneProtocolError, TryBatchAs, TxOutcome, H256, U256, HyperlaneProtocolError, Mailbox, TryBatchAs, TxOutcome, H256, U256,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use num::CheckedDiv; use num::CheckedDiv;
@ -118,6 +119,11 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs<HyperlaneMessage> {
/// Set the number of times this operation has been retried. /// Set the number of times this operation has been retried.
#[cfg(any(test, feature = "test-utils"))] #[cfg(any(test, feature = "test-utils"))]
fn set_retries(&mut self, retries: u32); fn set_retries(&mut self, retries: u32);
/// If this operation points to a mailbox contract, return it
fn try_get_mailbox(&self) -> Option<Arc<dyn Mailbox>> {
None
}
} }
#[derive(Debug, Display, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Display, Clone, Serialize, Deserialize, PartialEq)]

@ -96,7 +96,7 @@ impl Mailbox for MockMailboxContract {
async fn process_batch( async fn process_batch(
&self, &self,
messages: &[BatchItem<HyperlaneMessage>], messages: &[BatchItem<HyperlaneMessage>],
) -> ChainResult<TxOutcome> { ) -> ChainResult<BatchResult> {
self.process_batch(messages).await self.process_batch(messages).await
} }

Loading…
Cancel
Save