feat: fetch chain-specific metrics in standalone task (#3214)

### Description

Spawning a tokio task to fetch chain-specific metrics in provider
middleware was wasteful because we ended up with 15 tasks doing the same
job. Now there is a single task that is spawned inside the relayer,
which I added in the struct that used to only fetch relayer balance.

Some questions:
- I'm realising that, in its current state, this PR probably removes these
metrics from validators. Let me know if that's the case and I'll add them back.
- the chain-specific metrics were moved out of `MiddlewareMetrics`
because they're no longer part of middleware, but I wasn't sure where to
place them. Maybe `CoreMetrics` is a better place than the current
`custom_metrics.rs` file?

`tokio-metrics` turned out not to be useful here because we'd need to
instrument every call site of `tokio::spawn` with it. I think we should still
add that instrumentation, but as a separate task.

### Drive-by changes

- This PR also makes it very easy to add the metrics tasks for new
chains, by extending the `HyperlaneProvider` trait with a
`get_chain_metrics` call which seems reasonably general to me to have
implemented by all providers. (Because we currently only track these for EVM
chains.)
- The description, naming and logic of the `gas_price` metric were also
changed to support new chains

### Related issues

- Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3047

### Backward compatibility

Yes

### Testing

Still need to manually test
pull/3241/head
Daniel Savu 10 months ago committed by GitHub
parent ab17af5f7e
commit 3cbb06a07a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      rust/Cargo.lock
  2. 41
      rust/agents/relayer/src/relayer.rs
  3. 1
      rust/agents/scraper/Cargo.toml
  4. 39
      rust/agents/scraper/src/agent.rs
  5. 34
      rust/agents/validator/src/validator.rs
  6. 8
      rust/chains/hyperlane-cosmos/src/providers/mod.rs
  7. 4
      rust/chains/hyperlane-ethereum/src/error.rs
  8. 36
      rust/chains/hyperlane-ethereum/src/provider.rs
  9. 37
      rust/chains/hyperlane-ethereum/src/trait_builder.rs
  10. 7
      rust/chains/hyperlane-fuel/src/provider.rs
  11. 7
      rust/chains/hyperlane-sealevel/src/provider.rs
  12. 115
      rust/ethers-prometheus/src/middleware/mod.rs
  13. 6
      rust/hyperlane-base/src/agent.rs
  14. 146
      rust/hyperlane-base/src/metrics/agent_metrics.rs
  15. 10
      rust/hyperlane-base/src/metrics/provider.rs
  16. 6
      rust/hyperlane-core/src/chain.rs
  17. 2
      rust/hyperlane-core/src/error.rs
  18. 5
      rust/hyperlane-core/src/traits/provider.rs
  19. 14
      rust/hyperlane-core/src/types/chain_data.rs

1
rust/Cargo.lock generated

@ -7623,6 +7623,7 @@ dependencies = [
"itertools 0.12.0",
"migration",
"num-bigint 0.4.4",
"num-traits",
"prometheus",
"sea-orm",
"serde",

@ -9,15 +9,14 @@ use derive_more::AsRef;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::{AgentMetrics, AgentMetricsUpdater},
metrics::{AgentMetrics, MetricsUpdater},
run_all,
settings::ChainConf,
BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync,
WatermarkContractSync,
BaseAgent, ChainMetrics, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
};
use hyperlane_core::{
metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, MerkleTreeInsertion, U256,
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
};
use tokio::{
sync::{
@ -72,7 +71,10 @@ pub struct Relayer {
skip_transaction_gas_limit_for: HashSet<u32>,
allow_local_checkpoint_syncers: bool,
core_metrics: Arc<CoreMetrics>,
// TODO: decide whether to consolidate `agent_metrics` and `chain_metrics` into a single struct
// or move them in `core_metrics`, like the validator metrics
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
}
impl Debug for Relayer {
@ -102,6 +104,7 @@ impl BaseAgent for Relayer {
settings: Self::Settings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
) -> Result<Self>
where
Self: Sized,
@ -260,6 +263,7 @@ impl BaseAgent for Relayer {
allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers,
core_metrics,
agent_metrics,
chain_metrics,
})
}
@ -276,25 +280,16 @@ impl BaseAgent for Relayer {
tasks.push(self.run_destination_submitter(dest_domain, receive_channel));
let agent_metrics_conf = dest_conf
.agent_metrics_conf(Self::AGENT_NAME.to_string())
.await
.unwrap();
let agent_metrics_fetcher = dest_conf.build_provider(&self.core_metrics).await.unwrap();
let agent_metrics = AgentMetricsUpdater::new(
let metrics_updater = MetricsUpdater::new(
dest_conf,
self.core_metrics.clone(),
self.agent_metrics.clone(),
agent_metrics_conf,
agent_metrics_fetcher,
);
let fetcher_task = tokio::spawn(async move {
agent_metrics
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("AgentMetrics"));
tasks.push(fetcher_task);
self.chain_metrics.clone(),
Self::AGENT_NAME.to_string(),
)
.await
.unwrap();
tasks.push(metrics_updater.spawn());
}
for origin in &self.origin_chains {

@ -18,6 +18,7 @@ eyre.workspace = true
futures.workspace = true
itertools.workspace = true
num-bigint.workspace = true
num-traits.workspace = true
prometheus.workspace = true
sea-orm = { workspace = true }
serde.workspace = true

@ -3,10 +3,11 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use derive_more::AsRef;
use hyperlane_base::{
metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics,
CoreMetrics, HyperlaneAgentCore,
metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ChainMetrics,
ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, MetricsUpdater,
};
use hyperlane_core::HyperlaneDomain;
use hyperlane_core::{HyperlaneDomain, KnownHyperlaneDomain};
use num_traits::cast::FromPrimitive;
use tokio::task::JoinHandle;
use tracing::{info_span, instrument::Instrumented, trace, Instrument};
@ -19,8 +20,11 @@ pub struct Scraper {
#[as_ref]
core: HyperlaneAgentCore,
contract_sync_metrics: Arc<ContractSyncMetrics>,
metrics: Arc<CoreMetrics>,
scrapers: HashMap<u32, ChainScraper>,
settings: ScraperSettings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
}
#[derive(Debug)]
@ -38,7 +42,8 @@ impl BaseAgent for Scraper {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
) -> eyre::Result<Self>
where
Self: Sized,
@ -76,9 +81,12 @@ impl BaseAgent for Scraper {
Ok(Self {
core,
metrics,
contract_sync_metrics,
scrapers,
settings,
core_metrics: metrics,
agent_metrics,
chain_metrics,
})
}
@ -87,6 +95,19 @@ impl BaseAgent for Scraper {
let mut tasks = Vec::with_capacity(self.scrapers.len());
for domain in self.scrapers.keys() {
tasks.push(self.scrape(*domain).await);
let domain = KnownHyperlaneDomain::from_u32(*domain).unwrap();
let chain_conf = self.settings.chain_setup(&domain.into()).unwrap();
let metrics_updater = MetricsUpdater::new(
chain_conf,
self.core_metrics.clone(),
self.agent_metrics.clone(),
self.chain_metrics.clone(),
Self::AGENT_NAME.to_string(),
)
.await
.unwrap();
tasks.push(metrics_updater.spawn());
}
run_all(tasks)
}
@ -105,7 +126,7 @@ impl Scraper {
tasks.push(
self.build_message_indexer(
domain.clone(),
self.metrics.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
db.clone(),
index_settings.clone(),
@ -115,7 +136,7 @@ impl Scraper {
tasks.push(
self.build_delivery_indexer(
domain.clone(),
self.metrics.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
db.clone(),
index_settings.clone(),
@ -125,7 +146,7 @@ impl Scraper {
tasks.push(
self.build_interchain_gas_payment_indexer(
domain,
self.metrics.clone(),
self.core_metrics.clone(),
self.contract_sync_metrics.clone(),
db,
index_settings.clone(),

@ -11,8 +11,10 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::AgentMetrics,
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync,
run_all,
settings::ChainConf,
BaseAgent, ChainMetrics, CheckpointSyncer, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore, MetricsUpdater, SequencedDataContractSync,
};
use hyperlane_core::{
@ -31,6 +33,7 @@ use crate::{
#[derive(Debug, AsRef)]
pub struct Validator {
origin_chain: HyperlaneDomain,
origin_chain_conf: ChainConf,
#[as_ref]
core: HyperlaneAgentCore,
db: HyperlaneRocksDB,
@ -44,6 +47,9 @@ pub struct Validator {
reorg_period: u64,
interval: Duration,
checkpoint_syncer: Arc<dyn CheckpointSyncer>,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
}
#[async_trait]
@ -55,7 +61,8 @@ impl BaseAgent for Validator {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
) -> Result<Self>
where
Self: Sized,
@ -81,6 +88,12 @@ impl BaseAgent for Validator {
.build_validator_announce(&settings.origin_chain, &metrics)
.await?;
let origin_chain_conf = core
.settings
.chain_setup(&settings.origin_chain)
.unwrap()
.clone();
let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));
let merkle_tree_hook_sync = settings
@ -95,6 +108,7 @@ impl BaseAgent for Validator {
Ok(Self {
origin_chain: settings.origin_chain,
origin_chain_conf,
core,
db: msg_db,
mailbox: mailbox.into(),
@ -106,6 +120,9 @@ impl BaseAgent for Validator {
reorg_period: settings.reorg_period,
interval: settings.interval,
checkpoint_syncer,
agent_metrics,
chain_metrics,
core_metrics: metrics,
})
}
@ -123,6 +140,17 @@ impl BaseAgent for Validator {
);
}
let metrics_updater = MetricsUpdater::new(
&self.origin_chain_conf,
self.core_metrics.clone(),
self.agent_metrics.clone(),
self.chain_metrics.clone(),
Self::AGENT_NAME.to_string(),
)
.await
.unwrap();
tasks.push(metrics_updater.spawn());
// announce the validator after spawning the signer task
self.announce().await.expect("Failed to announce validator");

@ -1,7 +1,7 @@
use async_trait::async_trait;
use hyperlane_core::{
BlockInfo, ChainResult, ContractLocator, HyperlaneChain, HyperlaneDomain, HyperlaneProvider,
TxnInfo, H256, U256,
BlockInfo, ChainInfo, ChainResult, ContractLocator, HyperlaneChain, HyperlaneDomain,
HyperlaneProvider, TxnInfo, H256, U256,
};
use tendermint_rpc::{client::CompatMode, HttpClient};
@ -99,4 +99,8 @@ impl HyperlaneProvider for CosmosProvider {
.get_balance(address, self.canonical_asset.clone())
.await?)
}
/// Chain metrics are not collected for this provider: always returns `Ok(None)`.
async fn get_chain_metrics(&self) -> ChainResult<Option<ChainInfo>> {
Ok(None)
}
}

@ -10,6 +10,10 @@ pub enum HyperlaneEthereumError {
/// provider Error
#[error("{0}")]
ProviderError(#[from] ProviderError),
/// Some details from a queried block are missing
#[error("Some details from a queried block are missing")]
MissingBlockDetails,
}
impl From<HyperlaneEthereumError> for ChainCommunicationError {

@ -6,8 +6,8 @@ use std::time::Duration;
use async_trait::async_trait;
use derive_new::new;
use ethers::prelude::Middleware;
use ethers_core::abi::Address;
use hyperlane_core::{ethers_core_types, U256};
use ethers_core::{abi::Address, types::BlockNumber};
use hyperlane_core::{ethers_core_types, ChainInfo, HyperlaneCustomErrorWrapper, U256};
use tokio::time::sleep;
use tracing::instrument;
@ -21,10 +21,7 @@ use crate::BuildableWithProvider;
/// Connection to an ethereum provider. Useful for querying information about
/// the blockchain.
#[derive(Debug, Clone, new)]
pub struct EthereumProvider<M>
where
M: Middleware,
{
pub struct EthereumProvider<M> {
provider: Arc<M>,
domain: HyperlaneDomain,
}
@ -119,6 +116,33 @@ where
.map_err(ChainCommunicationError::from_other)?;
Ok(balance.into())
}
async fn get_chain_metrics(&self) -> ChainResult<Option<ChainInfo>> {
let Some(block) = self
.provider
.get_block(BlockNumber::Latest)
.await
.map_err(|e| {
ChainCommunicationError::Other(HyperlaneCustomErrorWrapper::new(Box::new(e)))
})?
else {
return Ok(None);
};
// Given the block is queried with `BlockNumber::Latest` rather than `BlockNumber::Pending`,
// if `block` is Some at this point, we're guaranteed to have its `hash` and `number` defined,
// so it's safe to unwrap below
// more info at <https://docs.rs/ethers/latest/ethers/core/types/struct.Block.html#structfield.number>
let chain_metrics = ChainInfo::new(
BlockInfo {
hash: block.hash.unwrap().into(),
timestamp: block.timestamp.as_u64(),
number: block.number.unwrap().as_u64(),
},
block.base_fee_per_gas.map(Into::into),
);
Ok(Some(chain_metrics))
}
}
impl<M> EthereumProvider<M>

@ -10,7 +10,6 @@ use ethers::prelude::{
Http, JsonRpcClient, Middleware, NonceManagerMiddleware, Provider, Quorum, QuorumProvider,
SignerMiddleware, WeightedProvider, Ws, WsClientError,
};
use hyperlane_core::metrics::agent::METRICS_SCRAPE_INTERVAL;
use hyperlane_core::rpc_clients::FallbackProvider;
use reqwest::{Client, Url};
use thiserror::Error;
@ -19,9 +18,7 @@ use ethers_prometheus::json_rpc_client::{
JsonRpcBlockGetter, JsonRpcClientMetrics, JsonRpcClientMetricsBuilder, NodeInfo,
PrometheusJsonRpcClient, PrometheusJsonRpcClientConfig,
};
use ethers_prometheus::middleware::{
MiddlewareMetrics, PrometheusMiddleware, PrometheusMiddlewareConf,
};
use ethers_prometheus::middleware::{MiddlewareMetrics, PrometheusMiddlewareConf};
use hyperlane_core::{
ChainCommunicationError, ChainResult, ContractLocator, HyperlaneDomain, KnownHyperlaneDomain,
};
@ -96,8 +93,7 @@ pub trait BuildableWithProvider {
builder = builder.add_provider(weighted_provider);
}
let quorum_provider = builder.build();
self.build(quorum_provider, locator, signer, middleware_metrics)
.await?
self.build(quorum_provider, locator, signer).await?
}
ConnectionConf::HttpFallback { urls } => {
let mut builder = FallbackProvider::builder();
@ -120,13 +116,8 @@ pub trait BuildableWithProvider {
_,
JsonRpcBlockGetter<PrometheusJsonRpcClient<Http>>,
>::new(fallback_provider);
self.build(
ethereum_fallback_provider,
locator,
signer,
middleware_metrics,
)
.await?
self.build(ethereum_fallback_provider, locator, signer)
.await?
}
ConnectionConf::Http { url } => {
let http_client = Client::builder()
@ -141,14 +132,13 @@ pub trait BuildableWithProvider {
&middleware_metrics,
);
let retrying_http_provider = RetryingProvider::new(metrics_provider, None, None);
self.build(retrying_http_provider, locator, signer, middleware_metrics)
.await?
self.build(retrying_http_provider, locator, signer).await?
}
ConnectionConf::Ws { url } => {
let ws = Ws::connect(url)
.await
.map_err(EthereumProviderConnectionError::from)?;
self.build(ws, locator, signer, middleware_metrics).await?
self.build(ws, locator, signer).await?
}
})
}
@ -189,30 +179,19 @@ pub trait BuildableWithProvider {
)
}
/// Create the provider, applying any middlewares (e.g. gas oracle, signer, metrics) as needed,
/// Create the provider, applying any middlewares (e.g. gas oracle, signer) as needed,
/// and then create the associated trait.
async fn build<P>(
&self,
client: P,
locator: &ContractLocator,
signer: Option<Signers>,
metrics: Option<(MiddlewareMetrics, PrometheusMiddlewareConf)>,
) -> ChainResult<Self::Output>
where
P: JsonRpcClient + 'static,
{
let provider = wrap_with_gas_oracle(Provider::new(client), locator.domain)?;
Ok(if let Some(metrics) = metrics {
let provider = Arc::new(PrometheusMiddleware::new(provider, metrics.0, metrics.1));
// TODO: This task is spawned each time `.build_ethereum(...)` is called, which is about 15 times,
// in spite of it doing the same thing, wasting resources.
// Only spawn this once along with the other agent tasks.
tokio::spawn(provider.start_updating_on_interval(METRICS_SCRAPE_INTERVAL));
self.build_with_signer(provider, locator, signer).await?
} else {
self.build_with_signer(provider, locator, signer).await?
})
self.build_with_signer(provider, locator, signer).await
}
/// Wrap the provider creation with a signing provider if signers were

@ -1,7 +1,8 @@
use async_trait::async_trait;
use hyperlane_core::{
BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, U256,
BlockInfo, ChainInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo,
H256, U256,
};
/// A wrapper around a fuel provider to get generic blockchain information.
@ -35,4 +36,8 @@ impl HyperlaneProvider for FuelProvider {
async fn get_balance(&self, address: String) -> ChainResult<U256> {
todo!()
}
/// Chain metrics are not collected for this provider: always returns `Ok(None)`.
async fn get_chain_metrics(&self) -> ChainResult<Option<ChainInfo>> {
Ok(None)
}
}

@ -3,7 +3,8 @@ use std::{str::FromStr, sync::Arc};
use async_trait::async_trait;
use hyperlane_core::{
BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, U256,
BlockInfo, ChainInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo,
H256, U256,
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
@ -76,4 +77,8 @@ impl HyperlaneProvider for SealevelProvider {
async fn get_balance(&self, address: String) -> ChainResult<U256> {
self.get_balance(address).await
}
/// Chain metrics are not collected for this provider: always returns `Ok(None)`.
async fn get_chain_metrics(&self) -> ChainResult<Option<ChainInfo>> {
Ok(None)
}
}

@ -4,9 +4,8 @@
use std::clone::Clone;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
use async_trait::async_trait;
use derive_builder::Builder;
@ -14,16 +13,12 @@ use ethers::abi::AbiEncode;
use ethers::prelude::*;
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::utils::hex::ToHex;
use hyperlane_core::metrics::agent::u256_as_scaled_f64;
use hyperlane_core::HyperlaneDomainProtocol;
use log::{debug, trace};
use maplit::hashmap;
use prometheus::{CounterVec, GaugeVec, IntCounterVec, IntGaugeVec};
use prometheus::{CounterVec, IntCounterVec};
use static_assertions::assert_impl_all;
use tokio::sync::RwLock;
pub use error::PrometheusMiddlewareError;
use tokio::time::MissedTickBehavior;
pub use crate::ChainInfo;
@ -51,16 +46,6 @@ pub struct ContractInfo {
pub functions: HashMap<Selector, String>,
}
/// Expected label names for the `block_height` metric.
pub const BLOCK_HEIGHT_LABELS: &[&str] = &["chain"];
/// Help string for the metric.
pub const BLOCK_HEIGHT_HELP: &str = "Tracks the current block height of the chain";
/// Expected label names for the `gas_price_gwei` metric.
pub const GAS_PRICE_GWEI_LABELS: &[&str] = &["chain"];
/// Help string for the metric.
pub const GAS_PRICE_GWEI_HELP: &str = "Tracks the current gas price of the chain";
/// Expected label names for the `contract_call_duration_seconds` metric.
pub const CONTRACT_CALL_DURATION_SECONDS_LABELS: &[&str] = &[
"chain",
@ -128,19 +113,6 @@ pub const TRANSACTION_SEND_TOTAL_HELP: &str = "Number of transactions sent";
/// Container for all the relevant middleware metrics.
#[derive(Clone, Builder)]
pub struct MiddlewareMetrics {
/// Tracks the current block height of the chain.
/// - `chain`: the chain name (or ID if the name is unknown) of the chain
/// the block number refers to.
#[builder(setter(into, strip_option), default)]
block_height: Option<IntGaugeVec>,
/// Tracks the current gas price of the chain. Uses the base_fee_per_gas if
/// available or else the median of the transactions.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the gas price refers to.
#[builder(setter(into, strip_option), default)]
gas_price_gwei: Option<GaugeVec>,
/// Contract call durations by contract and function
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the tx occurred on.
@ -468,89 +440,6 @@ impl<M> PrometheusMiddleware<M> {
}
}
impl<M: Middleware> PrometheusMiddleware<M> {
/// Start the update cycle using tokio. This must be called if you want
/// some metrics to be updated automatically. Alternatively you could call
/// update yourself.
pub fn start_updating_on_interval(
self: &Arc<Self>,
period: Duration,
) -> impl Future<Output = ()> + Send {
let zelf = Arc::downgrade(self);
async move {
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
if let Some(zelf) = zelf.upgrade() {
zelf.update().await;
} else {
return;
}
interval.tick().await;
}
}
}
}
impl<M: Middleware + Send + Sync> PrometheusMiddleware<M> {
/// Update gauges. You should submit this on a schedule to your runtime to
/// be collected once on a regular interval that ideally aligns with the
/// prometheus scrape interval.
pub fn update(&self) -> impl Future<Output = ()> {
// all metrics are Arcs internally so just clone the ones we want to report for.
let block_height = self.metrics.block_height.clone();
let gas_price_gwei = self.metrics.gas_price_gwei.clone();
let data_ref = self.conf.clone();
let client = self.inner.clone();
async move {
let data = data_ref.read().await;
let chain = chain_name(&data.chain);
debug!("Updating metrics for chain ({chain})");
if block_height.is_some() || gas_price_gwei.is_some() {
Self::update_block_details(&*client, chain, block_height, gas_price_gwei).await;
}
// more metrics to come...
}
}
async fn update_block_details(
client: &M,
chain: &str,
block_height: Option<IntGaugeVec>,
gas_price_gwei: Option<GaugeVec>,
) {
let Ok(Some(current_block)) = client.get_block(BlockNumber::Latest).await else {
return;
};
if let Some(block_height) = block_height {
let height = current_block
.number
.expect("Block number should always be Some for included blocks.")
.as_u64() as i64;
trace!("Block height for chain {chain} is {height}");
block_height
.with(&hashmap! { "chain" => chain })
.set(height);
}
if let Some(gas_price_gwei) = gas_price_gwei {
if let Some(london_fee) = current_block.base_fee_per_gas {
let gas =
u256_as_scaled_f64(london_fee.into(), HyperlaneDomainProtocol::Ethereum) * 1e9;
trace!("Gas price for chain {chain} is {gas:.1}gwei");
gas_price_gwei.with(&hashmap! { "chain" => chain }).set(gas);
} else {
trace!("Gas price for chain {chain} unknown, chain is pre-london");
}
}
}
}
impl<M: Middleware> Debug for PrometheusMiddleware<M> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "PrometheusMiddleware({:?})", self.inner)

@ -8,8 +8,10 @@ use tokio::task::JoinHandle;
use tracing::{debug_span, instrument::Instrumented, Instrument};
use crate::{
create_chain_metrics,
metrics::{create_agent_metrics, AgentMetrics, CoreMetrics},
settings::Settings,
ChainMetrics,
};
/// Properties shared across all hyperlane agents
@ -43,6 +45,7 @@ pub trait BaseAgent: Send + Sync + Debug {
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
) -> Result<Self>
where
Self: Sized;
@ -76,7 +79,8 @@ pub async fn agent_main<A: BaseAgent>() -> Result<()> {
let metrics = settings.as_ref().metrics(A::AGENT_NAME)?;
core_settings.tracing.start_tracing(&metrics)?;
let agent_metrics = create_agent_metrics(&metrics)?;
let agent = A::from_settings(settings, metrics.clone(), agent_metrics).await?;
let chain_metrics = create_chain_metrics(&metrics)?;
let agent = A::from_settings(settings, metrics.clone(), agent_metrics, chain_metrics).await?;
metrics.run_http_server();
agent.run().await.await?

@ -1,16 +1,23 @@
//! Metrics either related to the agents, or observed by them
use std::sync::Arc;
use std::time::Duration;
use derive_builder::Builder;
use derive_new::new;
use eyre::Result;
use eyre::{Report, Result};
use hyperlane_core::metrics::agent::decimals_by_protocol;
use hyperlane_core::metrics::agent::u256_as_scaled_f64;
use hyperlane_core::metrics::agent::METRICS_SCRAPE_INTERVAL;
use hyperlane_core::HyperlaneDomain;
use hyperlane_core::HyperlaneProvider;
use maplit::hashmap;
use prometheus::GaugeVec;
use tokio::time::MissedTickBehavior;
use tracing::{trace, warn};
use prometheus::IntGaugeVec;
use tokio::{task::JoinHandle, time::MissedTickBehavior};
use tracing::info_span;
use tracing::{debug, instrument::Instrumented, trace, warn, Instrument};
use crate::settings::ChainConf;
use crate::CoreMetrics;
/// Expected label names for the `wallet_balance` metric.
@ -26,8 +33,19 @@ pub const WALLET_BALANCE_LABELS: &[&str] = &[
pub const WALLET_BALANCE_HELP: &str =
"Current native token balance for the wallet addresses in the `wallets` set";
/// Expected label names for the `block_height` metric.
pub const BLOCK_HEIGHT_LABELS: &[&str] = &["chain"];
/// Help string for the metric.
pub const BLOCK_HEIGHT_HELP: &str = "Tracks the current block height of the chain";
/// Expected label names for the `gas_price` metric.
pub const GAS_PRICE_LABELS: &[&str] = &["chain"];
/// Help string for the metric.
pub const GAS_PRICE_HELP: &str =
"Tracks the current gas price of the chain, in the lowest denomination (e.g. wei)";
/// Agent-specific metrics
#[derive(Clone, Builder)]
#[derive(Clone, Builder, Debug)]
pub struct AgentMetrics {
/// Current balance of native tokens for the
/// wallet address.
@ -52,6 +70,35 @@ pub(crate) fn create_agent_metrics(metrics: &CoreMetrics) -> Result<AgentMetrics
.build()?)
}
/// Chain-specific metrics, i.e. gauges describing the chain itself rather than
/// the agent observing it. Every gauge is labelled by `chain`.
#[derive(Clone, Builder, Debug)]
pub struct ChainMetrics {
/// Tracks the current block height of the chain.
/// - `chain`: the chain name (or ID if the name is unknown) of the chain
/// the block number refers to.
#[builder(setter(into))]
pub block_height: IntGaugeVec,
/// Tracks the current gas price of the chain, in the lowest denomination
/// (e.g. wei). The value comes from the provider-reported minimum gas price
/// (the `base_fee_per_gas` of the latest block for EVM chains).
/// TODO: use the median of the transactions.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the gas price refers to.
#[builder(setter(into, strip_option), default)]
pub gas_price: Option<GaugeVec>,
}
/// Registers the chain-specific gauges (`block_height` and `gas_price`) with
/// `metrics` and bundles them into a [`ChainMetrics`].
pub(crate) fn create_chain_metrics(metrics: &CoreMetrics) -> Result<ChainMetrics> {
    let block_height_gauge =
        metrics.new_int_gauge("block_height", BLOCK_HEIGHT_HELP, BLOCK_HEIGHT_LABELS)?;
    let gas_price_gauge = metrics.new_gauge("gas_price", GAS_PRICE_HELP, GAS_PRICE_LABELS)?;

    let chain_metrics = ChainMetricsBuilder::default()
        .block_height(block_height_gauge)
        .gas_price(gas_price_gauge)
        .build()?;
    Ok(chain_metrics)
}
/// Configuration for the prometheus middleware. This can be loaded via serde.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
@ -68,38 +115,55 @@ pub struct AgentMetricsConf {
pub name: String,
}
/// Utility struct to update agent metrics for a given chain
#[derive(new)]
pub struct AgentMetricsUpdater {
metrics: AgentMetrics,
/// Utility struct to update various metrics using a standalone tokio task
pub struct MetricsUpdater {
agent_metrics: AgentMetrics,
chain_metrics: ChainMetrics,
conf: AgentMetricsConf,
provider: Box<dyn HyperlaneProvider>,
}
impl AgentMetricsUpdater {
async fn update_wallet_balances(&self) {
impl MetricsUpdater {
/// Builds a `MetricsUpdater` for the chain described by `chain_conf`.
///
/// The agent metrics configuration is resolved for `agent_name` first, then a
/// provider is built for the chain; a failure of either step is returned.
pub async fn new(
    chain_conf: &ChainConf,
    core_metrics: Arc<CoreMetrics>,
    agent_metrics: AgentMetrics,
    chain_metrics: ChainMetrics,
    agent_name: String,
) -> Result<Self> {
    let conf = chain_conf.agent_metrics_conf(agent_name).await?;
    let provider = chain_conf.build_provider(&core_metrics).await?;
    let updater = Self {
        agent_metrics,
        chain_metrics,
        conf,
        provider,
    };
    Ok(updater)
}
async fn update_agent_metrics(&self) {
let Some(wallet_addr) = self.conf.address.clone() else {
return;
};
let wallet_name = self.conf.name.clone();
let Some(wallet_balance_metric) = self.metrics.wallet_balance.clone() else {
let Some(wallet_balance_metric) = self.agent_metrics.wallet_balance.clone() else {
return;
};
let chain = self.conf.domain.name();
match self.provider.get_balance(wallet_addr.clone()).await {
Ok(balance) => {
// Okay, so the native type is not a token, but whatever, close enough.
// Note: This is ETH for many chains, but not all so that is why we use `N` and `Native`
// TODO: can we get away with scaling as 18 in all cases here? I am guessing not.
let balance = u256_as_scaled_f64(balance, self.conf.domain.domain_protocol());
trace!("Wallet {wallet_name} ({wallet_addr}) on chain {chain} balance is {balance} of the native currency");
wallet_balance_metric
.with(&hashmap! {
.with(&hashmap! {
"chain" => chain,
"wallet_address" => wallet_addr.as_str(),
"wallet_name" => wallet_name.as_str(),
"token_address" => "none",
// Note: Whatever this `chain`'s native currency is
"token_symbol" => "Native",
"token_name" => "Native"
}).set(balance)
@ -108,13 +172,61 @@ impl AgentMetricsUpdater {
}
}
/// Refreshes the `block_height` and `gas_price` gauges for this chain.
///
/// Nothing is updated when the domain is unknown, when the provider call
/// fails, or when the provider reports no chain metrics. The gas price gauge
/// is only set when the provider actually reports a gas price, so a chain
/// with no known price is not published as having a price of `0`.
async fn update_block_details(&self) {
    if let HyperlaneDomain::Unknown { .. } = self.conf.domain {
        return;
    };
    let chain = self.conf.domain.name();
    debug!(?chain, "Updating metrics");
    let chain_metrics = match self.provider.get_chain_metrics().await {
        Ok(Some(chain_metrics)) => chain_metrics,
        // This is the case hit by chains with an empty impl, no need to log an error
        Ok(None) => return,
        Err(err) => {
            trace!(?chain, ?err, "Failed to get chain metrics");
            return;
        }
    };

    // The gauge stores an `i64`; block numbers are far below `i64::MAX`, so
    // the cast does not truncate in practice.
    let height = chain_metrics.latest_block.number as i64;
    trace!("Block height for chain {chain} is {height}");
    self.chain_metrics
        .block_height
        .with(&hashmap! { "chain" => chain })
        .set(height);

    let Some(gas_price) = self.chain_metrics.gas_price.as_ref() else {
        return;
    };
    // Leave the gauge untouched when the price is unknown: writing the default
    // (zero) would be indistinguishable from a real zero price.
    let Some(min_gas_price) = chain_metrics.min_gas_price else {
        trace!(?chain, "Gas price unknown for chain, skipping update");
        return;
    };

    let protocol = self.conf.domain.domain_protocol();
    // `u256_as_scaled_f64` scales by the protocol's decimals; multiply back so
    // the gauge is expressed in the lowest denomination (e.g. wei).
    let decimals_scale = 10f64.powf(decimals_by_protocol(protocol).into());
    let gas = u256_as_scaled_f64(min_gas_price, protocol) * decimals_scale;
    trace!(
        ?chain,
        gas = format!("{gas:.2}"),
        "Gas price updated for chain (using lowest denomination)"
    );
    gas_price.with(&hashmap! { "chain" => chain }).set(gas);
}
/// Periodically updates the metrics
pub async fn start_updating_on_interval(self, period: Duration) {
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
self.update_wallet_balances().await;
self.update_agent_metrics().await;
self.update_block_details().await;
interval.tick().await;
}
}
/// Consumes the updater and runs its update loop on a dedicated tokio task,
/// refreshing the metrics every `METRICS_SCRAPE_INTERVAL`. The returned handle
/// is instrumented with the `MetricsUpdater` span.
pub fn spawn(self) -> Instrumented<JoinHandle<Result<(), Report>>> {
    let update_loop = async move {
        self.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
            .await;
        Ok(())
    };
    let handle = tokio::spawn(update_loop);
    handle.instrument(info_span!("MetricsUpdater"))
}
}

@ -6,16 +6,6 @@ use crate::CoreMetrics;
pub(crate) fn create_provider_metrics(metrics: &CoreMetrics) -> Result<MiddlewareMetrics> {
Ok(MiddlewareMetricsBuilder::default()
.block_height(metrics.new_int_gauge(
"block_height",
BLOCK_HEIGHT_HELP,
BLOCK_HEIGHT_LABELS,
)?)
.gas_price_gwei(metrics.new_gauge(
"gas_price_gwei",
GAS_PRICE_GWEI_HELP,
GAS_PRICE_GWEI_LABELS,
)?)
.contract_call_duration_seconds(metrics.new_counter(
"contract_call_duration_seconds",
CONTRACT_CALL_DURATION_SECONDS_HELP,

@ -295,6 +295,12 @@ impl Debug for HyperlaneDomain {
}
}
impl From<KnownHyperlaneDomain> for HyperlaneDomain {
fn from(domain: KnownHyperlaneDomain) -> Self {
HyperlaneDomain::Known(domain)
}
}
#[derive(thiserror::Error, Debug)]
pub enum HyperlaneDomainConfigError {
#[error("Domain name (`{0}`) does not match the name of a known domain id; the name is probably misspelled.")]

@ -4,6 +4,7 @@ use std::fmt::{Debug, Display, Formatter};
use std::ops::Deref;
use bigdecimal::ParseBigDecimalError;
use derive_new::new;
use crate::config::StrOrIntParseError;
use crate::rpc_clients::RpcClientError;
@ -24,6 +25,7 @@ impl<E: StdError + Send + Sync + Any> HyperlaneCustomError for E {}
/// Thin wrapper around a boxed HyperlaneCustomError; required to satisfy
/// AsDynError implementations. Basically a trait-object adaptor.
#[repr(transparent)]
#[derive(new)]
pub struct HyperlaneCustomErrorWrapper(Box<dyn HyperlaneCustomError>);
impl Debug for HyperlaneCustomErrorWrapper {

@ -4,7 +4,7 @@ use async_trait::async_trait;
use auto_impl::auto_impl;
use thiserror::Error;
use crate::{BlockInfo, ChainResult, HyperlaneChain, TxnInfo, H256, U256};
use crate::{BlockInfo, ChainInfo, ChainResult, HyperlaneChain, TxnInfo, H256, U256};
/// Interface for a provider. Allows abstraction over different provider types
/// for different chains.
@ -27,6 +27,9 @@ pub trait HyperlaneProvider: HyperlaneChain + Send + Sync + Debug {
/// Fetch the balance of the wallet address associated with the chain provider.
async fn get_balance(&self, address: String) -> ChainResult<U256>;
/// Fetch metrics related to this chain
async fn get_chain_metrics(&self) -> ChainResult<Option<ChainInfo>>;
}
/// Errors when querying for provider information.

@ -1,7 +1,9 @@
use derive_new::new;
use crate::{H256, U256};
/// Info about a given block in the chain.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct BlockInfo {
/// Hash of this block
pub hash: H256,
@ -11,6 +13,16 @@ pub struct BlockInfo {
pub number: u64,
}
/// Metrics about the chain, as returned by a provider's `get_chain_metrics`.
#[derive(Debug, Clone, Default, new)]
pub struct ChainInfo {
/// Information about the latest block
pub latest_block: BlockInfo,
/// The current gas price, in the lowest denomination (e.g. wei)
/// Unless the chain implements an EIP-1559 style tx fee mechanism, this field will be `None`
pub min_gas_price: Option<U256>,
}
/// Information about a given transaction in the chain.
#[derive(Debug, Clone)]
pub struct TxnInfo {

Loading…
Cancel
Save