feat: batching igp (#3870)

### Description

Deducts IGP cost from senders of messages within a batch, based on the
total gas used by the batch and the estimates for submitting messages
individually.

The formula is `gas_used_by_operation = gas_used_by_tx *
(operation_estimated_gas / total_estimated_cost)`

Note that the individual estimates have a buffer value added to them
(currently 75k), which will slightly skew proportions, though by a
negligible amount.

For example, given these estimates in a batch: 150k, 160k, 180k, if we
add 80k to each, we go from e.g. 0.312 for the first message to 0.319 ->
2.25% error which isn't so bad. The error can be larger if operations
within a batch have very different estimates, but realistically within a
5% range based on back-of-the-napkin calculations.

### Drive-by changes

- The batching feature introduced a bug whereby gas expenditure would
only be deducted in the `confirm` step. Now this is done in the `submit`
step to account for txs that revert
- Sealevel e2e can now be disabled for faster iteration! I've not done
the cleanest job, but even the fact that we have this will reduce the
time to run e2e locally by more than half. To use it, set
`SEALEVEL_ENABLED=false` when running `run-locally`
- e2e uses a new utility called `get_matching_lines`, that can be used
to count (and in the future _parse_) logs, to reconstruct the state of
the relayer and have more expressive correctness checks. This is used to
make sure that gas is deducted for all messages, including those in
batches.

### Related issues

- Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3709

### Backward compatibility

Yes

### Testing

E2E, using `get_matching_lines`
pull/3930/head
Daniel Savu 6 months ago committed by GitHub
parent 36e9a2e783
commit 826b4ae57f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      rust/Cargo.lock
  2. 2
      rust/agents/relayer/Cargo.toml
  3. 10
      rust/agents/relayer/src/lib.rs
  4. 10
      rust/agents/relayer/src/main.rs
  5. 9
      rust/agents/relayer/src/msg/gas_payment/mod.rs
  6. 3
      rust/agents/relayer/src/msg/mod.rs
  7. 21
      rust/agents/relayer/src/msg/op_queue.rs
  8. 13
      rust/agents/relayer/src/msg/op_submitter.rs
  9. 68
      rust/agents/relayer/src/msg/pending_message.rs
  10. 4
      rust/agents/relayer/src/msg/processor.rs
  11. 4
      rust/agents/relayer/src/relayer.rs
  12. 4
      rust/agents/relayer/src/server.rs
  13. 6
      rust/hyperlane-base/src/db/rocks/hyperlane_db.rs
  14. 2
      rust/hyperlane-core/Cargo.toml
  15. 2
      rust/hyperlane-core/src/traits/mod.rs
  16. 56
      rust/hyperlane-core/src/traits/pending_operation.rs
  17. 27
      rust/hyperlane-core/src/types/primitive_types.rs
  18. 2
      rust/utils/run-locally/Cargo.toml
  19. 4
      rust/utils/run-locally/src/config.rs
  20. 2
      rust/utils/run-locally/src/cosmos/cli.rs
  21. 4
      rust/utils/run-locally/src/cosmos/mod.rs
  22. 2
      rust/utils/run-locally/src/ethereum/mod.rs
  23. 39
      rust/utils/run-locally/src/invariants.rs
  24. 169
      rust/utils/run-locally/src/main.rs
  25. 55
      rust/utils/run-locally/src/program.rs
  26. 2
      rust/utils/run-locally/src/solana.rs
  27. 18
      rust/utils/run-locally/src/utils.rs

2
rust/Cargo.lock generated

@ -7275,7 +7275,9 @@ dependencies = [
"macro_rules_attribute",
"maplit",
"nix 0.26.4",
"once_cell",
"regex",
"relayer",
"ripemd",
"serde",
"serde_json",

@ -38,7 +38,7 @@ tracing-futures.workspace = true
tracing.workspace = true
hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "async"] }
hyperlane-base = { path = "../../hyperlane-base" }
hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] }
hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" }
[dev-dependencies]

@ -0,0 +1,10 @@
mod merkle_tree;
mod msg;
mod processor;
mod prover;
mod relayer;
mod server;
mod settings;
pub use msg::GAS_EXPENDITURE_LOG_MESSAGE;
pub use relayer::*;

