diff --git a/rust/agents/relayer/src/msg/metadata/aggregation.rs b/rust/agents/relayer/src/msg/metadata/aggregation.rs index ffcde517d..fed501b62 100644 --- a/rust/agents/relayer/src/msg/metadata/aggregation.rs +++ b/rust/agents/relayer/src/msg/metadata/aggregation.rs @@ -9,7 +9,7 @@ use tracing::{info, instrument}; use hyperlane_core::{HyperlaneMessage, InterchainSecurityModule, ModuleType, H256, U256}; -use super::{BaseMetadataBuilder, MetadataBuilder}; +use super::{MessageMetadataBuilder, MetadataBuilder}; /// Bytes used to store one member of the (start, end) range tuple /// Copied from `AggregationIsmMetadata.sol` @@ -17,7 +17,7 @@ const METADATA_RANGE_SIZE: usize = 4; #[derive(Clone, Debug, new, Deref)] pub struct AggregationIsmMetadataBuilder { - base: BaseMetadataBuilder, + base: MessageMetadataBuilder, } #[derive(Clone, Debug, new, PartialEq, Eq)] diff --git a/rust/agents/relayer/src/msg/metadata/base.rs b/rust/agents/relayer/src/msg/metadata/base.rs index 054cd8574..65dda6181 100644 --- a/rust/agents/relayer/src/msg/metadata/base.rs +++ b/rust/agents/relayer/src/msg/metadata/base.rs @@ -1,4 +1,11 @@ -use std::{collections::HashMap, fmt::Debug, str::FromStr, sync::Arc}; +use std::{ + collections::HashMap, + fmt::Debug, + ops::Deref, + str::FromStr, + sync::Arc, + time::{Duration, Instant}, +}; use crate::{ merkle_tree::builder::MerkleTreeBuilder, @@ -7,6 +14,7 @@ use crate::{ AggregationIsmMetadataBuilder, CcipReadIsmMetadataBuilder, NullMetadataBuilder, RoutingIsmMetadataBuilder, }, + settings::matching_list::MatchingList, }; use async_trait::async_trait; use derive_new::new; @@ -18,9 +26,10 @@ use hyperlane_base::{ }; use hyperlane_core::{ accumulator::merkle::Proof, AggregationIsm, CcipReadIsm, Checkpoint, HyperlaneDomain, - HyperlaneMessage, InterchainSecurityModule, ModuleType, MultisigIsm, RoutingIsm, + HyperlaneMessage, InterchainSecurityModule, Mailbox, ModuleType, MultisigIsm, RoutingIsm, ValidatorAnnounce, H160, H256, }; + use tokio::sync::RwLock; use tracing::{debug, info, instrument, warn}; @@ -40,39 +49,129 @@ pub struct IsmWithMetadataAndType { #[async_trait] pub trait MetadataBuilder: Send + Sync { - #[allow(clippy::async_yields_async)] async fn build(&self, ism_address: H256, message: &HyperlaneMessage) -> Result>>; } -#[derive(Clone, new)] -pub struct BaseMetadataBuilder { - destination_chain_setup: ChainConf, - origin_prover_sync: Arc>, - origin_validator_announce: Arc, - allow_local_checkpoint_syncers: bool, - metrics: Arc, - db: HyperlaneRocksDB, +/// Allows fetching the default ISM, caching the value for a period of time +/// to avoid fetching it all the time. +/// TODO: make this generic +#[derive(Debug)] +pub struct DefaultIsmCache { + value: RwLock>, + mailbox: Arc, +} + +impl DefaultIsmCache { + /// Time to live for the cached default ISM. 10 mins. + const TTL: Duration = Duration::from_secs(60 * 10); + + pub fn new(mailbox: Arc) -> Self { + Self { + value: RwLock::new(None), + mailbox, + } + } + + /// Gets the default ISM, fetching it from onchain if the cached value + /// is stale. + /// TODO: this can and should be made generic eventually + pub async fn get(&self) -> Result { + // If the duration since the value was last updated does not + // exceed the TTL, return the cached value. + // This is in its own block to avoid holding the lock during the + // async operation to fetch the on-chain default ISM if + // the cached value is stale. + { + let value = self.value.read().await; + + if let Some(value) = *value { + if value.1.elapsed() < Self::TTL { + return Ok(value.0); + } + } + } + + let default_ism = self.mailbox.default_ism().await?; + // Update the cached value. + { + let mut value = self.value.write().await; + *value = Some((default_ism, Instant::now())); + } + + Ok(default_ism) + } +} + +/// Classifies messages into an app context if they have one. +#[derive(Debug)] +pub struct AppContextClassifier { + default_ism: DefaultIsmCache, + app_matching_lists: Vec<(MatchingList, String)>, +} + +impl AppContextClassifier { + pub fn new( + destination_mailbox: Arc, + app_matching_lists: Vec<(MatchingList, String)>, + ) -> Self { + Self { + default_ism: DefaultIsmCache::new(destination_mailbox), + app_matching_lists, + } + } + + /// Classifies messages into an app context if they have one, or None + /// if they don't. + /// An app context is a string that identifies the app that sent the message + /// and exists just for metrics. + /// An app context is chosen based on: + /// - the first element in `app_matching_lists` that matches the message + /// - if the message's ISM is the default ISM, the app context is "default_ism" + pub async fn get_app_context( + &self, + message: &HyperlaneMessage, + root_ism: H256, + ) -> Result> { + // Give priority to the matching list. If the app from the matching list happens + // to use the default ISM, it's preferable to use the app context from the matching + // list. + for (matching_list, app_context) in self.app_matching_lists.iter() { + if matching_list.msg_matches(message, false) { + return Ok(Some(app_context.clone())); + } + } + + let default_ism = self.default_ism.get().await?; + if root_ism == default_ism { + return Ok(Some("default_ism".to_string())); + } + + Ok(None) + } +} + +/// Builds metadata for a message. +#[derive(Debug, Clone)] +pub struct MessageMetadataBuilder { + pub base: Arc, /// ISMs can be structured recursively. We keep track of the depth /// of the recursion to avoid infinite loops. - #[new(default)] - depth: u32, - max_depth: u32, + pub depth: u32, + pub app_context: Option, } -impl Debug for BaseMetadataBuilder { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "MetadataBuilder {{ chain_setup: {:?}, validator_announce: {:?} }}", - self.destination_chain_setup, self.origin_validator_announce - ) +impl Deref for MessageMetadataBuilder { + type Target = BaseMetadataBuilder; + + fn deref(&self) -> &Self::Target { + &self.base } } #[async_trait] -impl MetadataBuilder for BaseMetadataBuilder { - #[instrument(err, skip(self), fields(domain=self.domain().name()))] +impl MetadataBuilder for MessageMetadataBuilder { + #[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()))] async fn build( &self, ism_address: H256, @@ -84,12 +183,24 @@ impl MetadataBuilder for BaseMetadataBuilder { } } -impl BaseMetadataBuilder { - pub fn domain(&self) -> &HyperlaneDomain { - &self.destination_chain_setup.domain +impl MessageMetadataBuilder { + pub async fn new( + ism_address: H256, + message: &HyperlaneMessage, + base: Arc, + ) -> Result { + let app_context = base + .app_context_classifier + .get_app_context(message, ism_address) + .await?; + Ok(Self { + base, + depth: 0, + app_context, + }) } - pub fn clone_with_incremented_depth(&self) -> Result { + fn clone_with_incremented_depth(&self) -> Result { let mut cloned = self.clone(); cloned.depth += 1; if cloned.depth > cloned.max_depth { @@ -99,6 +210,82 @@ impl BaseMetadataBuilder { } } + #[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()))] + pub async fn build_ism_and_metadata( + &self, + ism_address: H256, + message: &HyperlaneMessage, + ) -> Result { + let ism: Box = self + .build_ism(ism_address) + .await + .context("When building ISM")?; + + let module_type = ism + .module_type() + .await + .context("When fetching module type")?; + let cloned = self.clone_with_incremented_depth()?; + + let metadata_builder: Box = match module_type { + ModuleType::MerkleRootMultisig => { + Box::new(MerkleRootMultisigMetadataBuilder::new(cloned)) + } + ModuleType::MessageIdMultisig => { + Box::new(MessageIdMultisigMetadataBuilder::new(cloned)) + } + ModuleType::Routing => Box::new(RoutingIsmMetadataBuilder::new(cloned)), + ModuleType::Aggregation => Box::new(AggregationIsmMetadataBuilder::new(cloned)), + ModuleType::Null => Box::new(NullMetadataBuilder::new()), + ModuleType::CcipRead => Box::new(CcipReadIsmMetadataBuilder::new(cloned)), + _ => return Err(MetadataBuilderError::UnsupportedModuleType(module_type).into()), + }; + let meta = metadata_builder + .build(ism_address, message) + .await + .context("When building metadata"); + Ok(IsmWithMetadataAndType { + ism, + metadata: meta?, + module_type, + }) + } +} + +/// Base metadata builder with types used by higher level metadata builders. +#[allow(clippy::too_many_arguments)] +#[derive(new)] +pub struct BaseMetadataBuilder { + origin_domain: HyperlaneDomain, + destination_chain_setup: ChainConf, + origin_prover_sync: Arc>, + origin_validator_announce: Arc, + allow_local_checkpoint_syncers: bool, + metrics: Arc, + db: HyperlaneRocksDB, + max_depth: u32, + app_context_classifier: AppContextClassifier, +} + +impl Debug for BaseMetadataBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "BaseMetadataBuilder {{ origin_domain: {:?} destination_chain_setup: {:?}, validator_announce: {:?} }}", + self.origin_domain, self.destination_chain_setup, self.origin_validator_announce + ) + } +} + +impl BaseMetadataBuilder { + pub fn origin_domain(&self) -> &HyperlaneDomain { + &self.origin_domain + } + + pub fn destination_domain(&self) -> &HyperlaneDomain { + &self.destination_chain_setup.domain + } + pub async fn get_proof(&self, leaf_index: u32, checkpoint: Checkpoint) -> Result { const CTX: &str = "When fetching message proof"; let proof = self @@ -162,6 +349,7 @@ impl BaseMetadataBuilder { pub async fn build_checkpoint_syncer( &self, validators: &[H256], + app_context: Option, ) -> Result { let storage_locations = self .origin_validator_announce @@ -221,45 +409,10 @@ impl BaseMetadataBuilder { } } } - Ok(MultisigCheckpointSyncer::new(checkpoint_syncers)) - } - - #[instrument(err, skip(self), fields(domain=self.domain().name()))] - pub async fn build_ism_and_metadata( - &self, - ism_address: H256, - message: &HyperlaneMessage, - ) -> Result { - let ism: Box = self - .build_ism(ism_address) - .await - .context("When building ISM")?; - - let module_type = ism - .module_type() - .await - .context("When fetching module type")?; - let base = self.clone_with_incremented_depth()?; - - let metadata_builder: Box = match module_type { - ModuleType::MerkleRootMultisig => { - Box::new(MerkleRootMultisigMetadataBuilder::new(base)) - } - ModuleType::MessageIdMultisig => Box::new(MessageIdMultisigMetadataBuilder::new(base)), - ModuleType::Routing => Box::new(RoutingIsmMetadataBuilder::new(base)), - ModuleType::Aggregation => Box::new(AggregationIsmMetadataBuilder::new(base)), - ModuleType::Null => Box::new(NullMetadataBuilder::new()), - ModuleType::CcipRead => Box::new(CcipReadIsmMetadataBuilder::new(base)), - _ => return Err(MetadataBuilderError::UnsupportedModuleType(module_type).into()), - }; - let meta = metadata_builder - .build(ism_address, message) - .await - .context("When building metadata"); - Ok(IsmWithMetadataAndType { - ism, - metadata: meta?, - module_type, - }) + Ok(MultisigCheckpointSyncer::new( + checkpoint_syncers, + self.metrics.clone(), + app_context, + )) } } diff --git a/rust/agents/relayer/src/msg/metadata/ccip_read.rs b/rust/agents/relayer/src/msg/metadata/ccip_read.rs index 7bce48ce4..ea4cef347 100644 --- a/rust/agents/relayer/src/msg/metadata/ccip_read.rs +++ b/rust/agents/relayer/src/msg/metadata/ccip_read.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use tracing::{info, instrument}; -use super::{BaseMetadataBuilder, MetadataBuilder}; +use super::{base::MessageMetadataBuilder, MetadataBuilder}; #[derive(Serialize, Deserialize)] struct OffchainResponse { @@ -20,7 +20,7 @@ struct OffchainResponse { #[derive(Clone, Debug, new, Deref)] pub struct CcipReadIsmMetadataBuilder { - base: BaseMetadataBuilder, + base: MessageMetadataBuilder, } #[async_trait] diff --git a/rust/agents/relayer/src/msg/metadata/mod.rs b/rust/agents/relayer/src/msg/metadata/mod.rs index 258a10a74..2b3c3bf3d 100644 --- a/rust/agents/relayer/src/msg/metadata/mod.rs +++ b/rust/agents/relayer/src/msg/metadata/mod.rs @@ -6,8 +6,8 @@ mod null_metadata; mod routing; use aggregation::AggregationIsmMetadataBuilder; -pub(crate) use base::BaseMetadataBuilder; pub(crate) use base::MetadataBuilder; +pub(crate) use base::{AppContextClassifier, BaseMetadataBuilder, MessageMetadataBuilder}; use ccip_read::CcipReadIsmMetadataBuilder; use null_metadata::NullMetadataBuilder; use routing::RoutingIsmMetadataBuilder; diff --git a/rust/agents/relayer/src/msg/metadata/multisig/base.rs b/rust/agents/relayer/src/msg/metadata/multisig/base.rs index cbe622615..328b8848b 100644 --- a/rust/agents/relayer/src/msg/metadata/multisig/base.rs +++ b/rust/agents/relayer/src/msg/metadata/multisig/base.rs @@ -12,7 +12,8 @@ use hyperlane_core::{HyperlaneMessage, MultisigSignedCheckpoint, H256}; use strum::Display; use tracing::{debug, info}; -use crate::msg::metadata::BaseMetadataBuilder; +use crate::msg::metadata::base::MessageMetadataBuilder; + use crate::msg::metadata::MetadataBuilder; #[derive(new, AsRef, Deref)] @@ -36,7 +37,7 @@ pub enum MetadataToken { } #[async_trait] -pub trait MultisigIsmMetadataBuilder: AsRef + Send + Sync { +pub trait MultisigIsmMetadataBuilder: AsRef + Send + Sync { async fn fetch_metadata( &self, validators: &[H256], @@ -92,7 +93,6 @@ pub trait MultisigIsmMetadataBuilder: AsRef + Send + Sync { #[async_trait] impl MetadataBuilder for T { - #[allow(clippy::async_yields_async)] async fn build( &self, ism_address: H256, @@ -117,7 +117,7 @@ impl MetadataBuilder for T { let checkpoint_syncer = self .as_ref() - .build_checkpoint_syncer(&validators) + .build_checkpoint_syncer(&validators, self.as_ref().app_context.clone()) .await .context(CTX)?; diff --git a/rust/agents/relayer/src/msg/metadata/multisig/merkle_root_multisig.rs b/rust/agents/relayer/src/msg/metadata/multisig/merkle_root_multisig.rs index 125aa918d..b8bcca040 100644 --- a/rust/agents/relayer/src/msg/metadata/multisig/merkle_root_multisig.rs +++ b/rust/agents/relayer/src/msg/metadata/multisig/merkle_root_multisig.rs @@ -9,12 +9,12 @@ use hyperlane_base::MultisigCheckpointSyncer; use hyperlane_core::{unwrap_or_none_result, HyperlaneMessage, H256}; use tracing::debug; -use crate::msg::metadata::BaseMetadataBuilder; +use crate::msg::metadata::MessageMetadataBuilder; use super::base::{MetadataToken, MultisigIsmMetadataBuilder, MultisigMetadata}; #[derive(Debug, Clone, Deref, new, AsRef)] -pub struct MerkleRootMultisigMetadataBuilder(BaseMetadataBuilder); +pub struct MerkleRootMultisigMetadataBuilder(MessageMetadataBuilder); #[async_trait] impl MultisigIsmMetadataBuilder for MerkleRootMultisigMetadataBuilder { fn token_layout(&self) -> Vec { @@ -55,7 +55,9 @@ impl MultisigIsmMetadataBuilder for MerkleRootMultisigMetadataBuilder { validators, threshold as usize, leaf_index, - highest_leaf_index + highest_leaf_index, + self.origin_domain(), + self.destination_domain(), ) .await .context(CTX)?, diff --git a/rust/agents/relayer/src/msg/metadata/multisig/message_id_multisig.rs b/rust/agents/relayer/src/msg/metadata/multisig/message_id_multisig.rs index e5feffb85..9866c98b0 100644 --- a/rust/agents/relayer/src/msg/metadata/multisig/message_id_multisig.rs +++ b/rust/agents/relayer/src/msg/metadata/multisig/message_id_multisig.rs @@ -9,12 +9,12 @@ use hyperlane_base::MultisigCheckpointSyncer; use hyperlane_core::{unwrap_or_none_result, HyperlaneMessage, H256}; use tracing::{debug, warn}; -use crate::msg::metadata::BaseMetadataBuilder; +use crate::msg::metadata::MessageMetadataBuilder; use super::base::{MetadataToken, MultisigIsmMetadataBuilder, MultisigMetadata}; #[derive(Debug, Clone, Deref, new, AsRef)] -pub struct MessageIdMultisigMetadataBuilder(BaseMetadataBuilder); +pub struct MessageIdMultisigMetadataBuilder(MessageMetadataBuilder); #[async_trait] impl MultisigIsmMetadataBuilder for MessageIdMultisigMetadataBuilder { @@ -46,6 +46,16 @@ impl MultisigIsmMetadataBuilder for MessageIdMultisigMetadataBuilder { "No merkle leaf found for message id, must have not been enqueued in the tree" ) ); + + // Update the validator latest checkpoint metrics. + let _ = checkpoint_syncer + .get_validator_latest_checkpoints_and_update_metrics( + validators, + self.origin_domain(), + self.destination_domain(), + ) + .await; + let quorum_checkpoint = unwrap_or_none_result!( checkpoint_syncer .fetch_checkpoint(validators, threshold as usize, leaf_index) diff --git a/rust/agents/relayer/src/msg/metadata/routing.rs b/rust/agents/relayer/src/msg/metadata/routing.rs index 0a55b137f..c51cd69ba 100644 --- a/rust/agents/relayer/src/msg/metadata/routing.rs +++ b/rust/agents/relayer/src/msg/metadata/routing.rs @@ -5,11 +5,11 @@ use eyre::Context; use hyperlane_core::{HyperlaneMessage, H256}; use tracing::instrument; -use super::{BaseMetadataBuilder, MetadataBuilder}; +use super::{MessageMetadataBuilder, MetadataBuilder}; #[derive(Clone, Debug, new, Deref)] pub struct RoutingIsmMetadataBuilder { - base: BaseMetadataBuilder, + base: MessageMetadataBuilder, } #[async_trait] diff --git a/rust/agents/relayer/src/msg/pending_message.rs b/rust/agents/relayer/src/msg/pending_message.rs index 9836a47d7..dfd4d12cc 100644 --- a/rust/agents/relayer/src/msg/pending_message.rs +++ b/rust/agents/relayer/src/msg/pending_message.rs @@ -14,7 +14,7 @@ use tracing::{debug, error, info, instrument, trace, warn}; use super::{ gas_payment::GasPaymentEnforcer, - metadata::{BaseMetadataBuilder, MetadataBuilder}, + metadata::{BaseMetadataBuilder, MessageMetadataBuilder, MetadataBuilder}, pending_operation::*, }; @@ -35,7 +35,7 @@ pub struct MessageContext { pub origin_db: HyperlaneRocksDB, /// Used to construct the ISM metadata needed to verify a message from the /// origin. - pub metadata_builder: BaseMetadataBuilder, + pub metadata_builder: Arc, /// Used to determine if messages from the origin have made sufficient gas /// payments. pub origin_gas_payment_enforcer: Arc, @@ -153,9 +153,18 @@ impl PendingOperation for PendingMessage { "fetching ISM address. Potentially malformed recipient ISM address." ); + let message_metadata_builder = op_try!( + MessageMetadataBuilder::new( + ism_address, + &self.message, + self.ctx.metadata_builder.clone() + ) + .await, + "getting the message metadata builder" + ); + let Some(metadata) = op_try!( - self.ctx - .metadata_builder + message_metadata_builder .build(ism_address, &self.message) .await, "building metadata" diff --git a/rust/agents/relayer/src/msg/processor.rs b/rust/agents/relayer/src/msg/processor.rs index 14584d0c4..d06bc6b19 100644 --- a/rust/agents/relayer/src/msg/processor.rs +++ b/rust/agents/relayer/src/msg/processor.rs @@ -183,7 +183,8 @@ mod test { use crate::{ merkle_tree::builder::MerkleTreeBuilder, msg::{ - gas_payment::GasPaymentEnforcer, metadata::BaseMetadataBuilder, + gas_payment::GasPaymentEnforcer, + metadata::{AppContextClassifier, BaseMetadataBuilder}, pending_operation::PendingOperation, }, processor::Processor, @@ -240,16 +241,23 @@ mod test { } fn dummy_metadata_builder( - domain: &HyperlaneDomain, + origin_domain: &HyperlaneDomain, + destination_domain: &HyperlaneDomain, db: &HyperlaneRocksDB, ) -> BaseMetadataBuilder { let mut settings = Settings::default(); - settings - .chains - .insert(domain.name().to_owned(), dummy_chain_conf(domain)); - let destination_chain_conf = settings.chain_setup(domain).unwrap(); + settings.chains.insert( + origin_domain.name().to_owned(), + dummy_chain_conf(origin_domain), + ); + settings.chains.insert( + destination_domain.name().to_owned(), + dummy_chain_conf(destination_domain), + ); + let destination_chain_conf = settings.chain_setup(destination_domain).unwrap(); let core_metrics = CoreMetrics::new("dummy_relayer", 37582, Registry::new()).unwrap(); BaseMetadataBuilder::new( + origin_domain.clone(), destination_chain_conf.clone(), Arc::new(RwLock::new(MerkleTreeBuilder::new())), Arc::new(MockValidatorAnnounceContract::default()), @@ -257,6 +265,7 @@ mod test { Arc::new(core_metrics), db.clone(), 5, + AppContextClassifier::new(Arc::new(MockMailboxContract::default()), vec![]), ) } @@ -268,11 +277,11 @@ mod test { MessageProcessor, UnboundedReceiver>, ) { - let base_metadata_builder = dummy_metadata_builder(origin_domain, db); + let base_metadata_builder = dummy_metadata_builder(origin_domain, destination_domain, db); let message_context = Arc::new(MessageContext { destination_mailbox: Arc::new(MockMailboxContract::default()), origin_db: db.clone(), - metadata_builder: base_metadata_builder, + metadata_builder: Arc::new(base_metadata_builder), origin_gas_payment_enforcer: Arc::new(GasPaymentEnforcer::new([], db.clone())), transaction_gas_limit: Default::default(), metrics: dummy_submission_metrics(), diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index ba4ac30cd..c26c0b57a 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -34,7 +34,7 @@ use crate::{ merkle_tree::builder::MerkleTreeBuilder, msg::{ gas_payment::GasPaymentEnforcer, - metadata::BaseMetadataBuilder, + metadata::{AppContextClassifier, BaseMetadataBuilder}, pending_message::{MessageContext, MessageSubmissionMetrics}, pending_operation::DynPendingOperation, processor::{MessageProcessor, MessageProcessorMetrics}, @@ -212,6 +212,7 @@ impl BaseAgent for Relayer { for origin in &settings.origin_chains { let db = dbs.get(origin).unwrap().clone(); let metadata_builder = BaseMetadataBuilder::new( + origin.clone(), destination_chain_setup.clone(), prover_syncs[origin].clone(), validator_announces[origin].clone(), @@ -219,6 +220,10 @@ impl BaseAgent for Relayer { core.metrics.clone(), db, 5, + AppContextClassifier::new( + mailboxes[destination].clone(), + settings.metric_app_contexts.clone(), + ), ); msg_ctxs.insert( @@ -229,7 +234,7 @@ impl BaseAgent for Relayer { Arc::new(MessageContext { destination_mailbox: mailboxes[destination].clone(), origin_db: dbs.get(origin).unwrap().clone(), - metadata_builder, + metadata_builder: Arc::new(metadata_builder), origin_gas_payment_enforcer: gas_payment_enforcers[origin].clone(), transaction_gas_limit, metrics: MessageSubmissionMetrics::new(&core_metrics, origin, destination), diff --git a/rust/agents/relayer/src/settings/mod.rs b/rust/agents/relayer/src/settings/mod.rs index a595f0899..c6934a77c 100644 --- a/rust/agents/relayer/src/settings/mod.rs +++ b/rust/agents/relayer/src/settings/mod.rs @@ -54,6 +54,8 @@ pub struct RelayerSettings { /// If true, allows local storage based checkpoint syncers. /// Not intended for production use. pub allow_local_checkpoint_syncers: bool, + /// App contexts used for metrics. + pub metric_app_contexts: Vec<(MatchingList, String)>, } /// Config for gas payment enforcement @@ -118,26 +120,11 @@ impl FromRawConf for RelayerSettings { .parse_from_str("Expected database path") .unwrap_or_else(|| std::env::current_dir().unwrap().join("hyperlane_db")); - let (raw_gas_payment_enforcement_path, raw_gas_payment_enforcement) = match p + let (raw_gas_payment_enforcement_path, raw_gas_payment_enforcement) = p .get_opt_key("gasPaymentEnforcement") .take_config_err_flat(&mut err) - { - None => None, - Some(ValueParser { - val: Value::String(policy_str), - cwp, - }) => serde_json::from_str::(policy_str) - .context("Expected JSON string") - .take_err(&mut err, || cwp.clone()) - .map(|v| (cwp, recase_json_value(v, Case::Flat))), - Some(ValueParser { - val: value @ Value::Array(_), - cwp, - }) => Some((cwp, value.clone())), - Some(_) => Err(eyre!("Expected JSON array or stringified JSON")) - .take_err(&mut err, || cwp.clone()), - } - .unwrap_or_else(|| (&p.cwp + "gas_payment_enforcement", Value::Array(vec![]))); + .and_then(parse_json_array) + .unwrap_or_else(|| (&p.cwp + "gas_payment_enforcement", Value::Array(vec![]))); let gas_payment_enforcement_parser = ValueParser::new( raw_gas_payment_enforcement_path, @@ -247,6 +234,32 @@ impl FromRawConf for RelayerSettings { }) .collect(); + let (raw_metric_app_contexts_path, raw_metric_app_contexts) = p + .get_opt_key("metricAppContexts") + .take_config_err_flat(&mut err) + .and_then(parse_json_array) + .unwrap_or_else(|| (&p.cwp + "metric_app_contexts", Value::Array(vec![]))); + + let metric_app_contexts_parser = + ValueParser::new(raw_metric_app_contexts_path, &raw_metric_app_contexts); + let metric_app_contexts = metric_app_contexts_parser + .into_array_iter() + .map(|itr| { + itr.filter_map(|policy| { + let name = policy.chain(&mut err).get_key("name").parse_string().end(); + + let matching_list = policy + .chain(&mut err) + .get_key("matchingList") + .and_then(parse_matching_list) + .unwrap_or_default(); + + name.map(|name| (matching_list, name.to_owned())) + }) + .collect_vec() + }) + .unwrap_or_default(); + err.into_result(RelayerSettings { base, db, @@ -258,28 +271,35 @@ impl FromRawConf for RelayerSettings { transaction_gas_limit, skip_transaction_gas_limit_for, allow_local_checkpoint_syncers, + metric_app_contexts, }) } } -fn parse_matching_list(p: ValueParser) -> ConfigResult { +fn parse_json_array(p: ValueParser) -> Option<(ConfigPath, Value)> { let mut err = ConfigParsingError::default(); - let raw_list = match &p { + match p { ValueParser { - val: Value::String(matching_list_str), + val: Value::String(array_str), cwp, - } => serde_json::from_str::(matching_list_str) + } => serde_json::from_str::(array_str) .context("Expected JSON string") .take_err(&mut err, || cwp.clone()) - .map(|v| recase_json_value(v, Case::Flat)), + .map(|v| (cwp, recase_json_value(v, Case::Flat))), ValueParser { val: value @ Value::Array(_), - .. - } => Some((*value).clone()), + cwp, + } => Some((cwp, value.clone())), _ => Err(eyre!("Expected JSON array or stringified JSON")) .take_err(&mut err, || p.cwp.clone()), - }; + } +} + +fn parse_matching_list(p: ValueParser) -> ConfigResult { + let mut err = ConfigParsingError::default(); + + let raw_list = parse_json_array(p.clone()).map(|(_, v)| v); let Some(raw_list) = raw_list else { return err.into_result(MatchingList::default()); }; diff --git a/rust/chains/hyperlane-cosmos/src/mailbox.rs b/rust/chains/hyperlane-cosmos/src/mailbox.rs index 0b3927cda..bb94bc7fe 100644 --- a/rust/chains/hyperlane-cosmos/src/mailbox.rs +++ b/rust/chains/hyperlane-cosmos/src/mailbox.rs @@ -144,9 +144,9 @@ impl Mailbox for CosmosMailbox { .await?; let response: mailbox::DefaultIsmResponse = serde_json::from_slice(&data)?; - // convert Hex to H256 - let ism = H256::from_slice(&hex::decode(response.default_ism)?); - Ok(ism) + // convert bech32 to H256 + let ism = CosmosAddress::from_str(&response.default_ism)?; + Ok(ism.digest()) } #[instrument(err, ret, skip(self))] @@ -166,7 +166,7 @@ impl Mailbox for CosmosMailbox { .await?; let response: mailbox::RecipientIsmResponse = serde_json::from_slice(&data)?; - // convert Hex to H256 + // convert bech32 to H256 let ism = CosmosAddress::from_str(&response.ism)?; Ok(ism.digest()) } diff --git a/rust/hyperlane-base/src/metrics/core.rs b/rust/hyperlane-base/src/metrics/core.rs index a0c40d8a8..738538dae 100644 --- a/rust/hyperlane-base/src/metrics/core.rs +++ b/rust/hyperlane-base/src/metrics/core.rs @@ -1,14 +1,16 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::sync::{Arc, OnceLock}; use eyre::Result; +use hyperlane_core::{HyperlaneDomain, H160}; use prometheus::{ histogram_opts, labels, opts, register_counter_vec_with_registry, register_gauge_vec_with_registry, register_histogram_vec_with_registry, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, CounterVec, Encoder, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec, Registry, }; +use tokio::sync::RwLock; use tokio::task::JoinHandle; use tracing::warn; @@ -37,7 +39,6 @@ pub struct CoreMetrics { span_counts: IntCounterVec, span_events: IntCounterVec, last_known_message_nonce: IntGaugeVec, - validator_checkpoint_index: IntGaugeVec, submitter_queue_length: IntGaugeVec, operations_processed_count: IntCounterVec, @@ -51,6 +52,9 @@ pub struct CoreMetrics { /// Set of provider-specific metrics. These only need to get created once. provider_metrics: OnceLock, + + /// Metrics that are used to observe validator sets. + pub validator_metrics: ValidatorObservabilityMetricManager, } impl CoreMetrics { @@ -109,13 +113,18 @@ impl CoreMetrics { registry )?; - let validator_checkpoint_index = register_int_gauge_vec_with_registry!( + let observed_validator_latest_index = register_int_gauge_vec_with_registry!( opts!( - namespaced!("validator_checkpoint_index"), - "Observed signed checkpoint indices per validator", + namespaced!("observed_validator_latest_index"), + "The latest observed latest signed checkpoint indices per validator, from the perspective of the relayer", const_labels_ref ), - &["origin", "validator"], + &[ + "origin", + "destination", + "validator", + "app_context", + ], registry )?; @@ -169,7 +178,6 @@ impl CoreMetrics { span_counts, span_events, last_known_message_nonce, - validator_checkpoint_index, submitter_queue_length, @@ -180,6 +188,10 @@ impl CoreMetrics { json_rpc_client_metrics: OnceLock::new(), provider_metrics: OnceLock::new(), + + validator_metrics: ValidatorObservabilityMetricManager::new( + observed_validator_latest_index.clone(), + ), }) } @@ -298,14 +310,6 @@ impl CoreMetrics { self.last_known_message_nonce.clone() } - /// Gauge for reporting the most recent validator checkpoint index - /// Labels: - /// - `origin`: Origin chain - /// - `validator`: Address of the validator - pub fn validator_checkpoint_index(&self) -> IntGaugeVec { - self.validator_checkpoint_index.clone() - } - /// Latest message nonce in the validator. /// /// Phase: @@ -468,3 +472,97 @@ impl Debug for CoreMetrics { ) } } + +#[derive(Debug, Eq, PartialEq, Hash)] +struct AppContextKey { + origin: HyperlaneDomain, + destination: HyperlaneDomain, + app_context: String, +} + +/// Manages metrics for observing sets of validators. +pub struct ValidatorObservabilityMetricManager { + observed_validator_latest_index: IntGaugeVec, + + app_context_validators: RwLock>>, +} + +impl ValidatorObservabilityMetricManager { + fn new(observed_validator_latest_index: IntGaugeVec) -> Self { + Self { + observed_validator_latest_index, + app_context_validators: RwLock::new(HashMap::new()), + } + } + + /// Updates the metrics with the latest checkpoint index for each validator + /// in a given set. + pub async fn set_validator_latest_checkpoints( + &self, + origin: &HyperlaneDomain, + destination: &HyperlaneDomain, + app_context: String, + latest_checkpoints: &HashMap>, + ) { + let key = AppContextKey { + origin: origin.clone(), + destination: destination.clone(), + app_context: app_context.clone(), + }; + + let mut app_context_validators = self.app_context_validators.write().await; + + // First, clear out all previous metrics for the app context. + // This is necessary because the set of validators may have changed. + if let Some(prev_validators) = app_context_validators.get(&key) { + for validator in prev_validators { + // We unwrap because an error here occurs if the # of labels + // provided is incorrect, and we'd like to loudly fail in e2e if that + // happens. + self.observed_validator_latest_index + .remove_label_values(&[ + origin.as_ref(), + destination.as_ref(), + &format!("0x{:x}", validator).to_lowercase(), + &app_context, + ]) + .unwrap(); + } + } + + let mut set = HashSet::new(); + + // Then set the new metrics and update the cached set of validators. + for (validator, latest_checkpoint) in latest_checkpoints { + self.observed_validator_latest_index + .with_label_values(&[ + origin.as_ref(), + destination.as_ref(), + &format!("0x{:x}", validator).to_lowercase(), + &app_context, + ]) + // If the latest checkpoint is None, set to -1 to indicate that + // the validator did not provide a valid latest checkpoint index. + .set(latest_checkpoint.map(|i| i as i64).unwrap_or(-1)); + set.insert(*validator); + } + app_context_validators.insert(key, set); + } + + /// Gauge for reporting recently observed latest checkpoint indices for validator sets. + /// The entire set for an app context should be updated at once, and it should be updated + /// in a way that is robust to validator set changes. + /// Set to -1 to indicate a validator did not provide a valid latest checkpoint index. + /// Note that it's possible for an app to be using an aggregation ISM of more than one + /// validator set. If these sets are different, there is no label built into the metric + /// to distinguish them. + /// + /// Labels: + /// - `origin`: Origin chain + /// - `destination`: Destination chain + /// - `validator`: Address of the validator + /// - `app_context`: App context for the validator set + pub fn observed_validator_latest_index(&self) -> IntGaugeVec { + self.observed_validator_latest_index.clone() + } +} diff --git a/rust/hyperlane-base/src/settings/checkpoint_syncer.rs b/rust/hyperlane-base/src/settings/checkpoint_syncer.rs index 342f44732..01640747e 100644 --- a/rust/hyperlane-base/src/settings/checkpoint_syncer.rs +++ b/rust/hyperlane-base/src/settings/checkpoint_syncer.rs @@ -1,12 +1,11 @@ use core::str::FromStr; -use std::{collections::HashMap, path::PathBuf}; +use std::path::PathBuf; use eyre::{eyre, Context, Report, Result}; -use hyperlane_core::H160; -use prometheus::{IntGauge, IntGaugeVec}; +use prometheus::IntGauge; use rusoto_core::Region; -use crate::{CheckpointSyncer, LocalStorage, MultisigCheckpointSyncer, S3Storage}; +use crate::{CheckpointSyncer, LocalStorage, S3Storage}; /// Checkpoint Syncer types #[derive(Debug, Clone)] @@ -83,31 +82,3 @@ impl CheckpointSyncerConf { }) } } - -/// Config for a MultisigCheckpointSyncer -#[derive(Debug, Clone)] -pub struct MultisigCheckpointSyncerConf { - /// The checkpoint syncer for each valid validator signer address - checkpointsyncers: HashMap, -} - -impl MultisigCheckpointSyncerConf { - /// Get a MultisigCheckpointSyncer from the config - pub fn build( - &self, - origin: &str, - validator_checkpoint_index: IntGaugeVec, - ) -> Result { - let mut checkpoint_syncers = HashMap::new(); - for (key, value) in self.checkpointsyncers.iter() { - let gauge = - validator_checkpoint_index.with_label_values(&[origin, &key.to_lowercase()]); - if let Ok(conf) = value.build(Some(gauge)) { - checkpoint_syncers.insert(H160::from_str(key)?, conf.into()); - } else { - continue; - } - } - Ok(MultisigCheckpointSyncer::new(checkpoint_syncers)) - } -} diff --git a/rust/hyperlane-base/src/types/multisig.rs b/rust/hyperlane-base/src/types/multisig.rs index 4fd60f798..dc7878af3 100644 --- a/rust/hyperlane-base/src/types/multisig.rs +++ b/rust/hyperlane-base/src/types/multisig.rs @@ -5,19 +5,76 @@ use derive_new::new; use eyre::Result; use tracing::{debug, instrument}; -use hyperlane_core::{MultisigSignedCheckpoint, SignedCheckpointWithMessageId, H160, H256}; +use hyperlane_core::{ + HyperlaneDomain, MultisigSignedCheckpoint, SignedCheckpointWithMessageId, H160, H256, +}; -use crate::CheckpointSyncer; +use crate::{CheckpointSyncer, CoreMetrics}; -/// Fetches signed checkpoints from multiple validators to create -/// MultisigSignedCheckpoints +/// For a particular validator set, fetches signed checkpoints from multiple +/// validators to create MultisigSignedCheckpoints. #[derive(Clone, Debug, new)] pub struct MultisigCheckpointSyncer { /// The checkpoint syncer for each valid validator signer address checkpoint_syncers: HashMap>, + metrics: Arc, + app_context: Option, } impl MultisigCheckpointSyncer { + /// Gets the latest checkpoint index from each validator's checkpoint syncer. + /// Returns a vector of the latest indices, in an unspecified order, and does + /// not contain indices for validators that did not provide a latest index. + /// Also updates the validator latest checkpoint metrics. + pub async fn get_validator_latest_checkpoints_and_update_metrics( + &self, + validators: &[H256], + origin: &HyperlaneDomain, + destination: &HyperlaneDomain, + ) -> Vec { + // Get the latest_index from each validator's checkpoint syncer. + // If a validator does not return a latest index, None is recorded so + // this can be surfaced in the metrics. + let mut latest_indices: HashMap> = + HashMap::with_capacity(validators.len()); + + for validator in validators { + let address = H160::from(*validator); + if let Some(checkpoint_syncer) = self.checkpoint_syncers.get(&address) { + // Gracefully handle errors getting the latest_index + match checkpoint_syncer.latest_index().await { + Ok(Some(index)) => { + debug!(?address, ?index, "Validator returned latest index"); + latest_indices.insert(H160::from(*validator), Some(index)); + } + result => { + debug!( + ?address, + ?result, + "Failed to get latest index from validator" + ); + latest_indices.insert(H160::from(*validator), None); + } + } + } + } + + if let Some(app_context) = &self.app_context { + self.metrics + .validator_metrics + .set_validator_latest_checkpoints( + origin, + destination, + app_context.clone(), + &latest_indices, + ) + .await; + } + + // Filter out any validators that did not return a latest index + latest_indices.values().copied().flatten().collect() + } + /// Attempts to get the latest checkpoint with a quorum of signatures among /// validators. /// @@ -37,24 +94,13 @@ impl MultisigCheckpointSyncer { threshold: usize, minimum_index: u32, maximum_index: u32, + origin: &HyperlaneDomain, + destination: &HyperlaneDomain, ) -> Result> { - // Get the latest_index from each validator's checkpoint syncer. - let mut latest_indices = Vec::with_capacity(validators.len()); - for validator in validators { - let address = H160::from(*validator); - if let Some(checkpoint_syncer) = self.checkpoint_syncers.get(&address) { - // Gracefully handle errors getting the latest_index - match checkpoint_syncer.latest_index().await { - Ok(Some(index)) => { - debug!(?address, ?index, "Validator returned latest index"); - latest_indices.push(index); - } - err => { - debug!(?address, ?err, "Failed to get latest index from validator"); - } - } - } - } + let mut latest_indices = self + .get_validator_latest_checkpoints_and_update_metrics(validators, origin, destination) + .await; + debug!( ?latest_indices, "Fetched latest indices from checkpoint syncers" diff --git a/typescript/sdk/src/metadata/agentConfig.ts b/typescript/sdk/src/metadata/agentConfig.ts index ea4f8270c..5be7f21fc 100644 --- a/typescript/sdk/src/metadata/agentConfig.ts +++ b/typescript/sdk/src/metadata/agentConfig.ts @@ -235,29 +235,36 @@ const GasPaymentEnforcementSchema = z.union([ ]); export type GasPaymentEnforcement = z.infer; +const MetricAppContextSchema = z.object({ + name: z.string().min(1), + matchingList: MatchingListSchema.describe( + 'A matching list, any message that matches will be classified as this app context.', + ), +}); + export const RelayerAgentConfigSchema = AgentConfigSchema.extend({ db: z .string() - .nonempty() + .min(1) .optional() .describe('The path to the relayer database.'), relayChains: CommaSeperatedChainList.describe( 'Comma seperated list of chains to relay messages between.', ), gasPaymentEnforcement: z - .union([z.array(GasPaymentEnforcementSchema), z.string().nonempty()]) + .union([z.array(GasPaymentEnforcementSchema), z.string().min(1)]) .optional() .describe( 'The gas payment enforcement configuration as JSON. Expects an ordered array of `GasPaymentEnforcementConfig`.', ), whitelist: z - .union([MatchingListSchema, z.string().nonempty()]) + .union([MatchingListSchema, z.string().min(1)]) .optional() .describe( 'If no whitelist is provided ALL messages will be considered on the whitelist.', ), blacklist: z - .union([MatchingListSchema, z.string().nonempty()]) + .union([MatchingListSchema, z.string().min(1)]) .optional() .describe( 'If no blacklist is provided ALL will be considered to not be on the blacklist.', @@ -274,12 +281,18 @@ export const RelayerAgentConfigSchema = AgentConfigSchema.extend({ .describe( 'If true, allows local storage based checkpoint syncers. Not intended for production use.', ), + metricAppContexts: z + .union([z.array(MetricAppContextSchema), z.string().min(1)]) + .optional() + .describe( + 'A list of app contexts and their matching lists to use for metrics. A message will be classified as the first matching app context.', + ), }); export type RelayerConfig = z.infer; export const ScraperAgentConfigSchema = AgentConfigSchema.extend({ - db: z.string().nonempty().describe('Database connection string'), + db: z.string().min(1).describe('Database connection string'), chainsToScrape: CommaSeperatedChainList.describe( 'Comma separated list of chain names to scrape', ), @@ -290,32 +303,29 @@ export type ScraperConfig = z.infer; export const ValidatorAgentConfigSchema = AgentConfigSchema.extend({ db: z .string() - .nonempty() + .min(1) .optional() .describe('The path to the validator database.'), originChainName: z .string() - .nonempty() + .min(1) .describe('Name of the chain to validate messages on'), validator: AgentSignerSchema.describe('The validator attestation signer'), checkpointSyncer: z.discriminatedUnion('type', [ z .object({ type: z.literal('localStorage'), - path: z - .string() - .nonempty() - .describe('Path to the local storage location'), + path: z.string().min(1).describe('Path to the local storage location'), }) .describe('A local checkpoint syncer'), z .object({ type: z.literal('s3'), - bucket: z.string().nonempty(), - region: z.string().nonempty(), + bucket: z.string().min(1), + region: z.string().min(1), folder: z .string() - .nonempty() + .min(1) .optional() .describe( 'The folder/key-prefix to use, defaults to the root of the bucket',