From 97588b6fc1c1e6d1451b6b150c0a74e21eff83b2 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Mon, 22 Jul 2024 12:25:56 +0100 Subject: [PATCH] fix(relayer): don't submit reverting messages in batches (#4169) ### Description Depends on https://github.com/hyperlane-xyz/hyperlane-monorepo/pull/4159 Some messages in a Multicall would revert because previous messages would invalidate them temporarily. The IGP allowance of those messages would be deducted, and if this happened more than once, those messages would become stuck. This PR fixes those unnecessary spends on reverting `process` txs by using `.call()` to simulate the outcome of a Multicall tx. Reverting messages are removed from the batch and submitted serially, and if the batch is only left with one non-reverting message, even that one is submitted serially (rather than through a batch of one message). Messages that did get submitted have their IGP allowance deducted and status updated in `update_sent_ops_state`. ### Drive-by changes ### Related issues - Fixes https://github.com/hyperlane-xyz/issues/issues/1294 ### Backward compatibility ### Testing Tested by manually setting the result of `call` to failed for all messages, to see if they are submitted serially. In prod, we can track the `Submitted transaction batch` log, which now includes any excluded messages from the batch. 
--- rust/Cargo.lock | 1 + rust/agents/relayer/src/msg/op_submitter.rs | 94 ++++++++++---- .../agents/relayer/src/msg/pending_message.rs | 4 + rust/chains/hyperlane-ethereum/Cargo.toml | 1 + .../src/contracts/mailbox.rs | 116 ++++++++++++++++-- rust/hyperlane-core/src/traits/mailbox.rs | 35 +++++- .../src/traits/pending_operation.rs | 8 +- rust/hyperlane-test/src/mocks/mailbox.rs | 2 +- 8 files changed, 219 insertions(+), 42 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 3f1fc7a5e..1ae801fed 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -4345,6 +4345,7 @@ dependencies = [ "futures-util", "hex 0.4.3", "hyperlane-core", + "itertools 0.10.5", "num 0.4.1", "num-traits", "reqwest", diff --git a/rust/agents/relayer/src/msg/op_submitter.rs b/rust/agents/relayer/src/msg/op_submitter.rs index 4b47d5659..53dba8525 100644 --- a/rust/agents/relayer/src/msg/op_submitter.rs +++ b/rust/agents/relayer/src/msg/op_submitter.rs @@ -5,8 +5,12 @@ use derive_new::new; use futures::future::join_all; use futures_util::future::try_join_all; use hyperlane_core::total_estimated_cost; +use hyperlane_core::BatchResult; use hyperlane_core::ConfirmReason::*; +use hyperlane_core::PendingOperation; use hyperlane_core::PendingOperationStatus; +use itertools::Either; +use itertools::Itertools; use prometheus::{IntCounter, IntGaugeVec}; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc; @@ -19,8 +23,8 @@ use tracing::{info, warn}; use hyperlane_base::CoreMetrics; use hyperlane_core::{ - BatchItem, ChainCommunicationError, ChainResult, HyperlaneDomain, HyperlaneDomainProtocol, - HyperlaneMessage, PendingOperationResult, QueueOperation, TxOutcome, + ChainCommunicationError, ChainResult, HyperlaneDomain, HyperlaneDomainProtocol, + PendingOperationResult, QueueOperation, TxOutcome, }; use crate::msg::pending_message::CONFIRM_DELAY; @@ -480,48 +484,84 @@ struct OperationBatch { impl OperationBatch { async fn submit(self, confirm_queue: &mut OpQueue, metrics: 
&SerialSubmitterMetrics) { - match self.try_submit_as_batch(metrics).await { - Ok(outcome) => { - info!(outcome=?outcome, batch_size=self.operations.len(), batch=?self.operations, "Submitted transaction batch"); - let total_estimated_cost = total_estimated_cost(&self.operations); - for mut op in self.operations { - op.set_operation_outcome(outcome.clone(), total_estimated_cost); - op.set_next_attempt_after(CONFIRM_DELAY); - confirm_queue - .push(op, Some(PendingOperationStatus::Confirm(SubmittedBySelf))) - .await; - } - return; + let excluded_ops = match self.try_submit_as_batch(metrics).await { + Ok(batch_result) => { + Self::handle_batch_result(self.operations, batch_result, confirm_queue).await } Err(e) => { - warn!(error=?e, batch=?self.operations, "Error when submitting batch. Falling back to serial submission."); + warn!(error=?e, batch=?self.operations, "Error when submitting batch"); + self.operations } + }; + + if !excluded_ops.is_empty() { + warn!(excluded_ops=?excluded_ops, "Either the batch tx would revert, or the operations would revert in the batch. Falling back to serial submission."); + OperationBatch::new(excluded_ops, self.domain) + .submit_serially(confirm_queue, metrics) + .await; } - self.submit_serially(confirm_queue, metrics).await; } #[instrument(skip(metrics), ret, level = "debug")] async fn try_submit_as_batch( &self, metrics: &SerialSubmitterMetrics, - ) -> ChainResult { - let batch = self - .operations - .iter() - .map(|op| op.try_batch()) - .collect::>>>()?; - + ) -> ChainResult { // We already assume that the relayer submits to a single mailbox per destination. // So it's fine to use the first item in the batch to get the mailbox. 
- let Some(first_item) = batch.first() else { + let Some(first_item) = self.operations.first() else { return Err(ChainCommunicationError::BatchIsEmpty); }; - - let outcome = first_item.mailbox.process_batch(&batch).await?; - metrics.ops_submitted.inc_by(self.operations.len() as u64); + let outcome = if let Some(mailbox) = first_item.try_get_mailbox() { + mailbox + .try_process_batch(self.operations.iter().collect_vec()) + .await? + } else { + BatchResult::failed(self.operations.len()) + }; + let ops_submitted = self.operations.len() - outcome.failed_indexes.len(); + metrics.ops_submitted.inc_by(ops_submitted as u64); Ok(outcome) } + /// Process the operations sent by a batch. + /// Returns the operations that were not sent + async fn handle_batch_result( + operations: Vec, + batch_result: BatchResult, + confirm_queue: &mut OpQueue, + ) -> Vec> { + let (sent_ops, excluded_ops): (Vec<_>, Vec<_>) = + operations.into_iter().enumerate().partition_map(|(i, op)| { + if !batch_result.failed_indexes.contains(&i) { + Either::Left(op) + } else { + Either::Right(op) + } + }); + + if let Some(outcome) = batch_result.outcome { + info!(batch_size=sent_ops.len(), outcome=?outcome, batch=?sent_ops, ?excluded_ops, "Submitted transaction batch"); + Self::update_sent_ops_state(sent_ops, outcome, confirm_queue).await; + } + excluded_ops + } + + async fn update_sent_ops_state( + sent_ops: Vec>, + outcome: TxOutcome, + confirm_queue: &mut OpQueue, + ) { + let total_estimated_cost = total_estimated_cost(sent_ops.as_slice()); + for mut op in sent_ops { + op.set_operation_outcome(outcome.clone(), total_estimated_cost); + op.set_next_attempt_after(CONFIRM_DELAY); + confirm_queue + .push(op, Some(PendingOperationStatus::Confirm(SubmittedBySelf))) + .await; + } + } + async fn submit_serially(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) { for op in self.operations.into_iter() { submit_single_operation(op, confirm_queue, metrics).await; diff --git 
a/rust/agents/relayer/src/msg/pending_message.rs b/rust/agents/relayer/src/msg/pending_message.rs index 07f473874..8fbe29b25 100644 --- a/rust/agents/relayer/src/msg/pending_message.rs +++ b/rust/agents/relayer/src/msg/pending_message.rs @@ -453,6 +453,10 @@ impl PendingOperation for PendingMessage { fn set_retries(&mut self, retries: u32) { self.set_retries(retries); } + + fn try_get_mailbox(&self) -> Option> { + Some(self.ctx.destination_mailbox.clone()) + } } impl PendingMessage { diff --git a/rust/chains/hyperlane-ethereum/Cargo.toml b/rust/chains/hyperlane-ethereum/Cargo.toml index 9f4a5453b..b1f75335f 100644 --- a/rust/chains/hyperlane-ethereum/Cargo.toml +++ b/rust/chains/hyperlane-ethereum/Cargo.toml @@ -20,6 +20,7 @@ ethers.workspace = true eyre.workspace = true futures-util.workspace = true hex.workspace = true +itertools.workspace = true num.workspace = true num-traits.workspace = true reqwest.workspace = true diff --git a/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs b/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs index 49ae2d7d9..450060d1f 100644 --- a/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs +++ b/rust/chains/hyperlane-ethereum/src/contracts/mailbox.rs @@ -7,11 +7,14 @@ use std::ops::RangeInclusive; use std::sync::Arc; use async_trait::async_trait; +use derive_new::new; use ethers::abi::{AbiEncode, Detokenize}; use ethers::prelude::Middleware; use ethers_contract::builders::ContractCall; +use ethers_contract::{Multicall, MulticallResult}; use futures_util::future::join_all; -use hyperlane_core::H512; +use hyperlane_core::{BatchResult, QueueOperation, H512}; +use itertools::Itertools; use tracing::instrument; use hyperlane_core::{ @@ -28,7 +31,7 @@ use crate::interfaces::i_mailbox::{ }; use crate::interfaces::mailbox::DispatchFilter; use crate::tx::{call_with_lag, fill_tx_gas_params, report_tx}; -use crate::{BuildableWithProvider, ConnectionConf, EthereumProvider}; +use crate::{BuildableWithProvider, ConnectionConf, 
EthereumProvider, TransactionOverrides}; use super::multicall::{self, build_multicall}; use super::utils::fetch_raw_logs_and_log_meta; @@ -324,6 +327,96 @@ where ) .await } + + async fn simulate_batch( + &self, + multicall: &mut Multicall, + contract_calls: Vec>, + ) -> ChainResult> { + let batch = multicall::batch::<_, ()>(multicall, contract_calls.clone()).await?; + let call_results = batch.call().await?; + + let failed_calls = contract_calls + .iter() + .zip(call_results.iter()) + .enumerate() + .filter_map( + |(index, (_, result))| { + if !result.success { + Some(index) + } else { + None + } + }, + ) + .collect_vec(); + + // only send a batch if there are at least two successful calls + let call_count = contract_calls.len(); + let successful_calls = call_count - failed_calls.len(); + if successful_calls >= 2 { + Ok(BatchSimulation::new( + Some(self.submittable_batch(batch)), + failed_calls, + )) + } else { + Ok(BatchSimulation::failed(call_count)) + } + } + + fn submittable_batch( + &self, + call: ContractCall>, + ) -> SubmittableBatch { + SubmittableBatch { + call, + provider: self.provider.clone(), + transaction_overrides: self.conn.transaction_overrides.clone(), + } + } +} + +#[derive(new)] +pub struct BatchSimulation { + pub call: Option>, + /// Indexes of excluded calls in the batch (because they either failed the simulation + /// or they were the only successful call) + pub excluded_call_indexes: Vec, +} + +impl BatchSimulation { + pub fn failed(ops_count: usize) -> Self { + Self::new(None, (0..ops_count).collect()) + } +} + +impl BatchSimulation { + pub async fn try_submit(self) -> ChainResult { + if let Some(submittable_batch) = self.call { + let batch_outcome = submittable_batch.submit().await?; + Ok(BatchResult::new( + Some(batch_outcome), + self.excluded_call_indexes, + )) + } else { + Ok(BatchResult::failed(self.excluded_call_indexes.len())) + } + } +} + +pub struct SubmittableBatch { + pub call: ContractCall>, + provider: Arc, + 
transaction_overrides: TransactionOverrides, +} + +impl SubmittableBatch { + pub async fn submit(self) -> ChainResult { + let call_with_gas_overrides = + fill_tx_gas_params(self.call, self.provider, &self.transaction_overrides).await?; + let outcome = report_tx(call_with_gas_overrides).await?; + Ok(outcome.into()) + } } impl HyperlaneChain for EthereumMailbox @@ -397,11 +490,15 @@ where Ok(receipt.into()) } - #[instrument(skip(self, messages), fields(size=%messages.len()))] - async fn process_batch( + #[instrument(skip(self, ops), fields(size=%ops.len()))] + async fn try_process_batch<'a>( &self, - messages: &[BatchItem], - ) -> ChainResult { + ops: Vec<&'a QueueOperation>, + ) -> ChainResult { + let messages = ops + .iter() + .map(|op| op.try_batch()) + .collect::>>>()?; let mut multicall = build_multicall(self.provider.clone(), &self.conn, self.domain.clone()) .await .map_err(|e| HyperlaneEthereumError::MulticallError(e.to_string()))?; @@ -421,11 +518,8 @@ where .into_iter() .collect::>>()?; - let batch_call = multicall::batch::<_, ()>(&mut multicall, contract_calls).await?; - let call = self.add_gas_overrides(batch_call).await?; - - let receipt = report_tx(call).await?; - Ok(receipt.into()) + let batch_simulation = self.simulate_batch(&mut multicall, contract_calls).await?; + batch_simulation.try_submit().await } #[instrument(skip(self), fields(msg=%message, metadata=%bytes_to_hex(metadata)))] diff --git a/rust/hyperlane-core/src/traits/mailbox.rs b/rust/hyperlane-core/src/traits/mailbox.rs index fbd9dd5bc..d5e9081b6 100644 --- a/rust/hyperlane-core/src/traits/mailbox.rs +++ b/rust/hyperlane-core/src/traits/mailbox.rs @@ -2,10 +2,11 @@ use std::fmt::Debug; use std::num::NonZeroU64; use async_trait::async_trait; +use derive_new::new; use crate::{ traits::TxOutcome, utils::domain_hash, BatchItem, ChainCommunicationError, ChainResult, - HyperlaneContract, HyperlaneMessage, TxCostEstimate, H256, U256, + HyperlaneContract, HyperlaneMessage, QueueOperation, 
TxCostEstimate, H256, U256, }; /// Interface for the Mailbox chain contract. Allows abstraction over different @@ -44,7 +45,17 @@ pub trait Mailbox: HyperlaneContract + Send + Sync + Debug { async fn process_batch( &self, _messages: &[BatchItem], - ) -> ChainResult { + ) -> ChainResult { + // Batching is not supported by default + Err(ChainCommunicationError::BatchingFailed) + } + + /// Try process the given operations as a batch. Returns the outcome of the + /// batch (if one was submitted) and the operations that were not submitted. + async fn try_process_batch<'a>( + &self, + _ops: Vec<&'a QueueOperation>, + ) -> ChainResult { // Batching is not supported by default Err(ChainCommunicationError::BatchingFailed) } @@ -60,3 +71,23 @@ pub trait Mailbox: HyperlaneContract + Send + Sync + Debug { /// against the provided signed checkpoint fn process_calldata(&self, message: &HyperlaneMessage, metadata: &[u8]) -> Vec; } + +/// The result of processing a batch of messages +#[derive(new, Debug)] +pub struct BatchResult { + /// The outcome of executing the batch, if one was sent + pub outcome: Option, + /// Indexes of excluded calls from the batch (i.e. 
that were not executed) + pub failed_indexes: Vec, +} + +impl BatchResult { + /// Create a BatchResult from a failed simulation, given the number of operations + /// in the simulated batch + pub fn failed(ops_count: usize) -> Self { + Self { + outcome: None, + failed_indexes: (0..ops_count).collect(), + } + } +} diff --git a/rust/hyperlane-core/src/traits/pending_operation.rs b/rust/hyperlane-core/src/traits/pending_operation.rs index ec7e3df9c..599a0e400 100644 --- a/rust/hyperlane-core/src/traits/pending_operation.rs +++ b/rust/hyperlane-core/src/traits/pending_operation.rs @@ -3,12 +3,13 @@ use std::{ cmp::Ordering, fmt::{Debug, Display}, io::Write, + sync::Arc, time::{Duration, Instant}, }; use crate::{ ChainResult, Decode, Encode, FixedPointNumber, HyperlaneDomain, HyperlaneMessage, - HyperlaneProtocolError, TryBatchAs, TxOutcome, H256, U256, + HyperlaneProtocolError, Mailbox, TryBatchAs, TxOutcome, H256, U256, }; use async_trait::async_trait; use num::CheckedDiv; @@ -118,6 +119,11 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs { /// Set the number of times this operation has been retried. #[cfg(any(test, feature = "test-utils"))] fn set_retries(&mut self, retries: u32); + + /// If this operation points to a mailbox contract, return it + fn try_get_mailbox(&self) -> Option> { + None + } } #[derive(Debug, Display, Clone, Serialize, Deserialize, PartialEq)] diff --git a/rust/hyperlane-test/src/mocks/mailbox.rs b/rust/hyperlane-test/src/mocks/mailbox.rs index 8d93706f8..dc09e1026 100644 --- a/rust/hyperlane-test/src/mocks/mailbox.rs +++ b/rust/hyperlane-test/src/mocks/mailbox.rs @@ -96,7 +96,7 @@ impl Mailbox for MockMailboxContract { async fn process_batch( &self, messages: &[BatchItem], - ) -> ChainResult { + ) -> ChainResult { self.process_batch(messages).await }