Observability of validators for relayers (#3057)

### Description

Goal of this was to have insight into validators of important sets being
"up"

Introduces a new metric used by relayers:
`hyperlane_observed_validator_latest_index`, e.g.:

```
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test1",hyperlane_baselib_version="0.1.0",origin="test2",validator="0x9965507d1a55bcc2695c58ba16fb37d819b0a4dc"} 664
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test1",hyperlane_baselib_version="0.1.0",origin="test3",validator="0x976ea74026e726554db657fa54763abd0c3a0aa9"} 641
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test2",hyperlane_baselib_version="0.1.0",origin="test1",validator="0x15d34aaf54267db7d7c367839aaf71a00a2c6a65"} 670
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test2",hyperlane_baselib_version="0.1.0",origin="test3",validator="0x976ea74026e726554db657fa54763abd0c3a0aa9"} 665
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test3",hyperlane_baselib_version="0.1.0",origin="test1",validator="0x15d34aaf54267db7d7c367839aaf71a00a2c6a65"} 652
hyperlane_observed_validator_latest_index{agent="relayer",app_context="default_ism",destination="test3",hyperlane_baselib_version="0.1.0",origin="test2",validator="0x9965507d1a55bcc2695c58ba16fb37d819b0a4dc"} 664
hyperlane_observed_validator_latest_index{agent="relayer",app_context="testapp",destination="test1",hyperlane_baselib_version="0.1.0",origin="test2",validator="0x9965507d1a55bcc2695c58ba16fb37d819b0a4dc"} 658
hyperlane_observed_validator_latest_index{agent="relayer",app_context="testapp",destination="test1",hyperlane_baselib_version="0.1.0",origin="test3",validator="0x976ea74026e726554db657fa54763abd0c3a0aa9"} 641
```

Tapping into metadata building for multisig ISMs, the relayer will
update the metric with the latest indices for the validators in a set.
In order to prevent the cardinality being ridiculously high, only
certain validator sets are tracked. This is done by introducing an
`app_context` label (I'm very open to other names here, for some reason
whenever idk how to name some kind of identifier I end up calling it a
context 😆)

The app context can either be:
- if a new setting, --metricAppContexts, is specified, a message will be
classified based off the first matching list it matches. E.g.
`--metricAppContexts '[{"name": "testapp", "matchingList":
[{"recipient_address": "0xd84379ceae14aa33c123af12424a37803f885889",
"destination_domain": 13371 }] }]'`. This is nice for e.g. warp route
deployments, where the ISM is maybe not a default ISM, and can be
changed
- if a message doesn't get classified this way, it can also be
classified with the "default_ism" app context, which is just for any
message that happens to use the default ISM as its "root" ISM

This way we have insight in to the default ISM and any
application-specific ISMs.

Some things to note:
- it's possible for a message to actually have more than one validator
set, e.g. if it's using an aggregation ISM. In this case, we'll have
metrics on the union of all validator sets for that app context
- Some effort is required to make sure that metrics don't stick around
for a validator that has actually been removed from the set. To handle
this, we cache the validator set for an app context and clear out the
entire set each time we set the metrics

### Drive-by changes

- Zod's nonempty function for strings is deprecated, moves to `.min(1)`
instead

### Related issues

- Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/1762

### Backward compatibility

yes

### Testing

Ran locally - I think i'll probably add something in e2e tests, but
opening now
pull/3110/head
Trevor Porter 11 months ago committed by GitHub
parent a04454d6d7
commit 3f88aa6f6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      rust/agents/relayer/src/msg/metadata/aggregation.rs
  2. 287
      rust/agents/relayer/src/msg/metadata/base.rs
  3. 4
      rust/agents/relayer/src/msg/metadata/ccip_read.rs
  4. 2
      rust/agents/relayer/src/msg/metadata/mod.rs
  5. 8
      rust/agents/relayer/src/msg/metadata/multisig/base.rs
  6. 8
      rust/agents/relayer/src/msg/metadata/multisig/merkle_root_multisig.rs
  7. 14
      rust/agents/relayer/src/msg/metadata/multisig/message_id_multisig.rs
  8. 4
      rust/agents/relayer/src/msg/metadata/routing.rs
  9. 17
      rust/agents/relayer/src/msg/pending_message.rs
  10. 25
      rust/agents/relayer/src/msg/processor.rs
  11. 9
      rust/agents/relayer/src/relayer.rs
  12. 72
      rust/agents/relayer/src/settings/mod.rs
  13. 8
      rust/chains/hyperlane-cosmos/src/mailbox.rs
  14. 128
      rust/hyperlane-base/src/metrics/core.rs
  15. 35
      rust/hyperlane-base/src/settings/checkpoint_syncer.rs
  16. 88
      rust/hyperlane-base/src/types/multisig.rs
  17. 38
      typescript/sdk/src/metadata/agentConfig.ts

@ -9,7 +9,7 @@ use tracing::{info, instrument};
use hyperlane_core::{HyperlaneMessage, InterchainSecurityModule, ModuleType, H256, U256};
use super::{BaseMetadataBuilder, MetadataBuilder};
use super::{MessageMetadataBuilder, MetadataBuilder};
/// Bytes used to store one member of the (start, end) range tuple
/// Copied from `AggregationIsmMetadata.sol`
@ -17,7 +17,7 @@ const METADATA_RANGE_SIZE: usize = 4;
#[derive(Clone, Debug, new, Deref)]
pub struct AggregationIsmMetadataBuilder {
base: BaseMetadataBuilder,
base: MessageMetadataBuilder,
}
#[derive(Clone, Debug, new, PartialEq, Eq)]

@ -1,4 +1,11 @@
use std::{collections::HashMap, fmt::Debug, str::FromStr, sync::Arc};
use std::{
collections::HashMap,
fmt::Debug,
ops::Deref,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
use crate::{
merkle_tree::builder::MerkleTreeBuilder,
@ -7,6 +14,7 @@ use crate::{
AggregationIsmMetadataBuilder, CcipReadIsmMetadataBuilder, NullMetadataBuilder,
RoutingIsmMetadataBuilder,
},
settings::matching_list::MatchingList,
};
use async_trait::async_trait;
use derive_new::new;
@ -18,9 +26,10 @@ use hyperlane_base::{
};
use hyperlane_core::{
accumulator::merkle::Proof, AggregationIsm, CcipReadIsm, Checkpoint, HyperlaneDomain,
HyperlaneMessage, InterchainSecurityModule, ModuleType, MultisigIsm, RoutingIsm,
HyperlaneMessage, InterchainSecurityModule, Mailbox, ModuleType, MultisigIsm, RoutingIsm,
ValidatorAnnounce, H160, H256,
};
use tokio::sync::RwLock;
use tracing::{debug, info, instrument, warn};
@ -40,39 +49,129 @@ pub struct IsmWithMetadataAndType {
#[async_trait]
pub trait MetadataBuilder: Send + Sync {
#[allow(clippy::async_yields_async)]
async fn build(&self, ism_address: H256, message: &HyperlaneMessage)
-> Result<Option<Vec<u8>>>;
}
#[derive(Clone, new)]
pub struct BaseMetadataBuilder {
destination_chain_setup: ChainConf,
origin_prover_sync: Arc<RwLock<MerkleTreeBuilder>>,
origin_validator_announce: Arc<dyn ValidatorAnnounce>,
allow_local_checkpoint_syncers: bool,
metrics: Arc<CoreMetrics>,
db: HyperlaneRocksDB,
/// Allows fetching the default ISM, caching the value for a period of time
/// to avoid fetching it all the time.
/// TODO: make this generic
#[derive(Debug)]
pub struct DefaultIsmCache {
value: RwLock<Option<(H256, Instant)>>,
mailbox: Arc<dyn Mailbox>,
}
impl DefaultIsmCache {
/// Time to live for the cached default ISM. 10 mins.
const TTL: Duration = Duration::from_secs(60 * 10);
pub fn new(mailbox: Arc<dyn Mailbox>) -> Self {
Self {
value: RwLock::new(None),
mailbox,
}
}
/// Gets the default ISM, fetching it from onchain if the cached value
/// is stale.
/// TODO: this can and should be made generic eventually
pub async fn get(&self) -> Result<H256> {
// If the duration since the value was last updated does not
// exceed the TTL, return the cached value.
// This is in its own block to avoid holding the lock during the
// async operation to fetch the on-chain default ISM if
// the cached value is stale.
{
let value = self.value.read().await;
if let Some(value) = *value {
if value.1.elapsed() < Self::TTL {
return Ok(value.0);
}
}
}
let default_ism = self.mailbox.default_ism().await?;
// Update the cached value.
{
let mut value = self.value.write().await;
*value = Some((default_ism, Instant::now()));
}
Ok(default_ism)
}
}
/// Classifies messages into an app context if they have one.
#[derive(Debug)]
pub struct AppContextClassifier {
default_ism: DefaultIsmCache,
app_matching_lists: Vec<(MatchingList, String)>,
}
impl AppContextClassifier {
pub fn new(
destination_mailbox: Arc<dyn Mailbox>,
app_matching_lists: Vec<(MatchingList, String)>,
) -> Self {
Self {
default_ism: DefaultIsmCache::new(destination_mailbox),
app_matching_lists,
}
}
/// Classifies messages into an app context if they have one, or None
/// if they don't.
/// An app context is a string that identifies the app that sent the message
/// and exists just for metrics.
/// An app context is chosen based on:
/// - the first element in `app_matching_lists` that matches the message
/// - if the message's ISM is the default ISM, the app context is "default_ism"
pub async fn get_app_context(
&self,
message: &HyperlaneMessage,
root_ism: H256,
) -> Result<Option<String>> {
// Give priority to the matching list. If the app from the matching list happens
// to use the default ISM, it's preferable to use the app context from the matching
// list.
for (matching_list, app_context) in self.app_matching_lists.iter() {
if matching_list.msg_matches(message, false) {
return Ok(Some(app_context.clone()));
}
}
let default_ism = self.default_ism.get().await?;
if root_ism == default_ism {
return Ok(Some("default_ism".to_string()));
}
Ok(None)
}
}
/// Builds metadata for a message.
#[derive(Debug, Clone)]
pub struct MessageMetadataBuilder {
pub base: Arc<BaseMetadataBuilder>,
/// ISMs can be structured recursively. We keep track of the depth
/// of the recursion to avoid infinite loops.
#[new(default)]
depth: u32,
max_depth: u32,
pub depth: u32,
pub app_context: Option<String>,
}
impl Debug for BaseMetadataBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"MetadataBuilder {{ chain_setup: {:?}, validator_announce: {:?} }}",
self.destination_chain_setup, self.origin_validator_announce
)
impl Deref for MessageMetadataBuilder {
type Target = BaseMetadataBuilder;
fn deref(&self) -> &Self::Target {
&self.base
}
}
#[async_trait]
impl MetadataBuilder for BaseMetadataBuilder {
#[instrument(err, skip(self), fields(domain=self.domain().name()))]
impl MetadataBuilder for MessageMetadataBuilder {
#[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()))]
async fn build(
&self,
ism_address: H256,
@ -84,12 +183,24 @@ impl MetadataBuilder for BaseMetadataBuilder {
}
}
impl BaseMetadataBuilder {
pub fn domain(&self) -> &HyperlaneDomain {
&self.destination_chain_setup.domain
impl MessageMetadataBuilder {
pub async fn new(
ism_address: H256,
message: &HyperlaneMessage,
base: Arc<BaseMetadataBuilder>,
) -> Result<Self> {
let app_context = base
.app_context_classifier
.get_app_context(message, ism_address)
.await?;
Ok(Self {
base,
depth: 0,
app_context,
})
}
pub fn clone_with_incremented_depth(&self) -> Result<BaseMetadataBuilder> {
fn clone_with_incremented_depth(&self) -> Result<MessageMetadataBuilder> {
let mut cloned = self.clone();
cloned.depth += 1;
if cloned.depth > cloned.max_depth {
@ -99,6 +210,82 @@ impl BaseMetadataBuilder {
}
}
#[instrument(err, skip(self), fields(destination_domain=self.destination_domain().name()))]
pub async fn build_ism_and_metadata(
&self,
ism_address: H256,
message: &HyperlaneMessage,
) -> Result<IsmWithMetadataAndType> {
let ism: Box<dyn InterchainSecurityModule> = self
.build_ism(ism_address)
.await
.context("When building ISM")?;
let module_type = ism
.module_type()
.await
.context("When fetching module type")?;
let cloned = self.clone_with_incremented_depth()?;
let metadata_builder: Box<dyn MetadataBuilder> = match module_type {
ModuleType::MerkleRootMultisig => {
Box::new(MerkleRootMultisigMetadataBuilder::new(cloned))
}
ModuleType::MessageIdMultisig => {
Box::new(MessageIdMultisigMetadataBuilder::new(cloned))
}
ModuleType::Routing => Box::new(RoutingIsmMetadataBuilder::new(cloned)),
ModuleType::Aggregation => Box::new(AggregationIsmMetadataBuilder::new(cloned)),
ModuleType::Null => Box::new(NullMetadataBuilder::new()),
ModuleType::CcipRead => Box::new(CcipReadIsmMetadataBuilder::new(cloned)),
_ => return Err(MetadataBuilderError::UnsupportedModuleType(module_type).into()),
};
let meta = metadata_builder
.build(ism_address, message)
.await
.context("When building metadata");
Ok(IsmWithMetadataAndType {
ism,
metadata: meta?,
module_type,
})
}
}
/// Base metadata builder with types used by higher level metadata builders.
#[allow(clippy::too_many_arguments)]
#[derive(new)]
pub struct BaseMetadataBuilder {
origin_domain: HyperlaneDomain,
destination_chain_setup: ChainConf,
origin_prover_sync: Arc<RwLock<MerkleTreeBuilder>>,
origin_validator_announce: Arc<dyn ValidatorAnnounce>,
allow_local_checkpoint_syncers: bool,
metrics: Arc<CoreMetrics>,
db: HyperlaneRocksDB,
max_depth: u32,
app_context_classifier: AppContextClassifier,
}
impl Debug for BaseMetadataBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"BaseMetadataBuilder {{ origin_domain: {:?} destination_chain_setup: {:?}, validator_announce: {:?} }}",
self.origin_domain, self.destination_chain_setup, self.origin_validator_announce
)
}
}
impl BaseMetadataBuilder {
pub fn origin_domain(&self) -> &HyperlaneDomain {
&self.origin_domain
}
pub fn destination_domain(&self) -> &HyperlaneDomain {
&self.destination_chain_setup.domain
}
pub async fn get_proof(&self, leaf_index: u32, checkpoint: Checkpoint) -> Result<Proof> {
const CTX: &str = "When fetching message proof";
let proof = self
@ -162,6 +349,7 @@ impl BaseMetadataBuilder {
pub async fn build_checkpoint_syncer(
&self,
validators: &[H256],
app_context: Option<String>,
) -> Result<MultisigCheckpointSyncer> {
let storage_locations = self
.origin_validator_announce
@ -221,45 +409,10 @@ impl BaseMetadataBuilder {
}
}
}
Ok(MultisigCheckpointSyncer::new(checkpoint_syncers))
}
#[instrument(err, skip(self), fields(domain=self.domain().name()))]
pub async fn build_ism_and_metadata(
&self,
ism_address: H256,
message: &HyperlaneMessage,
) -> Result<IsmWithMetadataAndType> {
let ism: Box<dyn InterchainSecurityModule> = self
.build_ism(ism_address)
.await
.context("When building ISM")?;
let module_type = ism
.module_type()
.await
.context("When fetching module type")?;
let base = self.clone_with_incremented_depth()?;
let metadata_builder: Box<dyn MetadataBuilder> = match module_type {
ModuleType::MerkleRootMultisig => {
Box::new(MerkleRootMultisigMetadataBuilder::new(base))
}
ModuleType::MessageIdMultisig => Box::new(MessageIdMultisigMetadataBuilder::new(base)),
ModuleType::Routing => Box::new(RoutingIsmMetadataBuilder::new(base)),
ModuleType::Aggregation => Box::new(AggregationIsmMetadataBuilder::new(base)),
ModuleType::Null => Box::new(NullMetadataBuilder::new()),
ModuleType::CcipRead => Box::new(CcipReadIsmMetadataBuilder::new(base)),
_ => return Err(MetadataBuilderError::UnsupportedModuleType(module_type).into()),
};
let meta = metadata_builder
.build(ism_address, message)
.await
.context("When building metadata");
Ok(IsmWithMetadataAndType {
ism,
metadata: meta?,
module_type,
})
Ok(MultisigCheckpointSyncer::new(
checkpoint_syncers,
self.metrics.clone(),
app_context,
))
}
}

@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{info, instrument};
use super::{BaseMetadataBuilder, MetadataBuilder};
use super::{base::MessageMetadataBuilder, MetadataBuilder};
#[derive(Serialize, Deserialize)]
struct OffchainResponse {
@ -20,7 +20,7 @@ struct OffchainResponse {
#[derive(Clone, Debug, new, Deref)]
pub struct CcipReadIsmMetadataBuilder {
base: BaseMetadataBuilder,
base: MessageMetadataBuilder,
}
#[async_trait]

@ -6,8 +6,8 @@ mod null_metadata;
mod routing;
use aggregation::AggregationIsmMetadataBuilder;
pub(crate) use base::BaseMetadataBuilder;
pub(crate) use base::MetadataBuilder;
pub(crate) use base::{AppContextClassifier, BaseMetadataBuilder, MessageMetadataBuilder};
use ccip_read::CcipReadIsmMetadataBuilder;
use null_metadata::NullMetadataBuilder;
use routing::RoutingIsmMetadataBuilder;

@ -12,7 +12,8 @@ use hyperlane_core::{HyperlaneMessage, MultisigSignedCheckpoint, H256};
use strum::Display;
use tracing::{debug, info};
use crate::msg::metadata::BaseMetadataBuilder;
use crate::msg::metadata::base::MessageMetadataBuilder;
use crate::msg::metadata::MetadataBuilder;
#[derive(new, AsRef, Deref)]
@ -36,7 +37,7 @@ pub enum MetadataToken {
}
#[async_trait]
pub trait MultisigIsmMetadataBuilder: AsRef<BaseMetadataBuilder> + Send + Sync {
pub trait MultisigIsmMetadataBuilder: AsRef<MessageMetadataBuilder> + Send + Sync {
async fn fetch_metadata(
&self,
validators: &[H256],
@ -92,7 +93,6 @@ pub trait MultisigIsmMetadataBuilder: AsRef<BaseMetadataBuilder> + Send + Sync {
#[async_trait]
impl<T: MultisigIsmMetadataBuilder> MetadataBuilder for T {
#[allow(clippy::async_yields_async)]
async fn build(
&self,
ism_address: H256,
@ -117,7 +117,7 @@ impl<T: MultisigIsmMetadataBuilder> MetadataBuilder for T {
let checkpoint_syncer = self
.as_ref()
.build_checkpoint_syncer(&validators)
.build_checkpoint_syncer(&validators, self.as_ref().app_context.clone())
.await
.context(CTX)?;

@ -9,12 +9,12 @@ use hyperlane_base::MultisigCheckpointSyncer;
use hyperlane_core::{unwrap_or_none_result, HyperlaneMessage, H256};
use tracing::debug;
use crate::msg::metadata::BaseMetadataBuilder;
use crate::msg::metadata::MessageMetadataBuilder;
use super::base::{MetadataToken, MultisigIsmMetadataBuilder, MultisigMetadata};
#[derive(Debug, Clone, Deref, new, AsRef)]
pub struct MerkleRootMultisigMetadataBuilder(BaseMetadataBuilder);
pub struct MerkleRootMultisigMetadataBuilder(MessageMetadataBuilder);
#[async_trait]
impl MultisigIsmMetadataBuilder for MerkleRootMultisigMetadataBuilder {
fn token_layout(&self) -> Vec<MetadataToken> {
@ -55,7 +55,9 @@ impl MultisigIsmMetadataBuilder for MerkleRootMultisigMetadataBuilder {
validators,
threshold as usize,
leaf_index,
highest_leaf_index
highest_leaf_index,
self.origin_domain(),
self.destination_domain(),
)
.await
.context(CTX)?,

@ -9,12 +9,12 @@ use hyperlane_base::MultisigCheckpointSyncer;
use hyperlane_core::{unwrap_or_none_result, HyperlaneMessage, H256};
use tracing::{debug, warn};
use crate::msg::metadata::BaseMetadataBuilder;
use crate::msg::metadata::MessageMetadataBuilder;
use super::base::{MetadataToken, MultisigIsmMetadataBuilder, MultisigMetadata};
#[derive(Debug, Clone, Deref, new, AsRef)]
pub struct MessageIdMultisigMetadataBuilder(BaseMetadataBuilder);
pub struct MessageIdMultisigMetadataBuilder(MessageMetadataBuilder);
#[async_trait]
impl MultisigIsmMetadataBuilder for MessageIdMultisigMetadataBuilder {
@ -46,6 +46,16 @@ impl MultisigIsmMetadataBuilder for MessageIdMultisigMetadataBuilder {
"No merkle leaf found for message id, must have not been enqueued in the tree"
)
);
// Update the validator latest checkpoint metrics.
let _ = checkpoint_syncer
.get_validator_latest_checkpoints_and_update_metrics(
validators,
self.origin_domain(),
self.destination_domain(),
)
.await;
let quorum_checkpoint = unwrap_or_none_result!(
checkpoint_syncer
.fetch_checkpoint(validators, threshold as usize, leaf_index)

@ -5,11 +5,11 @@ use eyre::Context;
use hyperlane_core::{HyperlaneMessage, H256};
use tracing::instrument;
use super::{BaseMetadataBuilder, MetadataBuilder};
use super::{MessageMetadataBuilder, MetadataBuilder};
#[derive(Clone, Debug, new, Deref)]
pub struct RoutingIsmMetadataBuilder {
base: BaseMetadataBuilder,
base: MessageMetadataBuilder,
}
#[async_trait]

@ -14,7 +14,7 @@ use tracing::{debug, error, info, instrument, trace, warn};
use super::{
gas_payment::GasPaymentEnforcer,
metadata::{BaseMetadataBuilder, MetadataBuilder},
metadata::{BaseMetadataBuilder, MessageMetadataBuilder, MetadataBuilder},
pending_operation::*,
};
@ -35,7 +35,7 @@ pub struct MessageContext {
pub origin_db: HyperlaneRocksDB,
/// Used to construct the ISM metadata needed to verify a message from the
/// origin.
pub metadata_builder: BaseMetadataBuilder,
pub metadata_builder: Arc<BaseMetadataBuilder>,
/// Used to determine if messages from the origin have made sufficient gas
/// payments.
pub origin_gas_payment_enforcer: Arc<GasPaymentEnforcer>,
@ -153,9 +153,18 @@ impl PendingOperation for PendingMessage {
"fetching ISM address. Potentially malformed recipient ISM address."
);
let message_metadata_builder = op_try!(
MessageMetadataBuilder::new(
ism_address,
&self.message,
self.ctx.metadata_builder.clone()
)
.await,
"getting the message metadata builder"
);
let Some(metadata) = op_try!(
self.ctx
.metadata_builder
message_metadata_builder
.build(ism_address, &self.message)
.await,
"building metadata"

@ -183,7 +183,8 @@ mod test {
use crate::{
merkle_tree::builder::MerkleTreeBuilder,
msg::{
gas_payment::GasPaymentEnforcer, metadata::BaseMetadataBuilder,
gas_payment::GasPaymentEnforcer,
metadata::{AppContextClassifier, BaseMetadataBuilder},
pending_operation::PendingOperation,
},
processor::Processor,
@ -240,16 +241,23 @@ mod test {
}
fn dummy_metadata_builder(
domain: &HyperlaneDomain,
origin_domain: &HyperlaneDomain,
destination_domain: &HyperlaneDomain,
db: &HyperlaneRocksDB,
) -> BaseMetadataBuilder {
let mut settings = Settings::default();
settings
.chains
.insert(domain.name().to_owned(), dummy_chain_conf(domain));
let destination_chain_conf = settings.chain_setup(domain).unwrap();
settings.chains.insert(
origin_domain.name().to_owned(),
dummy_chain_conf(origin_domain),
);
settings.chains.insert(
destination_domain.name().to_owned(),
dummy_chain_conf(destination_domain),
);
let destination_chain_conf = settings.chain_setup(destination_domain).unwrap();
let core_metrics = CoreMetrics::new("dummy_relayer", 37582, Registry::new()).unwrap();
BaseMetadataBuilder::new(
origin_domain.clone(),
destination_chain_conf.clone(),
Arc::new(RwLock::new(MerkleTreeBuilder::new())),
Arc::new(MockValidatorAnnounceContract::default()),
@ -257,6 +265,7 @@ mod test {
Arc::new(core_metrics),
db.clone(),
5,
AppContextClassifier::new(Arc::new(MockMailboxContract::default()), vec![]),
)
}
@ -268,11 +277,11 @@ mod test {
MessageProcessor,
UnboundedReceiver<Box<DynPendingOperation>>,
) {
let base_metadata_builder = dummy_metadata_builder(origin_domain, db);
let base_metadata_builder = dummy_metadata_builder(origin_domain, destination_domain, db);
let message_context = Arc::new(MessageContext {
destination_mailbox: Arc::new(MockMailboxContract::default()),
origin_db: db.clone(),
metadata_builder: base_metadata_builder,
metadata_builder: Arc::new(base_metadata_builder),
origin_gas_payment_enforcer: Arc::new(GasPaymentEnforcer::new([], db.clone())),
transaction_gas_limit: Default::default(),
metrics: dummy_submission_metrics(),

@ -34,7 +34,7 @@ use crate::{
merkle_tree::builder::MerkleTreeBuilder,
msg::{
gas_payment::GasPaymentEnforcer,
metadata::BaseMetadataBuilder,
metadata::{AppContextClassifier, BaseMetadataBuilder},
pending_message::{MessageContext, MessageSubmissionMetrics},
pending_operation::DynPendingOperation,
processor::{MessageProcessor, MessageProcessorMetrics},
@ -212,6 +212,7 @@ impl BaseAgent for Relayer {
for origin in &settings.origin_chains {
let db = dbs.get(origin).unwrap().clone();
let metadata_builder = BaseMetadataBuilder::new(
origin.clone(),
destination_chain_setup.clone(),
prover_syncs[origin].clone(),
validator_announces[origin].clone(),
@ -219,6 +220,10 @@ impl BaseAgent for Relayer {
core.metrics.clone(),
db,
5,
AppContextClassifier::new(
mailboxes[destination].clone(),
settings.metric_app_contexts.clone(),
),
);
msg_ctxs.insert(
@ -229,7 +234,7 @@ impl BaseAgent for Relayer {
Arc::new(MessageContext {
destination_mailbox: mailboxes[destination].clone(),
origin_db: dbs.get(origin).unwrap().clone(),
metadata_builder,
metadata_builder: Arc::new(metadata_builder),
origin_gas_payment_enforcer: gas_payment_enforcers[origin].clone(),
transaction_gas_limit,
metrics: MessageSubmissionMetrics::new(&core_metrics, origin, destination),

@ -54,6 +54,8 @@ pub struct RelayerSettings {
/// If true, allows local storage based checkpoint syncers.
/// Not intended for production use.
pub allow_local_checkpoint_syncers: bool,
/// App contexts used for metrics.
pub metric_app_contexts: Vec<(MatchingList, String)>,
}
/// Config for gas payment enforcement
@ -118,26 +120,11 @@ impl FromRawConf<RawRelayerSettings> for RelayerSettings {
.parse_from_str("Expected database path")
.unwrap_or_else(|| std::env::current_dir().unwrap().join("hyperlane_db"));
let (raw_gas_payment_enforcement_path, raw_gas_payment_enforcement) = match p
let (raw_gas_payment_enforcement_path, raw_gas_payment_enforcement) = p
.get_opt_key("gasPaymentEnforcement")
.take_config_err_flat(&mut err)
{
None => None,
Some(ValueParser {
val: Value::String(policy_str),
cwp,
}) => serde_json::from_str::<Value>(policy_str)
.context("Expected JSON string")
.take_err(&mut err, || cwp.clone())
.map(|v| (cwp, recase_json_value(v, Case::Flat))),
Some(ValueParser {
val: value @ Value::Array(_),
cwp,
}) => Some((cwp, value.clone())),
Some(_) => Err(eyre!("Expected JSON array or stringified JSON"))
.take_err(&mut err, || cwp.clone()),
}
.unwrap_or_else(|| (&p.cwp + "gas_payment_enforcement", Value::Array(vec![])));
.and_then(parse_json_array)
.unwrap_or_else(|| (&p.cwp + "gas_payment_enforcement", Value::Array(vec![])));
let gas_payment_enforcement_parser = ValueParser::new(
raw_gas_payment_enforcement_path,
@ -247,6 +234,32 @@ impl FromRawConf<RawRelayerSettings> for RelayerSettings {
})
.collect();
let (raw_metric_app_contexts_path, raw_metric_app_contexts) = p
.get_opt_key("metricAppContexts")
.take_config_err_flat(&mut err)
.and_then(parse_json_array)
.unwrap_or_else(|| (&p.cwp + "metric_app_contexts", Value::Array(vec![])));
let metric_app_contexts_parser =
ValueParser::new(raw_metric_app_contexts_path, &raw_metric_app_contexts);
let metric_app_contexts = metric_app_contexts_parser
.into_array_iter()
.map(|itr| {
itr.filter_map(|policy| {
let name = policy.chain(&mut err).get_key("name").parse_string().end();
let matching_list = policy
.chain(&mut err)
.get_key("matchingList")
.and_then(parse_matching_list)
.unwrap_or_default();
name.map(|name| (matching_list, name.to_owned()))
})
.collect_vec()
})
.unwrap_or_default();
err.into_result(RelayerSettings {
base,
db,
@ -258,28 +271,35 @@ impl FromRawConf<RawRelayerSettings> for RelayerSettings {
transaction_gas_limit,
skip_transaction_gas_limit_for,
allow_local_checkpoint_syncers,
metric_app_contexts,
})
}
}
fn parse_matching_list(p: ValueParser) -> ConfigResult<MatchingList> {
fn parse_json_array(p: ValueParser) -> Option<(ConfigPath, Value)> {
let mut err = ConfigParsingError::default();
let raw_list = match &p {
match p {
ValueParser {
val: Value::String(matching_list_str),
val: Value::String(array_str),
cwp,
} => serde_json::from_str::<Value>(matching_list_str)
} => serde_json::from_str::<Value>(array_str)
.context("Expected JSON string")
.take_err(&mut err, || cwp.clone())
.map(|v| recase_json_value(v, Case::Flat)),
.map(|v| (cwp, recase_json_value(v, Case::Flat))),
ValueParser {
val: value @ Value::Array(_),
..
} => Some((*value).clone()),
cwp,
} => Some((cwp, value.clone())),
_ => Err(eyre!("Expected JSON array or stringified JSON"))
.take_err(&mut err, || p.cwp.clone()),
};
}
}
fn parse_matching_list(p: ValueParser) -> ConfigResult<MatchingList> {
let mut err = ConfigParsingError::default();
let raw_list = parse_json_array(p.clone()).map(|(_, v)| v);
let Some(raw_list) = raw_list else {
return err.into_result(MatchingList::default());
};

@ -144,9 +144,9 @@ impl Mailbox for CosmosMailbox {
.await?;
let response: mailbox::DefaultIsmResponse = serde_json::from_slice(&data)?;
// convert Hex to H256
let ism = H256::from_slice(&hex::decode(response.default_ism)?);
Ok(ism)
// convert bech32 to H256
let ism = CosmosAddress::from_str(&response.default_ism)?;
Ok(ism.digest())
}
#[instrument(err, ret, skip(self))]
@ -166,7 +166,7 @@ impl Mailbox for CosmosMailbox {
.await?;
let response: mailbox::RecipientIsmResponse = serde_json::from_slice(&data)?;
// convert Hex to H256
// convert bech32 to H256
let ism = CosmosAddress::from_str(&response.ism)?;
Ok(ism.digest())
}

@ -1,14 +1,16 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, OnceLock};
use eyre::Result;
use hyperlane_core::{HyperlaneDomain, H160};
use prometheus::{
histogram_opts, labels, opts, register_counter_vec_with_registry,
register_gauge_vec_with_registry, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, CounterVec,
Encoder, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec, Registry,
};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::warn;
@ -37,7 +39,6 @@ pub struct CoreMetrics {
span_counts: IntCounterVec,
span_events: IntCounterVec,
last_known_message_nonce: IntGaugeVec,
validator_checkpoint_index: IntGaugeVec,
submitter_queue_length: IntGaugeVec,
operations_processed_count: IntCounterVec,
@ -51,6 +52,9 @@ pub struct CoreMetrics {
/// Set of provider-specific metrics. These only need to get created once.
provider_metrics: OnceLock<MiddlewareMetrics>,
/// Metrics that are used to observe validator sets.
pub validator_metrics: ValidatorObservabilityMetricManager,
}
impl CoreMetrics {
@ -109,13 +113,18 @@ impl CoreMetrics {
registry
)?;
let validator_checkpoint_index = register_int_gauge_vec_with_registry!(
let observed_validator_latest_index = register_int_gauge_vec_with_registry!(
opts!(
namespaced!("validator_checkpoint_index"),
"Observed signed checkpoint indices per validator",
namespaced!("observed_validator_latest_index"),
"The latest observed latest signed checkpoint indices per validator, from the perspective of the relayer",
const_labels_ref
),
&["origin", "validator"],
&[
"origin",
"destination",
"validator",
"app_context",
],
registry
)?;
@ -169,7 +178,6 @@ impl CoreMetrics {
span_counts,
span_events,
last_known_message_nonce,
validator_checkpoint_index,
submitter_queue_length,
@ -180,6 +188,10 @@ impl CoreMetrics {
json_rpc_client_metrics: OnceLock::new(),
provider_metrics: OnceLock::new(),
validator_metrics: ValidatorObservabilityMetricManager::new(
observed_validator_latest_index.clone(),
),
})
}
@ -298,14 +310,6 @@ impl CoreMetrics {
self.last_known_message_nonce.clone()
}
/// Gauge for reporting the most recent validator checkpoint index
/// Labels:
/// - `origin`: Origin chain
/// - `validator`: Address of the validator
pub fn validator_checkpoint_index(&self) -> IntGaugeVec {
self.validator_checkpoint_index.clone()
}
/// Latest message nonce in the validator.
///
/// Phase:
@ -468,3 +472,97 @@ impl Debug for CoreMetrics {
)
}
}
#[derive(Debug, Eq, PartialEq, Hash)]
struct AppContextKey {
origin: HyperlaneDomain,
destination: HyperlaneDomain,
app_context: String,
}
/// Manages metrics for observing sets of validators.
pub struct ValidatorObservabilityMetricManager {
observed_validator_latest_index: IntGaugeVec,
app_context_validators: RwLock<HashMap<AppContextKey, HashSet<H160>>>,
}
impl ValidatorObservabilityMetricManager {
fn new(observed_validator_latest_index: IntGaugeVec) -> Self {
Self {
observed_validator_latest_index,
app_context_validators: RwLock::new(HashMap::new()),
}
}
/// Updates the metrics with the latest checkpoint index for each validator
/// in a given set.
pub async fn set_validator_latest_checkpoints(
&self,
origin: &HyperlaneDomain,
destination: &HyperlaneDomain,
app_context: String,
latest_checkpoints: &HashMap<H160, Option<u32>>,
) {
let key = AppContextKey {
origin: origin.clone(),
destination: destination.clone(),
app_context: app_context.clone(),
};
let mut app_context_validators = self.app_context_validators.write().await;
// First, clear out all previous metrics for the app context.
// This is necessary because the set of validators may have changed.
if let Some(prev_validators) = app_context_validators.get(&key) {
for validator in prev_validators {
// We unwrap because an error here occurs if the # of labels
// provided is incorrect, and we'd like to loudly fail in e2e if that
// happens.
self.observed_validator_latest_index
.remove_label_values(&[
origin.as_ref(),
destination.as_ref(),
&format!("0x{:x}", validator).to_lowercase(),
&app_context,
])
.unwrap();
}
}
let mut set = HashSet::new();
// Then set the new metrics and update the cached set of validators.
for (validator, latest_checkpoint) in latest_checkpoints {
self.observed_validator_latest_index
.with_label_values(&[
origin.as_ref(),
destination.as_ref(),
&format!("0x{:x}", validator).to_lowercase(),
&app_context,
])
// If the latest checkpoint is None, set to -1 to indicate that
// the validator did not provide a valid latest checkpoint index.
.set(latest_checkpoint.map(|i| i as i64).unwrap_or(-1));
set.insert(*validator);
}
app_context_validators.insert(key, set);
}
/// Gauge for reporting recently observed latest checkpoint indices for validator sets.
/// The entire set for an app context should be updated at once, and it should be updated
/// in a way that is robust to validator set changes.
/// Set to -1 to indicate a validator did not provide a valid latest checkpoint index.
/// Note that it's possible for an app to be using an aggregation ISM of more than one
/// validator set. If these sets are different, there is no label built into the metric
/// to distinguish them.
///
/// Labels:
/// - `origin`: Origin chain
/// - `destination`: Destination chain
/// - `validator`: Address of the validator
/// - `app_context`: App context for the validator set
pub fn observed_validator_latest_index(&self) -> IntGaugeVec {
self.observed_validator_latest_index.clone()
}
}

@ -1,12 +1,11 @@
use core::str::FromStr;
use std::{collections::HashMap, path::PathBuf};
use std::path::PathBuf;
use eyre::{eyre, Context, Report, Result};
use hyperlane_core::H160;
use prometheus::{IntGauge, IntGaugeVec};
use prometheus::IntGauge;
use rusoto_core::Region;
use crate::{CheckpointSyncer, LocalStorage, MultisigCheckpointSyncer, S3Storage};
use crate::{CheckpointSyncer, LocalStorage, S3Storage};
/// Checkpoint Syncer types
#[derive(Debug, Clone)]
@ -83,31 +82,3 @@ impl CheckpointSyncerConf {
})
}
}
/// Config for a MultisigCheckpointSyncer
#[derive(Debug, Clone)]
pub struct MultisigCheckpointSyncerConf {
/// The checkpoint syncer for each valid validator signer address
checkpointsyncers: HashMap<String, CheckpointSyncerConf>,
}
impl MultisigCheckpointSyncerConf {
/// Get a MultisigCheckpointSyncer from the config
pub fn build(
&self,
origin: &str,
validator_checkpoint_index: IntGaugeVec,
) -> Result<MultisigCheckpointSyncer, Report> {
let mut checkpoint_syncers = HashMap::new();
for (key, value) in self.checkpointsyncers.iter() {
let gauge =
validator_checkpoint_index.with_label_values(&[origin, &key.to_lowercase()]);
if let Ok(conf) = value.build(Some(gauge)) {
checkpoint_syncers.insert(H160::from_str(key)?, conf.into());
} else {
continue;
}
}
Ok(MultisigCheckpointSyncer::new(checkpoint_syncers))
}
}

@ -5,19 +5,76 @@ use derive_new::new;
use eyre::Result;
use tracing::{debug, instrument};
use hyperlane_core::{MultisigSignedCheckpoint, SignedCheckpointWithMessageId, H160, H256};
use hyperlane_core::{
HyperlaneDomain, MultisigSignedCheckpoint, SignedCheckpointWithMessageId, H160, H256,
};
use crate::CheckpointSyncer;
use crate::{CheckpointSyncer, CoreMetrics};
/// Fetches signed checkpoints from multiple validators to create
/// MultisigSignedCheckpoints
/// For a particular validator set, fetches signed checkpoints from multiple
/// validators to create MultisigSignedCheckpoints.
#[derive(Clone, Debug, new)]
pub struct MultisigCheckpointSyncer {
/// The checkpoint syncer for each valid validator signer address
checkpoint_syncers: HashMap<H160, Arc<dyn CheckpointSyncer>>,
metrics: Arc<CoreMetrics>,
app_context: Option<String>,
}
impl MultisigCheckpointSyncer {
/// Gets the latest checkpoint index from each validator's checkpoint syncer.
/// Returns a vector of the latest indices, in an unspecified order, and does
/// not contain indices for validators that did not provide a latest index.
/// Also updates the validator latest checkpoint metrics.
pub async fn get_validator_latest_checkpoints_and_update_metrics(
&self,
validators: &[H256],
origin: &HyperlaneDomain,
destination: &HyperlaneDomain,
) -> Vec<u32> {
// Get the latest_index from each validator's checkpoint syncer.
// If a validator does not return a latest index, None is recorded so
// this can be surfaced in the metrics.
let mut latest_indices: HashMap<H160, Option<u32>> =
HashMap::with_capacity(validators.len());
for validator in validators {
let address = H160::from(*validator);
if let Some(checkpoint_syncer) = self.checkpoint_syncers.get(&address) {
// Gracefully handle errors getting the latest_index
match checkpoint_syncer.latest_index().await {
Ok(Some(index)) => {
debug!(?address, ?index, "Validator returned latest index");
latest_indices.insert(H160::from(*validator), Some(index));
}
result => {
debug!(
?address,
?result,
"Failed to get latest index from validator"
);
latest_indices.insert(H160::from(*validator), None);
}
}
}
}
if let Some(app_context) = &self.app_context {
self.metrics
.validator_metrics
.set_validator_latest_checkpoints(
origin,
destination,
app_context.clone(),
&latest_indices,
)
.await;
}
// Filter out any validators that did not return a latest index
latest_indices.values().copied().flatten().collect()
}
/// Attempts to get the latest checkpoint with a quorum of signatures among
/// validators.
///
@ -37,24 +94,13 @@ impl MultisigCheckpointSyncer {
threshold: usize,
minimum_index: u32,
maximum_index: u32,
origin: &HyperlaneDomain,
destination: &HyperlaneDomain,
) -> Result<Option<MultisigSignedCheckpoint>> {
// Get the latest_index from each validator's checkpoint syncer.
let mut latest_indices = Vec::with_capacity(validators.len());
for validator in validators {
let address = H160::from(*validator);
if let Some(checkpoint_syncer) = self.checkpoint_syncers.get(&address) {
// Gracefully handle errors getting the latest_index
match checkpoint_syncer.latest_index().await {
Ok(Some(index)) => {
debug!(?address, ?index, "Validator returned latest index");
latest_indices.push(index);
}
err => {
debug!(?address, ?err, "Failed to get latest index from validator");
}
}
}
}
let mut latest_indices = self
.get_validator_latest_checkpoints_and_update_metrics(validators, origin, destination)
.await;
debug!(
?latest_indices,
"Fetched latest indices from checkpoint syncers"

@ -235,29 +235,36 @@ const GasPaymentEnforcementSchema = z.union([
]);
export type GasPaymentEnforcement = z.infer<typeof GasPaymentEnforcementSchema>;
const MetricAppContextSchema = z.object({
name: z.string().min(1),
matchingList: MatchingListSchema.describe(
'A matching list, any message that matches will be classified as this app context.',
),
});
export const RelayerAgentConfigSchema = AgentConfigSchema.extend({
db: z
.string()
.nonempty()
.min(1)
.optional()
.describe('The path to the relayer database.'),
relayChains: CommaSeperatedChainList.describe(
'Comma seperated list of chains to relay messages between.',
),
gasPaymentEnforcement: z
.union([z.array(GasPaymentEnforcementSchema), z.string().nonempty()])
.union([z.array(GasPaymentEnforcementSchema), z.string().min(1)])
.optional()
.describe(
'The gas payment enforcement configuration as JSON. Expects an ordered array of `GasPaymentEnforcementConfig`.',
),
whitelist: z
.union([MatchingListSchema, z.string().nonempty()])
.union([MatchingListSchema, z.string().min(1)])
.optional()
.describe(
'If no whitelist is provided ALL messages will be considered on the whitelist.',
),
blacklist: z
.union([MatchingListSchema, z.string().nonempty()])
.union([MatchingListSchema, z.string().min(1)])
.optional()
.describe(
'If no blacklist is provided ALL will be considered to not be on the blacklist.',
@ -274,12 +281,18 @@ export const RelayerAgentConfigSchema = AgentConfigSchema.extend({
.describe(
'If true, allows local storage based checkpoint syncers. Not intended for production use.',
),
metricAppContexts: z
.union([z.array(MetricAppContextSchema), z.string().min(1)])
.optional()
.describe(
'A list of app contexts and their matching lists to use for metrics. A message will be classified as the first matching app context.',
),
});
export type RelayerConfig = z.infer<typeof RelayerAgentConfigSchema>;
export const ScraperAgentConfigSchema = AgentConfigSchema.extend({
db: z.string().nonempty().describe('Database connection string'),
db: z.string().min(1).describe('Database connection string'),
chainsToScrape: CommaSeperatedChainList.describe(
'Comma separated list of chain names to scrape',
),
@ -290,32 +303,29 @@ export type ScraperConfig = z.infer<typeof ScraperAgentConfigSchema>;
export const ValidatorAgentConfigSchema = AgentConfigSchema.extend({
db: z
.string()
.nonempty()
.min(1)
.optional()
.describe('The path to the validator database.'),
originChainName: z
.string()
.nonempty()
.min(1)
.describe('Name of the chain to validate messages on'),
validator: AgentSignerSchema.describe('The validator attestation signer'),
checkpointSyncer: z.discriminatedUnion('type', [
z
.object({
type: z.literal('localStorage'),
path: z
.string()
.nonempty()
.describe('Path to the local storage location'),
path: z.string().min(1).describe('Path to the local storage location'),
})
.describe('A local checkpoint syncer'),
z
.object({
type: z.literal('s3'),
bucket: z.string().nonempty(),
region: z.string().nonempty(),
bucket: z.string().min(1),
region: z.string().min(1),
folder: z
.string()
.nonempty()
.min(1)
.optional()
.describe(
'The folder/key-prefix to use, defaults to the root of the bucket',

Loading…
Cancel
Save