@ -11,15 +11,7 @@ use eyre::Result;
use hyperlane_base::agent_main;
use crate::relayer::Relayer;
mod merkle_tree;
mod msg;
mod processor;
mod prover;
mod relayer;
mod server;
mod settings;
use relayer::Relayer;
#[tokio::main(flavor = "multi_thread", worker_threads = 20)]
async fn main() -> Result<()> {

@ -19,6 +19,8 @@ use crate::{
mod policies;
pub const GAS_EXPENDITURE_LOG_MESSAGE: &str = "Recording gas expenditure for message";
#[async_trait]
pub trait GasPaymentPolicy: Debug + Send + Sync {
/// Returns Some(gas_limit) if the policy has approved the transaction or
@ -132,6 +134,13 @@ impl GasPaymentEnforcer {
}
pub fn record_tx_outcome(&self, message: &HyperlaneMessage, outcome: TxOutcome) -> Result<()> {
// This log is required in E2E, hence the use of a `const`
debug!(
msg=%message,
?outcome,
"{}",
GAS_EXPENDITURE_LOG_MESSAGE,
);
self.db.process_gas_expenditure(InterchainGasExpenditure {
message_id: message.id(),
gas_used: outcome.gas_used,

@ -30,5 +30,6 @@ pub(crate) mod metadata;
pub(crate) mod op_queue;
pub(crate) mod op_submitter;
pub(crate) mod pending_message;
pub(crate) mod pending_operation;
pub(crate) mod processor;
pub use gas_payment::GAS_EXPENDITURE_LOG_MESSAGE;

@ -1,16 +1,13 @@
use std::{cmp::Reverse, collections::BinaryHeap, sync::Arc};
use derive_new::new;
use hyperlane_core::{PendingOperation, QueueOperation};
use prometheus::{IntGauge, IntGaugeVec};
use tokio::sync::{broadcast::Receiver, Mutex};
use tracing::{debug, info, instrument};
use crate::server::MessageRetryRequest;
use super::pending_operation::PendingOperation;
pub type QueueOperation = Box<dyn PendingOperation>;
/// Queue of generic operations that can be submitted to a destination chain.
/// Includes logic for maintaining queue metrics by the destination and `app_context` of an operation
#[derive(Debug, Clone, new)]
@ -109,9 +106,9 @@ impl OpQueue {
#[cfg(test)]
mod test {
use super::*;
use crate::msg::pending_operation::PendingOperationResult;
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, KnownHyperlaneDomain, TryBatchAs, TxOutcome, H256,
HyperlaneDomain, HyperlaneMessage, KnownHyperlaneDomain, PendingOperationResult,
TryBatchAs, TxOutcome, H256, U256,
};
use std::{
collections::VecDeque,
@ -182,6 +179,10 @@ mod test {
todo!()
}
// Stub: not implemented in this test module; calling it panics via `todo!()`.
fn get_tx_cost_estimate(&self) -> Option<U256> {
todo!()
}
/// This will be called after the operation has been submitted and is
/// responsible for checking if the operation has reached a point at
/// which we consider it safe from reorgs.
@ -189,6 +190,14 @@ mod test {
todo!()
}
// Stub: not implemented in this test module; calling it panics via `todo!()`.
// Both parameters are intentionally unused (hence the leading underscores).
fn set_operation_outcome(
&mut self,
_submission_outcome: TxOutcome,
_submission_estimated_cost: U256,
) {
todo!()
}
fn next_attempt_after(&self) -> Option<Instant> {
Some(
Instant::now()

@ -4,6 +4,7 @@ use std::time::Duration;
use derive_new::new;
use futures::future::join_all;
use futures_util::future::try_join_all;
use hyperlane_core::total_estimated_cost;
use prometheus::{IntCounter, IntGaugeVec};
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc;
@ -17,14 +18,13 @@ use tracing::{info, warn};
use hyperlane_base::CoreMetrics;
use hyperlane_core::{
BatchItem, ChainCommunicationError, ChainResult, HyperlaneDomain, HyperlaneDomainProtocol,
HyperlaneMessage, TxOutcome,
HyperlaneMessage, PendingOperationResult, QueueOperation, TxOutcome,
};
use crate::msg::pending_message::CONFIRM_DELAY;
use crate::server::MessageRetryRequest;
use super::op_queue::{OpQueue, QueueOperation};
use super::pending_operation::*;
use super::op_queue::OpQueue;
/// SerialSubmitter accepts operations over a channel. It is responsible for
/// executing the right strategy to deliver those messages to the destination
@ -428,11 +428,10 @@ impl OperationBatch {
async fn submit(self, confirm_queue: &mut OpQueue, metrics: &SerialSubmitterMetrics) {
match self.try_submit_as_batch(metrics).await {
Ok(outcome) => {
// TODO: use the `tx_outcome` with the total gas expenditure
// We'll need to proportionally set `used_gas` based on the tx_outcome, so it can be updated in the confirm step
// which means we need to add a `set_transaction_outcome` fn to `PendingOperation`
info!(outcome=?outcome, batch_size=self.operations.len(), batch=?self.operations, "Submitted transaction batch");
let total_estimated_cost = total_estimated_cost(&self.operations);
for mut op in self.operations {
op.set_operation_outcome(outcome.clone(), total_estimated_cost);
op.set_next_attempt_after(CONFIRM_DELAY);
confirm_queue.push(op).await;
}
@ -462,8 +461,6 @@ impl OperationBatch {
return Err(ChainCommunicationError::BatchIsEmpty);
};
// We use the estimated gas limit from the prior call to
// `process_estimate_costs` to avoid a second gas estimation.
let outcome = first_item.mailbox.process_batch(&batch).await?;
metrics.ops_submitted.inc_by(self.operations.len() as u64);
Ok(outcome)

@ -9,8 +9,9 @@ use derive_new::new;
use eyre::Result;
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{
BatchItem, ChainCommunicationError, ChainResult, HyperlaneChain, HyperlaneDomain,
HyperlaneMessage, Mailbox, MessageSubmissionData, TryBatchAs, TxOutcome, H256, U256,
gas_used_by_operation, make_op_try, BatchItem, ChainCommunicationError, ChainResult,
HyperlaneChain, HyperlaneDomain, HyperlaneMessage, Mailbox, MessageSubmissionData,
PendingOperation, PendingOperationResult, TryBatchAs, TxOutcome, H256, U256,
};
use prometheus::{IntCounter, IntGauge};
use tracing::{debug, error, info, instrument, trace, warn};
@ -18,7 +19,6 @@ use tracing::{debug, error, info, instrument, trace, warn};
use super::{
gas_payment::GasPaymentEnforcer,
metadata::{BaseMetadataBuilder, MessageMetadataBuilder, MetadataBuilder},
pending_operation::*,
};
pub const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) {
@ -259,7 +259,7 @@ impl PendingOperation for PendingMessage {
let state = self
.submission_data
.take()
.clone()
.expect("Pending message must be prepared before it can be submitted");
// We use the estimated gas limit from the prior call to
@ -271,7 +271,7 @@ impl PendingOperation for PendingMessage {
.await;
match tx_outcome {
Ok(outcome) => {
self.set_submission_outcome(outcome);
self.set_operation_outcome(outcome, state.gas_limit);
}
Err(e) => {
error!(error=?e, "Error when processing message");
@ -283,6 +283,10 @@ impl PendingOperation for PendingMessage {
self.submission_outcome = Some(outcome);
}
/// Returns the gas limit stored in the prepared submission data, or `None`
/// when no submission data is currently set.
fn get_tx_cost_estimate(&self) -> Option<U256> {
    let submission_data = self.submission_data.as_ref()?;
    Some(submission_data.gas_limit)
}
async fn confirm(&mut self) -> PendingOperationResult {
make_op_try!(|| {
// Provider error; just try again later
@ -313,15 +317,6 @@ impl PendingOperation for PendingMessage {
);
PendingOperationResult::Success
} else {
if let Some(outcome) = &self.submission_outcome {
if let Err(e) = self
.ctx
.origin_gas_payment_enforcer
.record_tx_outcome(&self.message, outcome.clone())
{
error!(error=?e, "Error when recording tx outcome");
}
}
warn!(
tx_outcome=?self.submission_outcome,
message_id=?self.message.id(),
@ -331,6 +326,50 @@ impl PendingOperation for PendingMessage {
}
}
fn set_operation_outcome(
&mut self,
submission_outcome: TxOutcome,
submission_estimated_cost: U256,
) {
let Some(operation_estimate) = self.get_tx_cost_estimate() else {
warn!("Cannot set operation outcome without a cost estimate set previously");
return;
};
// calculate the gas used by the operation
let gas_used_by_operation = match gas_used_by_operation(
&submission_outcome,
submission_estimated_cost,
operation_estimate,
) {
Ok(gas_used_by_operation) => gas_used_by_operation,
Err(e) => {
warn!(error = %e, "Error when calculating gas used by operation, falling back to charging the full cost of the tx. Are gas estimates enabled for this chain?");
submission_outcome.gas_used
}
};
let operation_outcome = TxOutcome {
gas_used: gas_used_by_operation,
..submission_outcome
};
// record it in the db, to subtract from the sender's igp allowance
if let Err(e) = self
.ctx
.origin_gas_payment_enforcer
.record_tx_outcome(&self.message, operation_outcome.clone())
{
error!(error=?e, "Error when recording tx outcome");
}
// set the outcome in `Self` as well, for later logging
self.set_submission_outcome(operation_outcome);
debug!(
actual_gas_for_message = ?gas_used_by_operation,
message_gas_estimate = ?operation_estimate,
submission_gas_estimate = ?submission_estimated_cost,
message = ?self.message,
"Gas used by message submission"
);
}
fn next_attempt_after(&self) -> Option<Instant> {
self.next_attempt_after
}
@ -343,7 +382,6 @@ impl PendingOperation for PendingMessage {
self.reset_attempts();
}
#[cfg(test)]
fn set_retries(&mut self, retries: u32) {
self.set_retries(retries);
}

@ -13,12 +13,12 @@ use hyperlane_base::{
db::{HyperlaneRocksDB, ProcessMessage},
CoreMetrics,
};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, QueueOperation};
use prometheus::IntGauge;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, instrument, trace};
use super::{metadata::AppContextClassifier, op_queue::QueueOperation, pending_message::*};
use super::{metadata::AppContextClassifier, pending_message::*};
use crate::{processor::ProcessorExt, settings::matching_list::MatchingList};
/// Finds unprocessed messages from an origin and submits them through a channel

@ -16,7 +16,8 @@ use hyperlane_base::{
SyncOptions,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, H512, U256,
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, QueueOperation,
H512, U256,
};
use tokio::{
sync::{
@ -34,7 +35,6 @@ use crate::{
msg::{
gas_payment::GasPaymentEnforcer,
metadata::{BaseMetadataBuilder, IsmAwareAppContextClassifier},
op_queue::QueueOperation,
op_submitter::{SerialSubmitter, SerialSubmitterMetrics},
pending_message::{MessageContext, MessageSubmissionMetrics},
processor::{MessageProcessor, MessageProcessorMetrics},

@ -3,13 +3,11 @@ use axum::{
routing, Router,
};
use derive_new::new;
use hyperlane_core::{ChainCommunicationError, H256};
use hyperlane_core::{ChainCommunicationError, QueueOperation, H256};
use serde::Deserialize;
use std::str::FromStr;
use tokio::sync::broadcast::Sender;
use crate::msg::op_queue::QueueOperation;
const MESSAGE_RETRY_API_BASE: &str = "/message_retry";
pub const ENDPOINT_MESSAGES_QUEUE_SIZE: usize = 1_000;

@ -242,10 +242,10 @@ impl HyperlaneRocksDB {
&self,
event: InterchainGasExpenditure,
) -> DbResult<()> {
let existing_payment = self.retrieve_gas_expenditure_by_message_id(event.message_id)?;
let total = existing_payment + event;
let existing_expenditure = self.retrieve_gas_expenditure_by_message_id(event.message_id)?;
let total = existing_expenditure + event;
debug!(?event, new_total_gas_payment=?total, "Storing gas payment");
debug!(?event, new_total_gas_expenditure=?total, "Storing gas expenditure");
self.store_interchain_gas_expenditure_data_by_message_id(
&total.message_id,
&InterchainGasExpenditureData {

@ -49,7 +49,7 @@ uint.workspace = true
tokio = { workspace = true, features = ["rt", "time"] }
[features]
default = []
default = ["strum"]
float = []
test-utils = ["dep:config"]
agent = ["ethers", "strum"]

@ -10,6 +10,7 @@ pub use interchain_security_module::*;
pub use mailbox::*;
pub use merkle_tree_hook::*;
pub use multisig_ism::*;
pub use pending_operation::*;
pub use provider::*;
pub use routing_ism::*;
pub use signing::*;
@ -29,6 +30,7 @@ mod interchain_security_module;
mod mailbox;
mod merkle_tree_hook;
mod multisig_ism;
mod pending_operation;
mod provider;
mod routing_ism;
mod signing;

@ -4,10 +4,16 @@ use std::{
time::{Duration, Instant},
};
use crate::{
ChainResult, FixedPointNumber, HyperlaneDomain, HyperlaneMessage, TryBatchAs, TxOutcome, H256,
U256,
};
use async_trait::async_trait;
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, TryBatchAs, TxOutcome, H256};
use num::CheckedDiv;
use tracing::warn;
use super::op_queue::QueueOperation;
/// Boxed operation that can be stored in an operation queue
pub type QueueOperation = Box<dyn PendingOperation>;
/// A pending operation that will be run by the submitter and cause a
/// transaction to be sent.
@ -67,11 +73,21 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs<HyperlaneMessage> {
/// Set the outcome of the `submit` call
fn set_submission_outcome(&mut self, outcome: TxOutcome);
/// Get the estimated cost of the `submit` call
fn get_tx_cost_estimate(&self) -> Option<U256>;
/// This will be called after the operation has been submitted and is
/// responsible for checking if the operation has reached a point at
/// which we consider it safe from reorgs.
async fn confirm(&mut self) -> PendingOperationResult;
/// Record the outcome of the operation
fn set_operation_outcome(
&mut self,
submission_outcome: TxOutcome,
submission_estimated_cost: U256,
);
/// Get the earliest instant at which this should next be attempted.
///
/// This is only used for sorting, the functions are responsible for
@ -85,11 +101,41 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs<HyperlaneMessage> {
/// retried immediately.
fn reset_attempts(&mut self);
#[cfg(test)]
/// Set the number of times this operation has been retried.
#[cfg(any(test, feature = "test-utils"))]
fn set_retries(&mut self, retries: u32);
}
/// Adds up the cost estimates of every operation in a batch.
///
/// Operations that report no estimate are logged with a warning and add
/// nothing to the sum. The addition saturates rather than overflowing.
pub fn total_estimated_cost(ops: &[Box<dyn PendingOperation>]) -> U256 {
    let mut total = U256::zero();
    for op in ops {
        let Some(cost_estimate) = op.get_tx_cost_estimate() else {
            warn!(operation=?op, "No cost estimate available for operation, defaulting to 0");
            continue;
        };
        total = total.saturating_add(cost_estimate);
    }
    total
}
/// Calculate the gas used by an operation (either in a batch or single-submission), by looking at the total cost of the tx,
/// and the estimated cost of the operation compared to the sum of the estimates of all operations in the batch.
///
/// The result is `gas_used_by_tx * operation_estimated_cost / tx_estimated_cost`.
/// When using this for single-submission rather than a batch,
/// the `tx_estimated_cost` should be the same as the `operation_estimated_cost`.
///
/// # Errors
///
/// Returns an error if any input cannot be converted into a `FixedPointNumber`,
/// if `tx_estimated_cost` is zero (reported as "Division by zero"), or if the
/// result cannot be converted back into a `U256`.
pub fn gas_used_by_operation(
    tx_outcome: &TxOutcome,
    tx_estimated_cost: U256,
    operation_estimated_cost: U256,
) -> ChainResult<U256> {
    let gas_used_by_tx = FixedPointNumber::try_from(tx_outcome.gas_used)?;
    let operation_gas_estimate = FixedPointNumber::try_from(operation_estimated_cost)?;
    let tx_gas_estimate = FixedPointNumber::try_from(tx_estimated_cost)?;
    let gas_used_by_operation = (gas_used_by_tx * operation_gas_estimate)
        .checked_div(&tx_gas_estimate)
        // Build the error lazily so nothing is allocated on the success path.
        .ok_or_else(|| eyre::eyre!("Division by zero"))?;
    gas_used_by_operation.try_into()
}
impl Display for QueueOperation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
@ -138,6 +184,7 @@ impl Ord for QueueOperation {
}
}
/// Possible outcomes of performing an action on a pending operation (such as `prepare`, `submit` or `confirm`).
#[derive(Debug)]
pub enum PendingOperationResult {
/// Promote to the next step
@ -153,6 +200,7 @@ pub enum PendingOperationResult {
}
/// create a `op_try!` macro for the `on_retry` handler.
#[macro_export]
macro_rules! make_op_try {
($on_retry:expr) => {
/// Handle a result and either return early with retry or a critical failure on
@ -181,5 +229,3 @@ macro_rules! make_op_try {
}
};
}
pub(super) use make_op_try;

@ -3,11 +3,15 @@
#![allow(clippy::assign_op_pattern)]
#![allow(clippy::reversed_empty_ranges)]
use std::{ops::Mul, str::FromStr};
use std::{
ops::{Div, Mul},
str::FromStr,
};
use bigdecimal::{BigDecimal, RoundingMode};
use borsh::{BorshDeserialize, BorshSerialize};
use fixed_hash::impl_fixed_hash_conversions;
use num::CheckedDiv;
use num_traits::Zero;
use uint::construct_uint;
@ -421,6 +425,27 @@ where
}
}
/// Division of a `FixedPointNumber` by anything convertible into one.
///
/// NOTE(review): a zero divisor is not checked here and is presumably handled
/// (or panicked on) by the wrapped number type. Use `checked_div` when the
/// divisor may be zero.
impl<T> Div<T> for FixedPointNumber
where
    T: Into<FixedPointNumber>,
{
    type Output = FixedPointNumber;

    fn div(self, rhs: T) -> Self::Output {
        let divisor: FixedPointNumber = rhs.into();
        let quotient = self.0 / divisor.0;
        Self(quotient)
    }
}
impl CheckedDiv for FixedPointNumber {
    /// Divides `self` by `v`, returning `None` when `v` is zero instead of
    /// attempting the division.
    fn checked_div(&self, v: &Self) -> Option<Self> {
        let divisor = &v.0;
        (!divisor.is_zero()).then(|| Self(self.0.clone() / divisor.clone()))
    }
}
impl FromStr for FixedPointNumber {
type Err = ChainCommunicationError;

@ -28,11 +28,13 @@ ethers-contract.workspace = true
tokio.workspace = true
maplit.workspace = true
nix = { workspace = true, features = ["signal"], default-features = false }
once_cell.workspace = true
tempfile.workspace = true
ureq = { workspace = true, default-features = false }
which.workspace = true
macro_rules_attribute.workspace = true
regex.workspace = true
relayer = { path = "../../agents/relayer"}
hyperlane-cosmwasm-interface.workspace = true
cosmwasm-schema.workspace = true

@ -6,6 +6,7 @@ pub struct Config {
pub ci_mode: bool,
pub ci_mode_timeout: u64,
pub kathy_messages: u64,
pub sealevel_enabled: bool,
// TODO: Include count of sealevel messages in a field separate from `kathy_messages`?
}
@ -26,6 +27,9 @@ impl Config {
.map(|r| r.parse::<u64>().unwrap());
r.unwrap_or(16)
},
sealevel_enabled: env::var("SEALEVEL_ENABLED")
.map(|k| k.parse::<bool>().unwrap())
.unwrap_or(true),
})
}
}

@ -152,7 +152,7 @@ impl OsmosisCLI {
.arg("grpc.address", &endpoint.grpc_addr) // default is 0.0.0.0:9090
.arg("rpc.pprof_laddr", pprof_addr) // default is localhost:6060
.arg("log_level", "panic")
.spawn("COSMOS");
.spawn("COSMOS", None);
endpoint.wait_for_node();

@ -271,7 +271,7 @@ fn launch_cosmos_validator(
.hyp_env("SIGNER_SIGNER_TYPE", "hexKey")
.hyp_env("SIGNER_KEY", agent_config.signer.key)
.hyp_env("TRACING_LEVEL", if debug { "debug" } else { "info" })
.spawn("VAL");
.spawn("VAL", None);
validator
}
@ -299,7 +299,7 @@ fn launch_cosmos_relayer(
.hyp_env("TRACING_LEVEL", if debug { "debug" } else { "info" })
.hyp_env("GASPAYMENTENFORCEMENT", "[{\"type\": \"none\"}]")
.hyp_env("METRICSPORT", metrics.to_string())
.spawn("RLY");
.spawn("RLY", None);
relayer
}

@ -36,7 +36,7 @@ pub fn start_anvil(config: Arc<Config>) -> AgentHandles {
}
log!("Launching anvil...");
let anvil_args = Program::new("anvil").flag("silent").filter_logs(|_| false); // for now do not keep any of the anvil logs
let anvil = anvil_args.spawn("ETH");
let anvil = anvil_args.spawn("ETH", None);
sleep(Duration::from_secs(10));

@ -1,12 +1,15 @@
use std::fs::File;
use std::path::Path;
use crate::config::Config;
use crate::metrics::agent_balance_sum;
use crate::utils::get_matching_lines;
use maplit::hashmap;
use relayer::GAS_EXPENDITURE_LOG_MESSAGE;
use crate::logging::log;
use crate::solana::solana_termination_invariants_met;
use crate::{fetch_metric, ZERO_MERKLE_INSERTION_KATHY_MESSAGES};
use crate::{fetch_metric, AGENT_LOGGING_DIR, ZERO_MERKLE_INSERTION_KATHY_MESSAGES};
// This number should be even, so the messages can be split into two equal halves
// sent before and after the relayer spins up, to avoid rounding errors.
@ -17,11 +20,16 @@ pub const SOL_MESSAGES_EXPECTED: u32 = 20;
pub fn termination_invariants_met(
config: &Config,
starting_relayer_balance: f64,
solana_cli_tools_path: &Path,
solana_config_path: &Path,
solana_cli_tools_path: Option<&Path>,
solana_config_path: Option<&Path>,
) -> eyre::Result<bool> {
let eth_messages_expected = (config.kathy_messages / 2) as u32 * 2;
let total_messages_expected = eth_messages_expected + SOL_MESSAGES_EXPECTED;
let sol_messages_expected = if config.sealevel_enabled {
SOL_MESSAGES_EXPECTED
} else {
0
};
let total_messages_expected = eth_messages_expected + sol_messages_expected;
let lengths = fetch_metric("9092", "hyperlane_submitter_queue_length", &hashmap! {})?;
assert!(!lengths.is_empty(), "Could not find queue length metric");
@ -53,6 +61,19 @@ pub fn termination_invariants_met(
.iter()
.sum::<u32>();
let log_file_path = AGENT_LOGGING_DIR.join("RLY-output.log");
let relayer_logfile = File::open(log_file_path)?;
let gas_expenditure_log_count =
get_matching_lines(&relayer_logfile, GAS_EXPENDITURE_LOG_MESSAGE)
.unwrap()
.len();
// Zero insertion messages don't reach `submit` stage where gas is spent, so we only expect these logs for the other messages.
assert_eq!(
gas_expenditure_log_count as u32, total_messages_expected,
"Didn't record gas payment for all delivered messages"
);
let gas_payment_sealevel_events_count = fetch_metric(
"9092",
"hyperlane_contract_sync_stored_events",
@ -74,9 +95,13 @@ pub fn termination_invariants_met(
return Ok(false);
}
if !solana_termination_invariants_met(solana_cli_tools_path, solana_config_path) {
log!("Solana termination invariants not met");
return Ok(false);
if let Some((solana_cli_tools_path, solana_config_path)) =
solana_cli_tools_path.zip(solana_config_path)
{
if !solana_termination_invariants_met(solana_cli_tools_path, solana_config_path) {
log!("Solana termination invariants not met");
return Ok(false);
}
}
let dispatched_messages_scraped = fetch_metric(

@ -11,12 +11,17 @@
//! the end conditions are met, the test is a failure. Defaults to 10 min.
//! - `E2E_KATHY_MESSAGES`: Number of kathy messages to dispatch. Defaults to 16 if CI mode is enabled.
//! else false.
//! - `SEALEVEL_ENABLED`: true/false, enables sealevel testing. Defaults to true.
use std::{
fs,
collections::HashMap,
fs::{self, File},
path::Path,
process::{Child, ExitCode},
sync::atomic::{AtomicBool, Ordering},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread::sleep,
time::{Duration, Instant},
};
@ -24,6 +29,7 @@ use std::{
use ethers_contract::MULTICALL_ADDRESS;
use logging::log;
pub use metrics::fetch_metric;
use once_cell::sync::Lazy;
use program::Program;
use tempfile::tempdir;
@ -46,6 +52,12 @@ mod program;
mod solana;
mod utils;
/// Directory that agent log files are written to and read from.
///
/// The directory (and any missing parents) is created the first time this
/// static is accessed; initialization panics if it cannot be created.
pub static AGENT_LOGGING_DIR: Lazy<&Path> = Lazy::new(|| {
    let logs_dir: &Path = Path::new("/tmp/test_logs");
    fs::create_dir_all(logs_dir).unwrap();
    logs_dir
});
/// These private keys are from hardhat/anvil's testing accounts.
const RELAYER_KEYS: &[&str] = &[
// test1
@ -61,17 +73,18 @@ const RELAYER_KEYS: &[&str] = &[
];
/// These private keys are from hardhat/anvil's testing accounts.
/// These must be consistent with the ISM config for the test.
const VALIDATOR_KEYS: &[&str] = &[
const ETH_VALIDATOR_KEYS: &[&str] = &[
// eth
"0x47e179ec197488593b187f80a00eb0da91f1b9d0b13f8733639f19c30a34926a",
"0x8b3a350cf5c34c9194ca85829a2df0ec3153be0318b5e2d3348e872092edffba",
"0x92db14e403b83dfe3df233f83dfa3a0d7096f21ca9b0d6d6b8d88b2b4ec1564e",
];
const SEALEVEL_VALIDATOR_KEYS: &[&str] = &[
// sealevel
"0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d",
];
const VALIDATOR_ORIGIN_CHAINS: &[&str] = &["test1", "test2", "test3", "sealeveltest1"];
const AGENT_BIN_PATH: &str = "target/debug";
const INFRA_PATH: &str = "../typescript/infra";
const MONOREPO_ROOT_PATH: &str = "../";
@ -87,14 +100,15 @@ static SHUTDOWN: AtomicBool = AtomicBool::new(false);
/// cleanup purposes at this time.
#[derive(Default)]
struct State {
agents: Vec<(String, Child)>,
#[allow(clippy::type_complexity)]
agents: HashMap<String, (Child, Option<Arc<Mutex<File>>>)>,
watchers: Vec<Box<dyn TaskHandle<Output = ()>>>,
data: Vec<Box<dyn ArbitraryData>>,
}
impl State {
fn push_agent(&mut self, handles: AgentHandles) {
self.agents.push((handles.0, handles.1));
self.agents.insert(handles.0, (handles.1, handles.5));
self.watchers.push(handles.2);
self.watchers.push(handles.3);
self.data.push(handles.4);
@ -105,9 +119,7 @@ impl Drop for State {
fn drop(&mut self) {
SHUTDOWN.store(true, Ordering::Relaxed);
log!("Signaling children to stop...");
// stop children in reverse order
self.agents.reverse();
for (name, mut agent) in self.agents.drain(..) {
for (name, (mut agent, _)) in self.agents.drain() {
log!("Stopping child {}", name);
stop_child(&mut agent);
}
@ -122,6 +134,7 @@ impl Drop for State {
drop(data)
}
fs::remove_dir_all(SOLANA_CHECKPOINT_LOCATION).unwrap_or_default();
fs::remove_dir_all::<&Path>(AGENT_LOGGING_DIR.as_ref()).unwrap_or_default();
}
}
@ -133,20 +146,27 @@ fn main() -> ExitCode {
})
.unwrap();
assert_eq!(VALIDATOR_ORIGIN_CHAINS.len(), VALIDATOR_KEYS.len());
const VALIDATOR_COUNT: usize = VALIDATOR_KEYS.len();
let config = Config::load();
let solana_checkpoint_path = Path::new(SOLANA_CHECKPOINT_LOCATION);
fs::remove_dir_all(solana_checkpoint_path).unwrap_or_default();
let checkpoints_dirs: Vec<DynPath> = (0..VALIDATOR_COUNT - 1)
let mut validator_origin_chains = ["test1", "test2", "test3"].to_vec();
let mut validator_keys = ETH_VALIDATOR_KEYS.to_vec();
let mut validator_count: usize = validator_keys.len();
let mut checkpoints_dirs: Vec<DynPath> = (0..validator_count)
.map(|_| Box::new(tempdir().unwrap()) as DynPath)
.chain([Box::new(solana_checkpoint_path) as DynPath])
.collect();
if config.sealevel_enabled {
validator_origin_chains.push("sealeveltest1");
let mut sealevel_keys = SEALEVEL_VALIDATOR_KEYS.to_vec();
validator_keys.append(&mut sealevel_keys);
let solana_checkpoint_path = Path::new(SOLANA_CHECKPOINT_LOCATION);
fs::remove_dir_all(solana_checkpoint_path).unwrap_or_default();
checkpoints_dirs.push(Box::new(solana_checkpoint_path) as DynPath);
validator_count += 1;
}
assert_eq!(validator_origin_chains.len(), validator_keys.len());
let rocks_db_dir = tempdir().unwrap();
let relayer_db = concat_path(&rocks_db_dir, "relayer");
let validator_dbs = (0..VALIDATOR_COUNT)
let validator_dbs = (0..validator_count)
.map(|i| concat_path(&rocks_db_dir, format!("validator{i}")))
.collect::<Vec<_>>();
@ -207,11 +227,15 @@ fn main() -> ExitCode {
"http://127.0.0.1:8545,http://127.0.0.1:8545,http://127.0.0.1:8545",
)
// default is used for TEST3
.arg("defaultSigner.key", RELAYER_KEYS[2])
.arg(
.arg("defaultSigner.key", RELAYER_KEYS[2]);
let relayer_env = if config.sealevel_enabled {
relayer_env.arg(
"relayChains",
"test1,test2,test3,sealeveltest1,sealeveltest2",
);
)
} else {
relayer_env.arg("relayChains", "test1,test2,test3")
};
let base_validator_env = common_agent_env
.clone()
@ -233,14 +257,14 @@ fn main() -> ExitCode {
.hyp_env("INTERVAL", "5")
.hyp_env("CHECKPOINTSYNCER_TYPE", "localStorage");
let validator_envs = (0..VALIDATOR_COUNT)
let validator_envs = (0..validator_count)
.map(|i| {
base_validator_env
.clone()
.hyp_env("METRICSPORT", (9094 + i).to_string())
.hyp_env("DB", validator_dbs[i].to_str().unwrap())
.hyp_env("ORIGINCHAINNAME", VALIDATOR_ORIGIN_CHAINS[i])
.hyp_env("VALIDATOR_KEY", VALIDATOR_KEYS[i])
.hyp_env("ORIGINCHAINNAME", validator_origin_chains[i])
.hyp_env("VALIDATOR_KEY", validator_keys[i])
.hyp_env(
"CHECKPOINTSYNCER_PATH",
(*checkpoints_dirs[i]).as_ref().to_str().unwrap(),
@ -274,7 +298,7 @@ fn main() -> ExitCode {
.join(", ")
);
log!("Relayer DB in {}", relayer_db.display());
(0..VALIDATOR_COUNT).for_each(|i| {
(0..validator_count).for_each(|i| {
log!("Validator {} DB in {}", i + 1, validator_dbs[i].display());
});
@ -282,9 +306,14 @@ fn main() -> ExitCode {
// Ready to run...
//
let (solana_path, solana_path_tempdir) = install_solana_cli_tools().join();
state.data.push(Box::new(solana_path_tempdir));
let solana_program_builder = build_solana_programs(solana_path.clone());
let solana_paths = if config.sealevel_enabled {
let (solana_path, solana_path_tempdir) = install_solana_cli_tools().join();
state.data.push(Box::new(solana_path_tempdir));
let solana_program_builder = build_solana_programs(solana_path.clone());
Some((solana_program_builder.join(), solana_path))
} else {
None
};
// this task takes a long time in the CI so run it in parallel
log!("Building rust...");
@ -294,15 +323,18 @@ fn main() -> ExitCode {
.arg("bin", "relayer")
.arg("bin", "validator")
.arg("bin", "scraper")
.arg("bin", "init-db")
.arg("bin", "hyperlane-sealevel-client")
.arg("bin", "init-db");
let build_rust = if config.sealevel_enabled {
build_rust.arg("bin", "hyperlane-sealevel-client")
} else {
build_rust
};
let build_rust = build_rust
.filter_logs(|l| !l.contains("workspace-inheritance"))
.run();
let start_anvil = start_anvil(config.clone());
let solana_program_path = solana_program_builder.join();
log!("Running postgres db...");
let postgres = Program::new("docker")
.cmd("run")
@ -311,24 +343,31 @@ fn main() -> ExitCode {
.arg("env", "POSTGRES_PASSWORD=47221c18c610")
.arg("publish", "5432:5432")
.cmd("postgres:14")
.spawn("SQL");
.spawn("SQL", None);
state.push_agent(postgres);
build_rust.join();
let solana_ledger_dir = tempdir().unwrap();
let start_solana_validator = start_solana_test_validator(
solana_path.clone(),
solana_program_path,
solana_ledger_dir.as_ref().to_path_buf(),
);
let solana_config_path = if let Some((solana_program_path, solana_path)) = solana_paths.clone()
{
let start_solana_validator = start_solana_test_validator(
solana_path.clone(),
solana_program_path,
solana_ledger_dir.as_ref().to_path_buf(),
);
let (solana_config_path, solana_validator) = start_solana_validator.join();
state.push_agent(solana_validator);
Some(solana_config_path)
} else {
None
};
let (solana_config_path, solana_validator) = start_solana_validator.join();
state.push_agent(solana_validator);
state.push_agent(start_anvil.join());
// spawn 1st validator before any messages have been sent to test empty mailbox
state.push_agent(validator_envs.first().unwrap().clone().spawn("VL1"));
state.push_agent(validator_envs.first().unwrap().clone().spawn("VL1", None));
sleep(Duration::from_secs(5));
@ -336,7 +375,7 @@ fn main() -> ExitCode {
Program::new(concat_path(AGENT_BIN_PATH, "init-db"))
.run()
.join();
state.push_agent(scraper_env.spawn("SCR"));
state.push_agent(scraper_env.spawn("SCR", None));
// Send half the kathy messages before starting the rest of the agents
let kathy_env_single_insertion = Program::new("yarn")
@ -369,22 +408,35 @@ fn main() -> ExitCode {
.arg("required-hook", "merkleTreeHook");
kathy_env_double_insertion.clone().run().join();
// Send some sealevel messages before spinning up the agents, to test the backward indexing cursor
for _i in 0..(SOL_MESSAGES_EXPECTED / 2) {
initiate_solana_hyperlane_transfer(solana_path.clone(), solana_config_path.clone()).join();
if let Some((solana_config_path, (_, solana_path))) =
solana_config_path.clone().zip(solana_paths.clone())
{
// Send some sealevel messages before spinning up the agents, to test the backward indexing cursor
for _i in 0..(SOL_MESSAGES_EXPECTED / 2) {
initiate_solana_hyperlane_transfer(solana_path.clone(), solana_config_path.clone())
.join();
}
}
// spawn the rest of the validators
for (i, validator_env) in validator_envs.into_iter().enumerate().skip(1) {
let validator = validator_env.spawn(make_static(format!("VL{}", 1 + i)));
let validator = validator_env.spawn(
make_static(format!("VL{}", 1 + i)),
Some(AGENT_LOGGING_DIR.as_ref()),
);
state.push_agent(validator);
}
state.push_agent(relayer_env.spawn("RLY"));
state.push_agent(relayer_env.spawn("RLY", Some(&AGENT_LOGGING_DIR)));
// Send some sealevel messages after spinning up the relayer, to test the forward indexing cursor
for _i in 0..(SOL_MESSAGES_EXPECTED / 2) {
initiate_solana_hyperlane_transfer(solana_path.clone(), solana_config_path.clone()).join();
if let Some((solana_config_path, (_, solana_path))) =
solana_config_path.clone().zip(solana_paths.clone())
{
// Send some sealevel messages after spinning up the relayer, to test the forward indexing cursor
for _i in 0..(SOL_MESSAGES_EXPECTED / 2) {
initiate_solana_hyperlane_transfer(solana_path.clone(), solana_config_path.clone())
.join();
}
}
log!("Setup complete! Agents running in background...");
@ -393,7 +445,11 @@ fn main() -> ExitCode {
// Send half the kathy messages after the relayer comes up
kathy_env_double_insertion.clone().run().join();
kathy_env_zero_insertion.clone().run().join();
state.push_agent(kathy_env_single_insertion.flag("mineforever").spawn("KTY"));
state.push_agent(
kathy_env_single_insertion
.flag("mineforever")
.spawn("KTY", None),
);
let loop_start = Instant::now();
// give things a chance to fully start.
@ -406,8 +462,11 @@ fn main() -> ExitCode {
if termination_invariants_met(
&config,
starting_relayer_balance,
&solana_path,
&solana_config_path,
solana_paths
.clone()
.map(|(_, solana_path)| solana_path)
.as_deref(),
solana_config_path.as_deref(),
)
.unwrap_or(false)
{
@ -422,7 +481,7 @@ fn main() -> ExitCode {
}
// verify long-running tasks are still running
for (name, child) in state.agents.iter_mut() {
for (name, (child, _)) in state.agents.iter_mut() {
if let Some(status) = child.try_wait().unwrap() {
if !status.success() {
log!(

@ -2,14 +2,14 @@ use std::{
collections::BTreeMap,
ffi::OsStr,
fmt::{Debug, Display, Formatter},
io::{BufRead, BufReader, Read},
fs::{File, OpenOptions},
io::{BufRead, BufReader, Read, Write},
path::{Path, PathBuf},
process::{Command, Stdio},
sync::{
atomic::{AtomicBool, Ordering},
mpsc,
mpsc::Sender,
Arc,
mpsc::{self, Sender},
Arc, Mutex,
},
thread::{sleep, spawn},
time::Duration,
@ -240,8 +240,18 @@ impl Program {
})
}
pub fn spawn(self, log_prefix: &'static str) -> AgentHandles {
pub fn spawn(self, log_prefix: &'static str, logs_dir: Option<&Path>) -> AgentHandles {
let mut command = self.create_command();
let log_file = logs_dir.map(|logs_dir| {
let log_file_name = format!("{}-output.log", log_prefix);
let log_file_path = logs_dir.join(log_file_name);
let log_file = OpenOptions::new()
.append(true)
.create(true)
.open(log_file_path)
.expect("Failed to create a log file");
Arc::new(Mutex::new(log_file))
});
command.stdout(Stdio::piped()).stderr(Stdio::piped());
log!("Spawning {}...", &self);
@ -250,17 +260,35 @@ impl Program {
.unwrap_or_else(|e| panic!("Failed to start {:?} with error: {e}", &self));
let child_stdout = child.stdout.take().unwrap();
let filter = self.get_filter();
let stdout =
spawn(move || prefix_log(child_stdout, log_prefix, &RUN_LOG_WATCHERS, filter, None));
let cloned_log_file = log_file.clone();
let stdout = spawn(move || {
prefix_log(
child_stdout,
log_prefix,
&RUN_LOG_WATCHERS,
filter,
cloned_log_file,
None,
)
});
let child_stderr = child.stderr.take().unwrap();
let stderr =
spawn(move || prefix_log(child_stderr, log_prefix, &RUN_LOG_WATCHERS, filter, None));
let stderr = spawn(move || {
prefix_log(
child_stderr,
log_prefix,
&RUN_LOG_WATCHERS,
filter,
None,
None,
)
});
(
log_prefix.to_owned(),
child,
Box::new(SimpleTaskHandle(stdout)),
Box::new(SimpleTaskHandle(stderr)),
self.get_memory(),
log_file.clone(),
)
}
@ -281,13 +309,13 @@ impl Program {
let stdout = child.stdout.take().unwrap();
let name = self.get_bin_name();
let running = running.clone();
spawn(move || prefix_log(stdout, &name, &running, filter, stdout_ch_tx))
spawn(move || prefix_log(stdout, &name, &running, filter, None, stdout_ch_tx))
};
let stderr = {
let stderr = child.stderr.take().unwrap();
let name = self.get_bin_name();
let running = running.clone();
spawn(move || prefix_log(stderr, &name, &running, filter, None))
spawn(move || prefix_log(stderr, &name, &running, filter, None, None))
};
let status = loop {
@ -321,6 +349,7 @@ fn prefix_log(
prefix: &str,
run_log_watcher: &AtomicBool,
filter: Option<LogFilter>,
file: Option<Arc<Mutex<File>>>,
channel: Option<Sender<String>>,
) {
let mut reader = BufReader::new(output).lines();
@ -340,6 +369,10 @@ fn prefix_log(
}
}
println!("<{prefix}> {line}");
if let Some(file) = &file {
let mut writer = file.lock().expect("Failed to acquire lock for log file");
writeln!(writer, "{}", line).unwrap_or(());
}
if let Some(channel) = &channel {
// ignore send errors
channel.send(line).unwrap_or(());

@ -202,7 +202,7 @@ pub fn start_solana_test_validator(
concat_path(&solana_programs_path, lib).to_str().unwrap(),
);
}
let validator = args.spawn("SOL");
let validator = args.spawn("SOL", None);
sleep(Duration::from_secs(5));
log!("Deploying the hyperlane programs to solana");

@ -1,5 +1,8 @@
use std::fs::File;
use std::io::{self, BufRead};
use std::path::{Path, PathBuf};
use std::process::Child;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use nix::libc::pid_t;
@ -54,6 +57,8 @@ pub type AgentHandles = (
Box<dyn TaskHandle<Output = ()>>,
// data to drop once program exits
Box<dyn ArbitraryData>,
// file with stdout logs
Option<Arc<Mutex<File>>>,
);
pub type LogFilter = fn(&str) -> bool;
@ -112,3 +117,16 @@ pub fn stop_child(child: &mut Child) {
}
};
}
/// Returns every line of `file` that contains `search_string`, in file order.
///
/// Lines are read through a buffered reader starting at the handle's current
/// read position, and are returned without their trailing line terminator.
///
/// # Errors
///
/// Returns the first error hit while reading a line (an I/O failure, or a line
/// that is not valid UTF-8). Reporting the error, rather than stopping at the
/// failed line and returning what was collected so far, keeps callers that
/// count matches from acting on a silently truncated result.
pub fn get_matching_lines(file: &File, search_string: &str) -> io::Result<Vec<String>> {
    let reader = io::BufReader::new(file);

    let mut matching_lines = Vec::new();
    for line in reader.lines() {
        // Propagate read failures instead of treating them as end-of-file.
        let line = line?;
        if line.contains(search_string) {
            matching_lines.push(line);
        }
    }
    Ok(matching_lines)
}

Loading…
Cancel
Save