Relayer balance metrics (#2976)

Done:
- scaffolding for fetching custom agent metrics
- abstractions for building a metrics fetcher for a given VM
- querying cosmos balances; e2e tested.
- querying evm balances; e2e tested.
- **Note that as a result, evm addresses are now no longer zero-padded
when printed in the logs. This may break existing log queries**
- fixed a nasty bug on ubuntu where wasmd (osmosisd dependency, part of
the grpc query flow) would panic when a block is specified via
`x-cosmos-block-height`. The fix was to bump the version of osmosisd
from `19.0.0` to `20.5.0`. **Note that when running e2e on Mac OS, the
osmosis version in use is still 19.0.0**. That's because we need a fork
that publishes a darwin target binary (currently pointing
[here](https://github.com/hashableric/osmosis/releases/download/v19.0.0-mnts/osmosisd-19.0.0-mnts-darwin-arm64.tar.gz))

For follow up PR:
- sealevel balance querying

I'm open to all renaming suggestions, I just tried to speed through and
didn't ponder names too much

Relates to https://github.com/hyperlane-xyz/issues/issues/701
Closes https://github.com/hyperlane-xyz/issues/issues/702 (because the
balance becomes available in the metrics endpoint for polling)
pull/3025/head
Daniel Savu 12 months ago committed by GitHub
parent 2da6ccebe8
commit 77aa58c581
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      rust/Cargo.lock
  2. 73
      rust/agents/relayer/src/relayer.rs
  3. 8
      rust/agents/scraper/migration/bin/common.rs
  4. 5
      rust/agents/scraper/src/agent.rs
  5. 7
      rust/agents/validator/src/validator.rs
  6. 15
      rust/chains/hyperlane-cosmos/src/aggregation_ism.rs
  7. 12
      rust/chains/hyperlane-cosmos/src/interchain_gas.rs
  8. 17
      rust/chains/hyperlane-cosmos/src/interchain_security_module.rs
  9. 7
      rust/chains/hyperlane-cosmos/src/libs/address.rs
  10. 31
      rust/chains/hyperlane-cosmos/src/mailbox.rs
  11. 24
      rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs
  12. 18
      rust/chains/hyperlane-cosmos/src/multisig_ism.rs
  13. 44
      rust/chains/hyperlane-cosmos/src/providers/grpc.rs
  14. 61
      rust/chains/hyperlane-cosmos/src/providers/mod.rs
  15. 29
      rust/chains/hyperlane-cosmos/src/providers/rpc.rs
  16. 16
      rust/chains/hyperlane-cosmos/src/routing_ism.rs
  17. 18
      rust/chains/hyperlane-cosmos/src/validator_announce.rs
  18. 16
      rust/chains/hyperlane-ethereum/src/provider.rs
  19. 5
      rust/chains/hyperlane-ethereum/src/trait_builder.rs
  20. 6
      rust/chains/hyperlane-fuel/src/provider.rs
  21. 6
      rust/chains/hyperlane-sealevel/src/provider.rs
  22. 1
      rust/ethers-prometheus/Cargo.toml
  23. 7
      rust/ethers-prometheus/src/lib.rs
  24. 157
      rust/ethers-prometheus/src/middleware/mod.rs
  25. 2
      rust/hyperlane-base/Cargo.toml
  26. 14
      rust/hyperlane-base/src/agent.rs
  27. 2
      rust/hyperlane-base/src/lib.rs
  28. 120
      rust/hyperlane-base/src/metrics/agent_metrics.rs
  29. 6
      rust/hyperlane-base/src/metrics/mod.rs
  30. 5
      rust/hyperlane-base/src/metrics/provider.rs
  31. 49
      rust/hyperlane-base/src/settings/chains.rs
  32. 3
      rust/hyperlane-base/src/settings/signers.rs
  33. 3
      rust/hyperlane-core/src/chain.rs
  34. 2
      rust/hyperlane-core/src/lib.rs
  35. 29
      rust/hyperlane-core/src/metrics/agent.rs
  36. 2
      rust/hyperlane-core/src/metrics/mod.rs
  37. 5
      rust/hyperlane-core/src/traits/provider.rs
  38. 2
      rust/utils/run-locally/Cargo.toml
  39. 115
      rust/utils/run-locally/src/cosmos/mod.rs
  40. 2
      rust/utils/run-locally/src/cosmos/types.rs
  41. 13
      rust/utils/run-locally/src/invariants.rs
  42. 6
      rust/utils/run-locally/src/main.rs
  43. 29
      rust/utils/run-locally/src/metrics.rs

3
rust/Cargo.lock generated

@ -2775,6 +2775,7 @@ dependencies = [
"derive_builder",
"ethers",
"futures",
"hyperlane-core",
"log",
"maplit",
"parking_lot 0.12.1",
@ -4035,6 +4036,7 @@ dependencies = [
"config",
"convert_case 0.6.0",
"derive-new",
"derive_builder",
"ed25519-dalek",
"ethers",
"ethers-prometheus",
@ -4048,6 +4050,7 @@ dependencies = [
"hyperlane-sealevel",
"hyperlane-test",
"itertools 0.11.0",
"maplit",
"paste",
"prometheus",
"rocksdb",

@ -9,11 +9,15 @@ use derive_more::AsRef;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
metrics::{AgentMetrics, AgentMetricsUpdater},
run_all,
settings::ChainConf,
BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync,
WatermarkContractSync,
};
use hyperlane_core::{
HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256,
metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage,
InterchainGasPayment, MerkleTreeInsertion, U256,
};
use tokio::{
sync::{
@ -49,7 +53,7 @@ struct ContextKey {
#[derive(AsRef)]
pub struct Relayer {
origin_chains: HashSet<HyperlaneDomain>,
destination_chains: HashSet<HyperlaneDomain>,
destination_chains: HashMap<HyperlaneDomain, ChainConf>,
#[as_ref]
core: HyperlaneAgentCore,
message_syncs: HashMap<HyperlaneDomain, Arc<SequencedDataContractSync<HyperlaneMessage>>>,
@ -67,6 +71,8 @@ pub struct Relayer {
transaction_gas_limit: Option<U256>,
skip_transaction_gas_limit_for: HashSet<u32>,
allow_local_checkpoint_syncers: bool,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
}
impl Debug for Relayer {
@ -92,11 +98,15 @@ impl BaseAgent for Relayer {
type Settings = RelayerSettings;
async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
core_metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
) -> Result<Self>
where
Self: Sized,
{
let core = settings.build_hyperlane_core(metrics.clone());
let core = settings.build_hyperlane_core(core_metrics.clone());
let db = DB::from_path(&settings.db)?;
let dbs = settings
.origin_chains
@ -105,18 +115,18 @@ impl BaseAgent for Relayer {
.collect::<HashMap<_, _>>();
let mailboxes = settings
.build_mailboxes(settings.destination_chains.iter(), &metrics)
.build_mailboxes(settings.destination_chains.iter(), &core_metrics)
.await?;
let validator_announces = settings
.build_validator_announces(settings.origin_chains.iter(), &metrics)
.build_validator_announces(settings.origin_chains.iter(), &core_metrics)
.await?;
let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics));
let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&core_metrics));
let message_syncs = settings
.build_message_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
@ -126,7 +136,7 @@ impl BaseAgent for Relayer {
let interchain_gas_payment_syncs = settings
.build_interchain_gas_payment_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
@ -136,7 +146,7 @@ impl BaseAgent for Relayer {
let merkle_tree_hook_syncs = settings
.build_merkle_tree_hook_indexers(
settings.origin_chains.iter(),
&metrics,
&core_metrics,
&contract_sync_metrics,
dbs.iter()
.map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _))
@ -188,9 +198,10 @@ impl BaseAgent for Relayer {
.collect();
let mut msg_ctxs = HashMap::new();
let mut destination_chains = HashMap::new();
for destination in &settings.destination_chains {
let destination_chain_setup = core.settings.chain_setup(destination).unwrap().clone();
destination_chains.insert(destination.clone(), destination_chain_setup.clone());
let transaction_gas_limit: Option<U256> =
if skip_transaction_gas_limit_for.contains(&destination.id()) {
None
@ -221,7 +232,7 @@ impl BaseAgent for Relayer {
metadata_builder,
origin_gas_payment_enforcer: gas_payment_enforcers[origin].clone(),
transaction_gas_limit,
metrics: MessageSubmissionMetrics::new(&metrics, origin, destination),
metrics: MessageSubmissionMetrics::new(&core_metrics, origin, destination),
}),
);
}
@ -230,7 +241,7 @@ impl BaseAgent for Relayer {
Ok(Self {
dbs,
origin_chains: settings.origin_chains,
destination_chains: settings.destination_chains,
destination_chains,
msg_ctxs,
core,
message_syncs,
@ -242,6 +253,8 @@ impl BaseAgent for Relayer {
transaction_gas_limit,
skip_transaction_gas_limit_for,
allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers,
core_metrics,
agent_metrics,
})
}
@ -251,12 +264,32 @@ impl BaseAgent for Relayer {
// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
for destination in &self.destination_chains {
for (dest_domain, dest_conf) in &self.destination_chains {
let (send_channel, receive_channel) =
mpsc::unbounded_channel::<Box<DynPendingOperation>>();
send_channels.insert(destination.id(), send_channel);
send_channels.insert(dest_domain.id(), send_channel);
tasks.push(self.run_destination_submitter(dest_domain, receive_channel));
tasks.push(self.run_destination_submitter(destination, receive_channel));
let agent_metrics_conf = dest_conf
.agent_metrics_conf(Self::AGENT_NAME.to_string())
.await
.unwrap();
let agent_metrics_fetcher = dest_conf.build_provider(&self.core_metrics).await.unwrap();
let agent_metrics = AgentMetricsUpdater::new(
self.agent_metrics.clone(),
agent_metrics_conf,
agent_metrics_fetcher,
);
let fetcher_task = tokio::spawn(async move {
agent_metrics
.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("AgentMetrics"));
tasks.push(fetcher_task);
}
for origin in &self.origin_chains {
@ -330,11 +363,11 @@ impl Relayer {
let metrics = MessageProcessorMetrics::new(
&self.core.metrics,
origin,
self.destination_chains.iter(),
self.destination_chains.keys(),
);
let destination_ctxs = self
.destination_chains
.iter()
.keys()
.filter(|&destination| destination != origin)
.map(|destination| {
(

@ -1,9 +1,11 @@
use std::env;
use std::{env, time::Duration};
use migration::sea_orm::{Database, DatabaseConnection};
pub use migration::{DbErr, Migrator, MigratorTrait as _};
use sea_orm::ConnectOptions;
const LOCAL_DATABASE_URL: &str = "postgresql://postgres:47221c18c610@localhost:5432/postgres";
const CONNECT_TIMEOUT: u64 = 20;
pub fn url() -> String {
env::var("DATABASE_URL").unwrap_or_else(|_| LOCAL_DATABASE_URL.into())
@ -16,6 +18,8 @@ pub async fn init() -> Result<DatabaseConnection, DbErr> {
.init();
let url = url();
let mut options: ConnectOptions = url.clone().into();
options.connect_timeout(Duration::from_secs(CONNECT_TIMEOUT));
println!("Connecting to {url}");
Database::connect(url).await
Database::connect(options).await
}

@ -3,8 +3,8 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use derive_more::AsRef;
use hyperlane_base::{
run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore,
metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics,
CoreMetrics, HyperlaneAgentCore,
};
use hyperlane_core::HyperlaneDomain;
use tokio::task::JoinHandle;
@ -38,6 +38,7 @@ impl BaseAgent for Scraper {
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
) -> eyre::Result<Self>
where
Self: Sized,

@ -10,6 +10,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::AgentMetrics,
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync,
};
@ -51,7 +52,11 @@ impl BaseAgent for Validator {
type Settings = ValidatorSettings;
async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
_agent_metrics: AgentMetrics,
) -> Result<Self>
where
Self: Sized,
{

@ -2,7 +2,7 @@ use std::str::FromStr;
use crate::{
address::CosmosAddress,
grpc::{WasmGrpcProvider, WasmProvider},
grpc::WasmProvider,
payloads::aggregate_ism::{ModulesAndThresholdRequest, ModulesAndThresholdResponse},
ConnectionConf, CosmosProvider, Signer,
};
@ -18,7 +18,7 @@ use tracing::instrument;
pub struct CosmosAggregationIsm {
domain: HyperlaneDomain,
address: H256,
provider: Box<WasmGrpcProvider>,
provider: Box<CosmosProvider>,
}
impl CosmosAggregationIsm {
@ -28,7 +28,12 @@ impl CosmosAggregationIsm {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;
Ok(Self {
domain: locator.domain.clone(),
@ -50,7 +55,7 @@ impl HyperlaneChain for CosmosAggregationIsm {
}
fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
self.provider.clone()
}
}
@ -63,7 +68,7 @@ impl AggregationIsm for CosmosAggregationIsm {
) -> ChainResult<(Vec<H256>, u8)> {
let payload = ModulesAndThresholdRequest::new(message);
let data = self.provider.wasm_query(payload, None).await?;
let data = self.provider.grpc().wasm_query(payload, None).await?;
let response: ModulesAndThresholdResponse = serde_json::from_slice(&data)?;
let modules: ChainResult<Vec<H256>> = response

@ -10,7 +10,6 @@ use once_cell::sync::Lazy;
use std::ops::RangeInclusive;
use crate::{
grpc::WasmGrpcProvider,
rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer},
signers::Signer,
utils::{CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64},
@ -22,6 +21,7 @@ use crate::{
pub struct CosmosInterchainGasPaymaster {
domain: HyperlaneDomain,
address: H256,
provider: CosmosProvider,
}
impl HyperlaneContract for CosmosInterchainGasPaymaster {
@ -36,7 +36,7 @@ impl HyperlaneChain for CosmosInterchainGasPaymaster {
}
fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}
@ -49,11 +49,17 @@ impl CosmosInterchainGasPaymaster {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;
Ok(Self {
domain: locator.domain.clone(),
address: locator.address,
provider,
})
}
}

@ -5,7 +5,7 @@ use hyperlane_core::{
};
use crate::{
grpc::{WasmGrpcProvider, WasmProvider},
grpc::WasmProvider,
payloads::{
general::EmptyStruct,
ism_routes::{QueryIsmGeneralRequest, QueryIsmModuleTypeRequest},
@ -22,7 +22,7 @@ pub struct CosmosInterchainSecurityModule {
/// The address of the ISM contract.
address: H256,
/// The provider for the ISM contract.
provider: Box<WasmGrpcProvider>,
provider: CosmosProvider,
}
/// The Cosmos Interchain Security Module Implementation.
@ -33,13 +33,17 @@ impl CosmosInterchainSecurityModule {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider: WasmGrpcProvider =
WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;
Ok(Self {
domain: locator.domain.clone(),
address: locator.address,
provider: Box::new(provider),
provider,
})
}
}
@ -56,7 +60,7 @@ impl HyperlaneChain for CosmosInterchainSecurityModule {
}
fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}
@ -71,6 +75,7 @@ impl InterchainSecurityModule for CosmosInterchainSecurityModule {
let data = self
.provider
.grpc()
.wasm_query(QueryIsmGeneralRequest { ism: query }, None)
.await?;

@ -12,7 +12,7 @@ use tendermint::public_key::PublicKey as TendermintPublicKey;
use crate::HyperlaneCosmosError;
/// Wrapper around the cosmrs AccountId type that abstracts bech32 encoding
#[derive(new, Debug)]
#[derive(new, Debug, Clone)]
pub struct CosmosAddress {
/// Bech32 encoded cosmos account
account_id: AccountId,
@ -132,6 +132,11 @@ pub mod test {
addr.address(),
"neutron1kknekjxg0ear00dky5ykzs8wwp2gz62z9s6aaj"
);
// TODO: watch out for this edge case. This check will fail unless
// the first 12 bytes are removed from the digest.
// let digest = addr.digest();
// let addr2 = CosmosAddress::from_h256(digest, prefix).expect("Cosmos address creation failed");
// assert_eq!(addr.address(), addr2.address());
}
#[test]

@ -14,10 +14,7 @@ use crate::payloads::{general, mailbox};
use crate::rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer};
use crate::CosmosProvider;
use crate::{address::CosmosAddress, types::tx_response_to_outcome};
use crate::{
grpc::{WasmGrpcProvider, WasmProvider},
HyperlaneCosmosError,
};
use crate::{grpc::WasmProvider, HyperlaneCosmosError};
use crate::{signers::Signer, utils::get_block_height_for_lag, ConnectionConf};
use async_trait::async_trait;
use cosmrs::proto::cosmos::base::abci::v1beta1::TxResponse;
@ -40,7 +37,7 @@ pub struct CosmosMailbox {
config: ConnectionConf,
domain: HyperlaneDomain,
address: H256,
provider: Box<WasmGrpcProvider>,
provider: CosmosProvider,
}
impl CosmosMailbox {
@ -51,13 +48,18 @@ impl CosmosMailbox {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;
Ok(Self {
config: conf,
domain: locator.domain.clone(),
address: locator.address,
provider: Box::new(provider),
provider,
})
}
@ -79,7 +81,7 @@ impl HyperlaneChain for CosmosMailbox {
}
fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}
@ -94,7 +96,7 @@ impl Debug for CosmosMailbox {
impl Mailbox for CosmosMailbox {
#[instrument(level = "debug", err, ret, skip(self))]
async fn count(&self, lag: Option<NonZeroU64>) -> ChainResult<u32> {
let block_height = get_block_height_for_lag(&self.provider, lag).await?;
let block_height = get_block_height_for_lag(self.provider.grpc(), lag).await?;
self.nonce_at_block(block_height).await
}
@ -107,6 +109,7 @@ impl Mailbox for CosmosMailbox {
let delivered = match self
.provider
.grpc()
.wasm_query(GeneralMailboxQuery { mailbox: payload }, None)
.await
{
@ -136,6 +139,7 @@ impl Mailbox for CosmosMailbox {
let data = self
.provider
.grpc()
.wasm_query(GeneralMailboxQuery { mailbox: payload }, None)
.await?;
let response: mailbox::DefaultIsmResponse = serde_json::from_slice(&data)?;
@ -157,6 +161,7 @@ impl Mailbox for CosmosMailbox {
let data = self
.provider
.grpc()
.wasm_query(GeneralMailboxQuery { mailbox: payload }, None)
.await?;
let response: mailbox::RecipientIsmResponse = serde_json::from_slice(&data)?;
@ -182,6 +187,7 @@ impl Mailbox for CosmosMailbox {
let response: TxResponse = self
.provider
.grpc()
.wasm_send(process_message, tx_gas_limit)
.await?;
@ -201,7 +207,11 @@ impl Mailbox for CosmosMailbox {
},
};
let gas_limit = self.provider.wasm_estimate_gas(process_message).await?;
let gas_limit = self
.provider
.grpc()
.wasm_estimate_gas(process_message)
.await?;
let result = TxCostEstimate {
gas_limit: gas_limit.into(),
@ -226,6 +236,7 @@ impl CosmosMailbox {
let data = self
.provider
.grpc()
.wasm_query(GeneralMailboxQuery { mailbox: payload }, block_height)
.await?;

@ -12,7 +12,7 @@ use once_cell::sync::Lazy;
use tracing::instrument;
use crate::{
grpc::{WasmGrpcProvider, WasmProvider},
grpc::WasmProvider,
payloads::{
general::{self},
merkle_tree_hook,
@ -33,7 +33,7 @@ pub struct CosmosMerkleTreeHook {
/// Contract address
address: H256,
/// Provider
provider: Box<WasmGrpcProvider>,
provider: CosmosProvider,
}
impl CosmosMerkleTreeHook {
@ -43,12 +43,17 @@ impl CosmosMerkleTreeHook {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;
Ok(Self {
domain: locator.domain.clone(),
address: locator.address,
provider: Box::new(provider),
provider,
})
}
}
@ -65,7 +70,7 @@ impl HyperlaneChain for CosmosMerkleTreeHook {
}
fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}
@ -78,10 +83,11 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
tree: general::EmptyStruct {},
};
let block_height = get_block_height_for_lag(&self.provider, lag).await?;
let block_height = get_block_height_for_lag(self.provider.grpc(), lag).await?;
let data = self
.provider
.grpc()
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,
@ -111,7 +117,7 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
count: general::EmptyStruct {},
};
let block_height = get_block_height_for_lag(&self.provider, lag).await?;
let block_height = get_block_height_for_lag(self.provider.grpc(), lag).await?;
self.count_at_block(block_height).await
}
@ -122,10 +128,11 @@ impl MerkleTreeHook for CosmosMerkleTreeHook {
check_point: general::EmptyStruct {},
};
let block_height = get_block_height_for_lag(&self.provider, lag).await?;
let block_height = get_block_height_for_lag(self.provider.grpc(), lag).await?;
let data = self
.provider
.grpc()
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,
@ -153,6 +160,7 @@ impl CosmosMerkleTreeHook {
let data = self
.provider
.grpc()
.wasm_query(
merkle_tree_hook::MerkleTreeGenericRequest {
merkle_hook: payload,

@ -1,9 +1,7 @@
use std::str::FromStr;
use crate::{
grpc::{WasmGrpcProvider, WasmProvider},
payloads::ism_routes::QueryIsmGeneralRequest,
signers::Signer,
grpc::WasmProvider, payloads::ism_routes::QueryIsmGeneralRequest, signers::Signer,
ConnectionConf, CosmosProvider,
};
use async_trait::async_trait;
@ -19,7 +17,7 @@ use crate::payloads::multisig_ism::{self, VerifyInfoRequest, VerifyInfoRequestIn
pub struct CosmosMultisigIsm {
domain: HyperlaneDomain,
address: H256,
provider: Box<WasmGrpcProvider>,
provider: CosmosProvider,
}
impl CosmosMultisigIsm {
@ -29,12 +27,17 @@ impl CosmosMultisigIsm {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;
Ok(Self {
domain: locator.domain.clone(),
address: locator.address,
provider: Box::new(provider),
provider,
})
}
}
@ -51,7 +54,7 @@ impl HyperlaneChain for CosmosMultisigIsm {
}
fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}
@ -70,6 +73,7 @@ impl MultisigIsm for CosmosMultisigIsm {
let data = self
.provider
.grpc()
.wasm_query(QueryIsmGeneralRequest { ism: payload }, None)
.await?;
let response: multisig_ism::VerifyInfoResponse = serde_json::from_slice(&data)?;

@ -5,6 +5,7 @@ use cosmrs::{
auth::v1beta1::{
query_client::QueryClient as QueryAccountClient, BaseAccount, QueryAccountRequest,
},
bank::v1beta1::{query_client::QueryClient as QueryBalanceClient, QueryBalanceRequest},
base::{
abci::v1beta1::TxResponse,
tendermint::v1beta1::{service_client::ServiceClient, GetLatestBlockRequest},
@ -77,14 +78,14 @@ pub trait WasmProvider: Send + Sync {
async fn wasm_estimate_gas<T: Serialize + Sync + Send>(&self, payload: T) -> ChainResult<u64>;
}
#[derive(Debug)]
#[derive(Debug, Clone)]
/// CosmWasm GRPC provider.
pub struct WasmGrpcProvider {
/// Connection configuration.
conf: ConnectionConf,
/// A contract address that can be used as the default
/// for queries / sends / estimates.
contract_address: CosmosAddress,
contract_address: Option<CosmosAddress>,
/// Signer for transactions.
signer: Option<Signer>,
/// GRPC Channel that can be cheaply cloned.
@ -96,13 +97,15 @@ impl WasmGrpcProvider {
/// Create new CosmWasm GRPC Provider.
pub fn new(
conf: ConnectionConf,
locator: ContractLocator,
locator: Option<ContractLocator>,
signer: Option<Signer>,
) -> ChainResult<Self> {
let endpoint =
Endpoint::new(conf.get_grpc_url()).map_err(Into::<HyperlaneCosmosError>::into)?;
let channel = endpoint.connect_lazy();
let contract_address = CosmosAddress::from_h256(locator.address, &conf.get_prefix())?;
let contract_address = locator
.map(|l| CosmosAddress::from_h256(l.address, &conf.get_prefix()))
.transpose()?;
Ok(Self {
conf,
@ -220,6 +223,24 @@ impl WasmGrpcProvider {
Ok(gas_estimate)
}
/// Fetches balance for a given `address` and `denom`
pub async fn get_balance(&self, address: String, denom: String) -> ChainResult<U256> {
let mut client = QueryBalanceClient::new(self.channel.clone());
let balance_request = tonic::Request::new(QueryBalanceRequest { address, denom });
let response = client
.balance(balance_request)
.await
.map_err(ChainCommunicationError::from_other)?
.into_inner();
let balance = response
.balance
.ok_or_else(|| ChainCommunicationError::from_other_str("account not present"))?;
Ok(balance.amount.parse()?)
}
/// Queries an account.
async fn account_query(&self, account: String) -> ChainResult<BaseAccount> {
let mut client = QueryAccountClient::new(self.channel.clone());
@ -268,7 +289,10 @@ impl WasmProvider for WasmGrpcProvider {
where
T: Serialize + Send + Sync,
{
self.wasm_query_to(self.contract_address.address(), payload, block_height)
let contract_address = self.contract_address.as_ref().ok_or_else(|| {
ChainCommunicationError::from_other_str("No contract address available")
})?;
self.wasm_query_to(contract_address.address(), payload, block_height)
.await
}
@ -308,10 +332,13 @@ impl WasmProvider for WasmGrpcProvider {
{
let signer = self.get_signer()?;
let mut client = TxServiceClient::new(self.channel.clone());
let contract_address = self.contract_address.as_ref().ok_or_else(|| {
ChainCommunicationError::from_other_str("No contract address available")
})?;
let msgs = vec![MsgExecuteContract {
sender: signer.address.clone(),
contract: self.contract_address.address(),
contract: contract_address.address(),
msg: serde_json::to_string(&payload)?.as_bytes().to_vec(),
funds: vec![],
}
@ -354,9 +381,12 @@ impl WasmProvider for WasmGrpcProvider {
// Estimating gas requires a signer, which we can reasonably expect to have
// since we need one to send a tx with the estimated gas anyways.
let signer = self.get_signer()?;
let contract_address = self.contract_address.as_ref().ok_or_else(|| {
ChainCommunicationError::from_other_str("No contract address available")
})?;
let msg = MsgExecuteContract {
sender: signer.address.clone(),
contract: self.contract_address.address(),
contract: contract_address.address(),
msg: serde_json::to_string(&payload)?.as_bytes().to_vec(),
funds: vec![],
};

@ -1,23 +1,63 @@
use async_trait::async_trait;
use hyperlane_core::{
BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256,
BlockInfo, ChainResult, ContractLocator, HyperlaneChain, HyperlaneDomain, HyperlaneProvider,
TxnInfo, H256, U256,
};
use tendermint_rpc::{client::CompatMode, HttpClient};
use crate::{ConnectionConf, HyperlaneCosmosError, Signer};
use self::grpc::WasmGrpcProvider;
/// cosmos grpc provider
pub mod grpc;
/// cosmos rpc provider
pub mod rpc;
/// A reference to a Cosmos chain
#[derive(Debug)]
/// Abstraction over a connection to a Cosmos chain
#[derive(Debug, Clone)]
pub struct CosmosProvider {
domain: HyperlaneDomain,
canonical_asset: String,
grpc_client: WasmGrpcProvider,
rpc_client: HttpClient,
}
impl CosmosProvider {
/// Create a reference to a Cosmos chain
pub fn new(domain: HyperlaneDomain) -> Self {
Self { domain }
pub fn new(
domain: HyperlaneDomain,
conf: ConnectionConf,
locator: Option<ContractLocator>,
signer: Option<Signer>,
) -> ChainResult<Self> {
let grpc_client = WasmGrpcProvider::new(conf.clone(), locator, signer)?;
let rpc_client = HttpClient::builder(
conf.get_rpc_url()
.parse()
.map_err(Into::<HyperlaneCosmosError>::into)?,
)
// Consider supporting different compatibility modes.
.compat_mode(CompatMode::latest())
.build()
.map_err(Into::<HyperlaneCosmosError>::into)?;
Ok(Self {
domain,
rpc_client,
grpc_client,
canonical_asset: conf.get_canonical_asset(),
})
}
/// Get a grpc client
pub fn grpc(&self) -> &WasmGrpcProvider {
&self.grpc_client
}
/// Get an rpc client
pub fn rpc(&self) -> &HttpClient {
&self.rpc_client
}
}
@ -27,9 +67,7 @@ impl HyperlaneChain for CosmosProvider {
}
fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider {
domain: self.domain.clone(),
})
Box::new(self.clone())
}
}
@ -47,4 +85,11 @@ impl HyperlaneProvider for CosmosProvider {
// FIXME
Ok(true)
}
async fn get_balance(&self, address: String) -> ChainResult<U256> {
Ok(self
.grpc_client
.get_balance(address, self.canonical_asset.clone())
.await?)
}
}

@ -1,7 +1,7 @@
use std::ops::RangeInclusive;
use async_trait::async_trait;
use cosmrs::rpc::client::{Client, CompatMode, HttpClient};
use cosmrs::rpc::client::Client;
use cosmrs::rpc::endpoint::{tx, tx_search::Response as TxSearchResponse};
use cosmrs::rpc::query::Query;
use cosmrs::rpc::Order;
@ -10,7 +10,7 @@ use hyperlane_core::{ChainCommunicationError, ChainResult, ContractLocator, LogM
use tracing::{instrument, trace};
use crate::address::CosmosAddress;
use crate::{ConnectionConf, HyperlaneCosmosError};
use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError};
const PAGINATION_LIMIT: u8 = 100;
@ -50,7 +50,7 @@ impl<T: PartialEq> ParsedEvent<T> {
#[derive(Debug)]
/// Cosmwasm RPC Provider
pub struct CosmosWasmIndexer {
client: HttpClient,
provider: CosmosProvider,
contract_address: CosmosAddress,
target_event_kind: String,
reorg_period: u32,
@ -66,17 +66,14 @@ impl CosmosWasmIndexer {
event_type: String,
reorg_period: u32,
) -> ChainResult<Self> {
let client = HttpClient::builder(
conf.get_rpc_url()
.parse()
.map_err(Into::<HyperlaneCosmosError>::into)?,
)
// Consider supporting different compatibility modes.
.compat_mode(CompatMode::latest())
.build()
.map_err(Into::<HyperlaneCosmosError>::into)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
None,
)?;
Ok(Self {
client,
provider,
contract_address: CosmosAddress::from_h256(
locator.address,
conf.get_prefix().as_str(),
@ -91,7 +88,8 @@ impl CosmosWasmIndexer {
#[instrument(level = "trace", err, skip(self))]
async fn tx_search(&self, query: Query, page: u32) -> ChainResult<TxSearchResponse> {
Ok(self
.client
.provider
.rpc()
.tx_search(query, false, page, PAGINATION_LIMIT, Order::Ascending)
.await
.map_err(Into::<HyperlaneCosmosError>::into)?)
@ -176,7 +174,8 @@ impl CosmosWasmIndexer {
impl WasmIndexer for CosmosWasmIndexer {
async fn get_finalized_block_number(&self) -> ChainResult<u32> {
let latest_height: u32 = self
.client
.provider
.rpc()
.latest_block()
.await
.map_err(Into::<HyperlaneCosmosError>::into)?

@ -9,7 +9,7 @@ use hyperlane_core::{
use crate::{
address::CosmosAddress,
grpc::{WasmGrpcProvider, WasmProvider},
grpc::WasmProvider,
payloads::ism_routes::{
IsmRouteRequest, IsmRouteRequestInner, IsmRouteRespnose, QueryRoutingIsmGeneralRequest,
},
@ -22,7 +22,7 @@ use crate::{
pub struct CosmosRoutingIsm {
domain: HyperlaneDomain,
address: H256,
provider: Box<WasmGrpcProvider>,
provider: CosmosProvider,
}
impl CosmosRoutingIsm {
@ -32,12 +32,17 @@ impl CosmosRoutingIsm {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;
Ok(Self {
domain: locator.domain.clone(),
address: locator.address,
provider: Box::new(provider),
provider,
})
}
}
@ -54,7 +59,7 @@ impl HyperlaneChain for CosmosRoutingIsm {
}
fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}
@ -69,6 +74,7 @@ impl RoutingIsm for CosmosRoutingIsm {
let data = self
.provider
.grpc()
.wasm_query(
QueryRoutingIsmGeneralRequest {
routing_ism: payload,

@ -7,7 +7,7 @@ use hyperlane_core::{
};
use crate::{
grpc::{WasmGrpcProvider, WasmProvider},
grpc::WasmProvider,
payloads::validator_announce::{
self, AnnouncementRequest, AnnouncementRequestInner, GetAnnounceStorageLocationsRequest,
GetAnnounceStorageLocationsRequestInner,
@ -22,7 +22,7 @@ use crate::{
pub struct CosmosValidatorAnnounce {
domain: HyperlaneDomain,
address: H256,
provider: Box<WasmGrpcProvider>,
provider: CosmosProvider,
}
impl CosmosValidatorAnnounce {
@ -32,12 +32,17 @@ impl CosmosValidatorAnnounce {
locator: ContractLocator,
signer: Option<Signer>,
) -> ChainResult<Self> {
let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?;
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
signer,
)?;
Ok(Self {
domain: locator.domain.clone(),
address: locator.address,
provider: Box::new(provider),
provider,
})
}
}
@ -54,7 +59,7 @@ impl HyperlaneChain for CosmosValidatorAnnounce {
}
fn provider(&self) -> Box<dyn HyperlaneProvider> {
Box::new(CosmosProvider::new(self.domain.clone()))
Box::new(self.provider.clone())
}
}
@ -76,7 +81,7 @@ impl ValidatorAnnounce for CosmosValidatorAnnounce {
},
};
let data: Vec<u8> = self.provider.wasm_query(payload, None).await?;
let data: Vec<u8> = self.provider.grpc().wasm_query(payload, None).await?;
let response: validator_announce::GetAnnounceStorageLocationsResponse =
serde_json::from_slice(&data)?;
@ -102,6 +107,7 @@ impl ValidatorAnnounce for CosmosValidatorAnnounce {
let response: TxResponse = self
.provider
.grpc()
.wasm_send(announce_request, tx_gas_limit)
.await?;

@ -6,7 +6,8 @@ use std::time::Duration;
use async_trait::async_trait;
use derive_new::new;
use ethers::prelude::Middleware;
use hyperlane_core::ethers_core_types;
use ethers_core::abi::Address;
use hyperlane_core::{ethers_core_types, U256};
use tokio::time::sleep;
use tracing::instrument;
@ -105,6 +106,19 @@ where
.map_err(ChainCommunicationError::from_other)?;
Ok(!code.is_empty())
}
#[instrument(err, skip(self))]
async fn get_balance(&self, address: String) -> ChainResult<U256> {
// Can't use the address directly as a string, because ethers interprets it
// as an ENS name rather than an address.
let addr: Address = address.parse()?;
let balance = self
.provider
.get_balance(addr, None)
.await
.map_err(ChainCommunicationError::from_other)?;
Ok(balance.into())
}
}
impl<M> EthereumProvider<M>

@ -10,6 +10,7 @@ use ethers::prelude::{
Http, JsonRpcClient, Middleware, NonceManagerMiddleware, Provider, Quorum, QuorumProvider,
SignerMiddleware, WeightedProvider, Ws, WsClientError,
};
use hyperlane_core::metrics::agent::METRICS_SCRAPE_INTERVAL;
use reqwest::{Client, Url};
use thiserror::Error;
@ -27,7 +28,6 @@ use hyperlane_core::{
use crate::{signers::Signers, ConnectionConf, FallbackProvider, RetryingProvider};
// This should be whatever the prometheus scrape interval is
const METRICS_SCRAPE_INTERVAL: Duration = Duration::from_secs(60);
const HTTP_CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
/// An error when connecting to an ethereum provider.
@ -194,6 +194,9 @@ pub trait BuildableWithProvider {
Ok(if let Some(metrics) = metrics {
let provider = Arc::new(PrometheusMiddleware::new(provider, metrics.0, metrics.1));
// TODO: This task is spawned each time `.build_ethereum(...)` is called, which is about 15 times,
// in spite of it doing the same thing, wasting resources.
// Only spawn this once along with the other agent tasks.
tokio::spawn(provider.start_updating_on_interval(METRICS_SCRAPE_INTERVAL));
self.build_with_signer(provider, locator, signer).await?
} else {

@ -1,7 +1,7 @@
use async_trait::async_trait;
use hyperlane_core::{
BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256,
BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, U256,
};
/// A wrapper around a fuel provider to get generic blockchain information.
@ -31,4 +31,8 @@ impl HyperlaneProvider for FuelProvider {
async fn is_contract(&self, address: &H256) -> ChainResult<bool> {
todo!()
}
async fn get_balance(&self, address: String) -> ChainResult<U256> {
todo!()
}
}

@ -1,7 +1,7 @@
use async_trait::async_trait;
use hyperlane_core::{
BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256,
BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, U256,
};
/// A wrapper around a Sealevel provider to get generic blockchain information.
@ -43,4 +43,8 @@ impl HyperlaneProvider for SealevelProvider {
// FIXME
Ok(true)
}
async fn get_balance(&self, _address: String) -> ChainResult<U256> {
todo!() // FIXME
}
}

@ -26,6 +26,7 @@ tokio = { workspace = true, features = ["time", "sync", "parking_lot"] }
# enable feature for this crate that is imported by ethers-rs
primitive-types = { workspace = true, features = ["fp-conversion"] }
hyperlane-core = { path = "../hyperlane-core", features = ["agent", "float"] }
[build-dependencies]
abigen = { path = "../utils/abigen", features = ["ethers"] }

@ -3,8 +3,6 @@
#![forbid(unsafe_code)]
#![warn(missing_docs)]
use ethers::prelude::U256;
mod contracts;
pub mod json_rpc_client;
@ -19,8 +17,3 @@ pub struct ChainInfo {
/// "kovan".
pub name: Option<String>,
}
/// Convert a u256 scaled integer value into the corresponding f64 value.
fn u256_as_scaled_f64(value: U256, decimals: u8) -> f64 {
value.to_f64_lossy() / (10u64.pow(decimals as u32) as f64)
}

@ -14,44 +14,21 @@ use ethers::abi::AbiEncode;
use ethers::prelude::*;
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::utils::hex::ToHex;
use log::{debug, trace, warn};
use hyperlane_core::metrics::agent::u256_as_scaled_f64;
use hyperlane_core::HyperlaneDomainProtocol;
use log::{debug, trace};
use maplit::hashmap;
use prometheus::{CounterVec, GaugeVec, IntCounterVec, IntGaugeVec};
use static_assertions::assert_impl_all;
use tokio::sync::RwLock;
use tokio::time::MissedTickBehavior;
pub use error::PrometheusMiddlewareError;
use tokio::time::MissedTickBehavior;
use crate::contracts::erc_20::Erc20;
use crate::u256_as_scaled_f64;
pub use crate::ChainInfo;
mod error;
/// Some basic information about a token.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct TokenInfo {
/// Full name of the token. E.g. Ether.
pub name: String,
/// Token symbol. E.g. ETH.
pub symbol: String,
/// Number of
pub decimals: u8,
}
impl Default for TokenInfo {
fn default() -> Self {
Self {
name: "Unknown".into(),
symbol: "".into(),
decimals: 18,
}
}
}
/// Some basic information about a wallet.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
@ -148,18 +125,6 @@ pub const TRANSACTION_SEND_TOTAL_LABELS: &[&str] =
/// Help string for the metric.
pub const TRANSACTION_SEND_TOTAL_HELP: &str = "Number of transactions sent";
/// Expected label names for the `wallet_balance` metric.
pub const WALLET_BALANCE_LABELS: &[&str] = &[
"chain",
"wallet_address",
"wallet_name",
"token_address",
"token_symbol",
"token_name",
];
/// Help string for the metric.
pub const WALLET_BALANCE_HELP: &str = "Current balance of eth and other tokens in the `tokens` map for the wallet addresses in the `wallets` set";
/// Container for all the relevant middleware metrics.
#[derive(Clone, Builder)]
pub struct MiddlewareMetrics {
@ -238,24 +203,12 @@ pub struct MiddlewareMetrics {
/// - `txn_status`: `dispatched`, `completed`, or `failed`
#[builder(setter(into, strip_option), default)]
transaction_send_total: Option<IntCounterVec>,
// /// Gas spent on completed transactions.
// /// - `chain`: the chain name (or ID if the name is unknown) of the chain the tx occurred
// on. /// - `address_from`: source address of the transaction.
// /// - `address_to`: destination address of the transaction.
// #[builder(setter(into, strip_option), default)]
// transaction_send_gas_eth_total: Option<CounterVec>,
/// Current balance of eth and other tokens in the `tokens` map for the
/// wallet addresses in the `wallets` set.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the tx occurred on.
/// - `wallet_address`: Address of the wallet holding the funds.
/// - `wallet_name`: Name of the address holding the funds.
/// - `token_address`: Address of the token.
/// - `token_symbol`: Symbol of the token.
/// - `token_name`: Full name of the token.
#[builder(setter(into, strip_option), default)]
wallet_balance: Option<GaugeVec>,
}
/// An ethers-rs middleware that instruments calls with prometheus metrics. To
@ -273,14 +226,6 @@ pub struct PrometheusMiddleware<M> {
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct PrometheusMiddlewareConf {
/// The tokens to track and identifying info
#[cfg_attr(feature = "serde", serde(default))]
pub tokens: HashMap<Address, TokenInfo>,
/// The wallets to track and identifying info
#[cfg_attr(feature = "serde", serde(default))]
pub wallets: HashMap<Address, WalletInfo>,
/// Contract info for more useful metrics
#[cfg_attr(feature = "serde", serde(default))]
pub contracts: HashMap<Address, ContractInfo>,
@ -521,32 +466,6 @@ impl<M> PrometheusMiddleware<M> {
conf: Arc::new(RwLock::new(conf)),
}
}
/// Start tracking metrics for a new token.
pub async fn track_new_token(&self, addr: Address, info: TokenInfo) {
self.track_new_tokens([(addr, info)]).await;
}
/// Start tacking metrics for new tokens.
pub async fn track_new_tokens(&self, iter: impl IntoIterator<Item = (Address, TokenInfo)>) {
let mut data = self.conf.write().await;
for (addr, info) in iter {
data.tokens.insert(addr, info);
}
}
/// Start tracking metrics for a new wallet.
pub async fn track_new_wallet(&self, addr: Address, info: WalletInfo) {
self.track_new_wallets([(addr, info)]).await;
}
/// Start tracking metrics for new wallets.
pub async fn track_new_wallets(&self, iter: impl IntoIterator<Item = (Address, WalletInfo)>) {
let mut data = self.conf.write().await;
for (addr, info) in iter {
data.wallets.insert(addr, info);
}
}
}
impl<M: Middleware> PrometheusMiddleware<M> {
@ -580,7 +499,6 @@ impl<M: Middleware + Send + Sync> PrometheusMiddleware<M> {
/// prometheus scrape interval.
pub fn update(&self) -> impl Future<Output = ()> {
// all metrics are Arcs internally so just clone the ones we want to report for.
let wallet_balance = self.metrics.wallet_balance.clone();
let block_height = self.metrics.block_height.clone();
let gas_price_gwei = self.metrics.gas_price_gwei.clone();
@ -595,9 +513,6 @@ impl<M: Middleware + Send + Sync> PrometheusMiddleware<M> {
if block_height.is_some() || gas_price_gwei.is_some() {
Self::update_block_details(&*client, chain, block_height, gas_price_gwei).await;
}
if let Some(wallet_balance) = wallet_balance {
Self::update_wallet_balances(client.clone(), &data, chain, wallet_balance).await;
}
// more metrics to come...
}
@ -609,9 +524,7 @@ impl<M: Middleware + Send + Sync> PrometheusMiddleware<M> {
block_height: Option<IntGaugeVec>,
gas_price_gwei: Option<GaugeVec>,
) {
let current_block = if let Ok(Some(b)) = client.get_block(BlockNumber::Latest).await {
b
} else {
let Ok(Some(current_block)) = client.get_block(BlockNumber::Latest).await else {
return;
};
@ -627,7 +540,8 @@ impl<M: Middleware + Send + Sync> PrometheusMiddleware<M> {
}
if let Some(gas_price_gwei) = gas_price_gwei {
if let Some(london_fee) = current_block.base_fee_per_gas {
let gas = u256_as_scaled_f64(london_fee, 18) * 1e9;
let gas =
u256_as_scaled_f64(london_fee.into(), HyperlaneDomainProtocol::Ethereum) * 1e9;
trace!("Gas price for chain {chain} is {gas:.1}gwei");
gas_price_gwei.with(&hashmap! { "chain" => chain }).set(gas);
} else {
@ -635,63 +549,6 @@ impl<M: Middleware + Send + Sync> PrometheusMiddleware<M> {
}
}
}
async fn update_wallet_balances(
client: Arc<M>,
data: &PrometheusMiddlewareConf,
chain: &str,
wallet_balance_metric: GaugeVec,
) {
for (wallet_addr, wallet_info) in data.wallets.iter() {
let wallet_addr_str: String = wallet_addr.encode_hex();
let wallet_name = wallet_info.name.as_deref().unwrap_or("none");
match client.get_balance(*wallet_addr, None).await {
Ok(balance) => {
// Okay, so the native type is not a token, but whatever, close enough.
// Note: This is ETH for many chains, but not all so that is why we use `N` and `Native`
// TODO: can we get away with scaling as 18 in all cases here? I am guessing not.
let balance = u256_as_scaled_f64(balance, 18);
trace!("Wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance is {balance} of the native currency");
wallet_balance_metric
.with(&hashmap! {
"chain" => chain,
"wallet_address" => wallet_addr_str.as_str(),
"wallet_name" => wallet_name,
"token_address" => "none",
"token_symbol" => "Native",
"token_name" => "Native"
}).set(balance)
},
Err(e) => warn!("Metric update failed for wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance for native currency; {e}")
}
for (token_addr, token) in data.tokens.iter() {
let token_addr_str: String = token_addr.encode_hex();
let balance = match Erc20::new(*token_addr, client.clone())
.balance_of(*wallet_addr)
.call()
.await
{
Ok(b) => u256_as_scaled_f64(b, token.decimals),
Err(e) => {
warn!("Metric update failed for wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance for {name}; {e}", name=token.name);
continue;
}
};
trace!("Wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance is {balance}{}", token.symbol);
wallet_balance_metric
.with(&hashmap! {
"chain" => chain,
"wallet_address" => wallet_addr_str.as_str(),
"wallet_name" => wallet_name,
"token_address" => token_addr_str.as_str(),
"token_symbol" => token.symbol.as_str(),
"token_name" => token.symbol.as_str()
})
.set(balance);
}
}
}
}
impl<M: Middleware> Debug for PrometheusMiddleware<M> {

@ -15,6 +15,7 @@ bs58.workspace = true
color-eyre = { workspace = true, optional = true }
config.workspace = true
convert_case.workspace = true
derive_builder.workspace = true
derive-new.workspace = true
ed25519-dalek.workspace = true
ethers.workspace = true
@ -22,6 +23,7 @@ eyre.workspace = true
fuels.workspace = true
futures-util.workspace = true
itertools.workspace = true
maplit.workspace = true
paste.workspace = true
prometheus.workspace = true
rocksdb.workspace = true

@ -7,7 +7,10 @@ use hyperlane_core::config::*;
use tokio::task::JoinHandle;
use tracing::{debug_span, instrument::Instrumented, Instrument};
use crate::{metrics::CoreMetrics, settings::Settings};
use crate::{
metrics::{create_agent_metrics, AgentMetrics, CoreMetrics},
settings::Settings,
};
/// Properties shared across all hyperlane agents
#[derive(Debug)]
@ -36,7 +39,11 @@ pub trait BaseAgent: Send + Sync + Debug {
type Settings: LoadableFromSettings;
/// Instantiate the agent from the standard settings object
async fn from_settings(settings: Self::Settings, metrics: Arc<CoreMetrics>) -> Result<Self>
async fn from_settings(
settings: Self::Settings,
metrics: Arc<CoreMetrics>,
agent_metrics: AgentMetrics,
) -> Result<Self>
where
Self: Sized;
@ -68,7 +75,8 @@ pub async fn agent_main<A: BaseAgent>() -> Result<()> {
let metrics = settings.as_ref().metrics(A::AGENT_NAME)?;
core_settings.tracing.start_tracing(&metrics)?;
let agent = A::from_settings(settings, metrics.clone()).await?;
let agent_metrics = create_agent_metrics(&metrics)?;
let agent = A::from_settings(settings, metrics.clone(), agent_metrics).await?;
metrics.run_http_server();
agent.run().await.await?

@ -12,7 +12,7 @@ pub mod settings;
mod agent;
pub use agent::*;
mod metrics;
pub mod metrics;
pub use metrics::*;
mod contract_sync;

@ -0,0 +1,120 @@
use std::time::Duration;
use derive_builder::Builder;
use derive_new::new;
use eyre::Result;
use hyperlane_core::metrics::agent::u256_as_scaled_f64;
use hyperlane_core::HyperlaneDomain;
use hyperlane_core::HyperlaneProvider;
use maplit::hashmap;
use prometheus::GaugeVec;
use tokio::time::MissedTickBehavior;
use tracing::{trace, warn};
use crate::CoreMetrics;
/// Expected label names for the `wallet_balance` metric.
pub const WALLET_BALANCE_LABELS: &[&str] = &[
"chain",
"wallet_address",
"wallet_name",
"token_address",
"token_symbol",
"token_name",
];
/// Help string for the metric.
pub const WALLET_BALANCE_HELP: &str =
"Current native token balance for the wallet addresses in the `wallets` set";
/// Agent-specific metrics
#[derive(Clone, Builder)]
pub struct AgentMetrics {
/// Current balance of native tokens for the
/// wallet address.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the tx occurred on.
/// - `wallet_address`: Address of the wallet holding the funds.
/// - `wallet_name`: Name of the address holding the funds.
/// - `token_address`: Address of the token.
/// - `token_symbol`: Symbol of the token.
/// - `token_name`: Full name of the token.
#[builder(setter(into, strip_option), default)]
wallet_balance: Option<GaugeVec>,
}
pub(crate) fn create_agent_metrics(metrics: &CoreMetrics) -> Result<AgentMetrics> {
Ok(AgentMetricsBuilder::default()
.wallet_balance(metrics.new_gauge(
"wallet_balance",
WALLET_BALANCE_HELP,
WALLET_BALANCE_LABELS,
)?)
.build()?)
}
/// Configuration for the prometheus middleware. This can be loaded via serde.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct AgentMetricsConf {
/// The account to track
#[cfg_attr(feature = "serde", serde(default))]
pub address: Option<String>,
/// Information about the chain this metric is for
pub domain: HyperlaneDomain,
/// Name of the agent the metrics are about
pub name: String,
}
/// Utility struct to update agent metrics for a given chain
#[derive(new)]
pub struct AgentMetricsUpdater {
metrics: AgentMetrics,
conf: AgentMetricsConf,
provider: Box<dyn HyperlaneProvider>,
}
impl AgentMetricsUpdater {
async fn update_wallet_balances(&self) {
let Some(wallet_addr) = self.conf.address.clone() else {
return;
};
let wallet_name = self.conf.name.clone();
let Some(wallet_balance_metric) = self.metrics.wallet_balance.clone() else {
return;
};
let chain = self.conf.domain.name();
match self.provider.get_balance(wallet_addr.clone()).await {
Ok(balance) => {
// Okay, so the native type is not a token, but whatever, close enough.
// Note: This is ETH for many chains, but not all so that is why we use `N` and `Native`
// TODO: can we get away with scaling as 18 in all cases here? I am guessing not.
let balance = u256_as_scaled_f64(balance, self.conf.domain.domain_protocol());
trace!("Wallet {wallet_name} ({wallet_addr}) on chain {chain} balance is {balance} of the native currency");
wallet_balance_metric
.with(&hashmap! {
"chain" => chain,
"wallet_address" => wallet_addr.as_str(),
"wallet_name" => wallet_name.as_str(),
"token_address" => "none",
"token_symbol" => "Native",
"token_name" => "Native"
}).set(balance)
},
Err(e) => warn!("Metric update failed for wallet {wallet_name} ({wallet_addr}) on chain {chain} balance for native currency; {e}")
}
}
/// Periodically updates the metrics
pub async fn start_updating_on_interval(self, period: Duration) {
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
self.update_wallet_balances().await;
interval.tick().await;
}
}
}

@ -1,10 +1,14 @@
//! Useful metrics that all agents should track.
pub use self::core::*;
/// The metrics namespace prefix. All metric names will start with `{NAMESPACE}_`.
pub const NAMESPACE: &str = "hyperlane";
mod core;
pub use self::core::*;
mod agent_metrics;
mod json_rpc_client;
mod provider;
pub use self::agent_metrics::*;

@ -46,10 +46,5 @@ pub(crate) fn create_provider_metrics(metrics: &CoreMetrics) -> Result<Middlewar
TRANSACTION_SEND_TOTAL_HELP,
TRANSACTION_SEND_TOTAL_LABELS,
)?)
.wallet_balance(metrics.new_gauge(
"wallet_balance",
WALLET_BALANCE_HELP,
WALLET_BALANCE_LABELS,
)?)
.build()?)
}

@ -1,14 +1,13 @@
use ethers::prelude::Selector;
use h_cosmos::CosmosProvider;
use std::collections::HashMap;
use eyre::{eyre, Context, Result};
use ethers_prometheus::middleware::{
ChainInfo, ContractInfo, PrometheusMiddlewareConf, WalletInfo,
};
use ethers_prometheus::middleware::{ChainInfo, ContractInfo, PrometheusMiddlewareConf};
use hyperlane_core::{
AggregationIsm, CcipReadIsm, ContractLocator, HyperlaneAbi, HyperlaneDomain,
HyperlaneDomainProtocol, HyperlaneMessage, HyperlaneProvider, HyperlaneSigner, IndexMode,
HyperlaneDomainProtocol, HyperlaneMessage, HyperlaneProvider, IndexMode,
InterchainGasPaymaster, InterchainGasPayment, InterchainSecurityModule, Mailbox,
MerkleTreeHook, MerkleTreeInsertion, MultisigIsm, RoutingIsm, SequenceIndexer,
ValidatorAnnounce, H256,
@ -22,10 +21,13 @@ use hyperlane_fuel as h_fuel;
use hyperlane_sealevel as h_sealevel;
use crate::{
settings::signers::{BuildableWithSignerConf, ChainSigner, SignerConf},
metrics::AgentMetricsConf,
settings::signers::{BuildableWithSignerConf, SignerConf},
CoreMetrics,
};
use super::ChainSigner;
/// A chain setup is a domain ID, an address on that chain (where the mailbox is
/// deployed) and details for connecting to the chain API.
#[derive(Clone, Debug)]
@ -117,7 +119,16 @@ impl ChainConf {
}
ChainConnectionConf::Fuel(_) => todo!(),
ChainConnectionConf::Sealevel(_) => todo!(),
ChainConnectionConf::Cosmos(_) => todo!(),
ChainConnectionConf::Cosmos(conf) => {
let locator = self.locator(H256::zero());
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
None,
)?;
Ok(Box::new(provider) as Box<dyn HyperlaneProvider>)
}
}
.context(ctx)
}
@ -639,13 +650,19 @@ impl ChainConf {
self.signer().await
}
/// Try to build an agent metrics configuration from the chain config
pub async fn agent_metrics_conf(&self, agent_name: String) -> Result<AgentMetricsConf> {
let chain_signer_address = self.chain_signer().await?.map(|s| s.address_string());
Ok(AgentMetricsConf {
address: chain_signer_address,
domain: self.domain.clone(),
name: agent_name,
})
}
/// Get a clone of the ethereum metrics conf with correctly configured
/// contract information.
fn metrics_conf(
&self,
agent_name: &str,
signer: &Option<impl HyperlaneSigner>,
) -> PrometheusMiddlewareConf {
pub fn metrics_conf(&self) -> PrometheusMiddlewareConf {
let mut cfg = self.metrics_conf.clone();
if cfg.chain.is_none() {
@ -654,14 +671,6 @@ impl ChainConf {
});
}
if let Some(signer) = signer {
cfg.wallets
.entry(signer.eth_address().into())
.or_insert_with(|| WalletInfo {
name: Some(agent_name.into()),
});
}
let mut register_contract = |name: &str, address: H256, fns: HashMap<Vec<u8>, String>| {
cfg.contracts
.entry(address.into())
@ -718,7 +727,7 @@ impl ChainConf {
B: BuildableWithProvider + Sync,
{
let signer = self.ethereum_signer().await?;
let metrics_conf = self.metrics_conf(metrics.agent_name(), &signer);
let metrics_conf = self.metrics_conf();
let rpc_metrics = Some(metrics.json_rpc_client_metrics());
let middleware_metrics = Some((metrics.provider_metrics(), metrics_conf));
let res = builder

@ -3,6 +3,7 @@ use std::time::Duration;
use async_trait::async_trait;
use ed25519_dalek::SecretKey;
use ethers::prelude::{AwsSigner, LocalWallet};
use ethers::utils::hex::ToHex;
use eyre::{bail, Context, Report};
use hyperlane_core::H256;
use hyperlane_sealevel::Keypair;
@ -96,7 +97,7 @@ impl BuildableWithSignerConf for hyperlane_ethereum::Signers {
impl ChainSigner for hyperlane_ethereum::Signers {
fn address_string(&self) -> String {
ethers::abi::AbiEncode::encode_hex(ethers::signers::Signer::address(self))
ethers::signers::Signer::address(self).encode_hex()
}
}

@ -5,6 +5,7 @@ use std::{
hash::{Hash, Hasher},
};
use derive_new::new;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
#[cfg(feature = "strum")]
@ -18,7 +19,7 @@ pub struct Address(pub bytes::Bytes);
#[derive(Debug, Clone)]
pub struct Balance(pub num::BigInt);
#[derive(Debug, Clone)]
#[derive(Debug, Clone, new)]
pub struct ContractLocator<'a> {
pub domain: &'a HyperlaneDomain,
pub address: H256,

@ -26,6 +26,8 @@ pub mod utils;
pub mod test_utils;
pub mod config;
/// Prometheus metrics traits / utilities
pub mod metrics;
/// Core hyperlane system data structures
mod types;

@ -0,0 +1,29 @@
use crate::HyperlaneDomainProtocol;
use std::time::Duration;
use crate::U256;
const ETHEREUM_DECIMALS: u8 = 18;
const COSMOS_DECIMALS: u8 = 6;
const SOLANA_DECIMALS: u8 = 9;
/// Interval for querying the prometheus metrics endpoint.
/// This should be whatever the prometheus scrape interval is
pub const METRICS_SCRAPE_INTERVAL: Duration = Duration::from_secs(60);
/// Convert a u256 scaled integer value into the corresponding f64 value.
#[cfg(feature = "float")]
pub fn u256_as_scaled_f64(value: U256, domain: HyperlaneDomainProtocol) -> f64 {
let decimals = decimals_by_protocol(domain);
value.to_f64_lossy() / (10u64.pow(decimals as u32) as f64)
}
/// Get the decimals each protocol typically uses for its lowest denomination
/// of the native token
pub fn decimals_by_protocol(protocol: HyperlaneDomainProtocol) -> u8 {
match protocol {
HyperlaneDomainProtocol::Cosmos => COSMOS_DECIMALS,
HyperlaneDomainProtocol::Sealevel => SOLANA_DECIMALS,
_ => ETHEREUM_DECIMALS,
}
}

@ -0,0 +1,2 @@
/// Agent metrics utils
pub mod agent;

@ -4,7 +4,7 @@ use async_trait::async_trait;
use auto_impl::auto_impl;
use thiserror::Error;
use crate::{BlockInfo, ChainResult, HyperlaneChain, TxnInfo, H256};
use crate::{BlockInfo, ChainResult, HyperlaneChain, TxnInfo, H256, U256};
/// Interface for a provider. Allows abstraction over different provider types
/// for different chains.
@ -24,6 +24,9 @@ pub trait HyperlaneProvider: HyperlaneChain + Send + Sync + Debug {
/// Returns whether a contract exists at the provided address
async fn is_contract(&self, address: &H256) -> ChainResult<bool>;
/// Fetch the balance of the wallet address associated with the chain provider.
async fn get_balance(&self, address: String) -> ChainResult<U256>;
}
/// Errors when querying for provider information.

@ -10,7 +10,7 @@ publish.workspace = true
version.workspace = true
[dependencies]
hyperlane-core = { path = "../../hyperlane-core" }
hyperlane-core = { path = "../../hyperlane-core", features = ["float"]}
toml_edit.workspace = true
k256.workspace = true
ripemd.workspace = true

@ -7,6 +7,7 @@ use std::{env, fs};
use cosmwasm_schema::cw_serde;
use hpl_interface::types::bech32_decode;
use macro_rules_attribute::apply;
use maplit::hashmap;
use tempfile::tempdir;
mod cli;
@ -24,16 +25,17 @@ use utils::*;
use crate::cosmos::link::link_networks;
use crate::logging::log;
use crate::metrics::agent_balance_sum;
use crate::program::Program;
use crate::utils::{as_task, concat_path, stop_child, AgentHandles, TaskHandle};
use crate::AGENT_BIN_PATH;
use crate::{fetch_metric, AGENT_BIN_PATH};
use cli::{OsmosisCLI, OsmosisEndpoint};
use self::deploy::deploy_cw_hyperlane;
use self::source::{CLISource, CodeSource};
const OSMOSIS_CLI_GIT: &str = "https://github.com/osmosis-labs/osmosis";
const OSMOSIS_CLI_VERSION: &str = "19.0.0";
const OSMOSIS_CLI_VERSION: &str = "20.5.0";
const KEY_HPL_VALIDATOR: (&str,&str) = ("hpl-validator", "guard evolve region sentence danger sort despair eye deputy brave trim actor left recipe debate document upgrade sustain bus cage afford half demand pigeon");
const KEY_HPL_RELAYER: (&str,&str) = ("hpl-relayer", "moral item damp melt gloom vendor notice head assume balance doctor retire fashion trim find biology saddle undo switch fault cattle toast drip empty");
@ -257,7 +259,6 @@ fn launch_cosmos_validator(
.hyp_env("CHECKPOINTSYNCER_PATH", checkpoint_path.to_str().unwrap())
.hyp_env("CHECKPOINTSYNCER_TYPE", "localStorage")
.hyp_env("ORIGINCHAINNAME", agent_config.name)
.hyp_env("REORGPERIOD", "100")
.hyp_env("DB", validator_base_db.to_str().unwrap())
.hyp_env("METRICSPORT", agent_config.metrics_port.to_string())
.hyp_env("VALIDATOR_SIGNER_TYPE", agent_config.signer.typ)
@ -287,7 +288,6 @@ fn launch_cosmos_relayer(
.env("CONFIG_FILES", agent_config_path.to_str().unwrap())
.env("RUST_BACKTRACE", "1")
.hyp_env("RELAYCHAINS", relay_chains.join(","))
.hyp_env("REORGPERIOD", "100")
.hyp_env("DB", relayer_base.as_ref().to_str().unwrap())
.hyp_env("ALLOWLOCALCHECKPOINTSYNCERS", "true")
.hyp_env("TRACING_LEVEL", if debug { "debug" } else { "info" })
@ -460,6 +460,11 @@ fn run_locally() {
debug,
);
// give things a chance to fully start.
sleep(Duration::from_secs(10));
let starting_relayer_balance: f64 = agent_balance_sum(hpl_rly_metrics_port).unwrap();
// dispatch messages
let mut dispatched_messages = 0;
@ -517,12 +522,16 @@ fn run_locally() {
// Mostly copy-pasta from `rust/utils/run-locally/src/main.rs`
// TODO: refactor to share code
let loop_start = Instant::now();
// give things a chance to fully start.
sleep(Duration::from_secs(5));
let mut failure_occurred = false;
loop {
// look for the end condition.
if termination_invariants_met(hpl_rly_metrics_port, dispatched_messages).unwrap_or(false) {
if termination_invariants_met(
hpl_rly_metrics_port,
dispatched_messages,
starting_relayer_balance,
)
.unwrap_or(false)
{
// end condition reached successfully
break;
} else if (Instant::now() - loop_start).as_secs() > TIMEOUT_SECS {
@ -542,44 +551,62 @@ fn run_locally() {
}
}
fn termination_invariants_met(_metrics_port: u32, _messages_expected: u32) -> eyre::Result<bool> {
fn termination_invariants_met(
relayer_metrics_port: u32,
messages_expected: u32,
starting_relayer_balance: f64,
) -> eyre::Result<bool> {
let gas_payments_scraped = fetch_metric(
&relayer_metrics_port.to_string(),
"hyperlane_contract_sync_stored_events",
&hashmap! {"data_type" => "gas_payment"},
)?
.iter()
.sum::<u32>();
let expected_gas_payments = messages_expected;
if gas_payments_scraped != expected_gas_payments {
log!(
"Scraper has scraped {} gas payments, expected {}",
gas_payments_scraped,
expected_gas_payments
);
return Ok(false);
}
let delivered_messages_scraped = fetch_metric(
&relayer_metrics_port.to_string(),
"hyperlane_operations_processed_count",
&hashmap! {"phase" => "confirmed"},
)?
.iter()
.sum::<u32>();
if delivered_messages_scraped != messages_expected {
log!(
"Relayer confirmed {} submitted messages, expected {}",
delivered_messages_scraped,
messages_expected
);
return Ok(false);
}
let ending_relayer_balance: f64 = agent_balance_sum(relayer_metrics_port).unwrap();
// Make sure the balance was correctly updated in the metrics.
// Ideally, make sure that the difference is >= gas_per_tx * gas_cost, set here:
// https://github.com/hyperlane-xyz/hyperlane-monorepo/blob/c2288eb31734ba1f2f997e2c6ecb30176427bc2c/rust/utils/run-locally/src/cosmos/cli.rs#L55
// What's stopping this is that the format returned by the `uosmo` balance query is a surprisingly low number (0.000003999999995184)
// but then maybe the gas_per_tx is just very low - how can we check that? (maybe by simulating said tx)
if starting_relayer_balance <= ending_relayer_balance {
log!(
"Expected starting relayer balance to be greater than ending relayer balance, but got {} <= {}",
starting_relayer_balance,
ending_relayer_balance
);
return Ok(false);
}
log!("Termination invariants have been meet");
Ok(true)
// TODO: uncomment once CI passes consistently on Ubuntu
// let gas_payments_scraped = fetch_metric(
// "9093",
// "hyperlane_contract_sync_stored_events",
// &hashmap! {"data_type" => "gas_payment"},
// )?
// .iter()
// .sum::<u32>();
// let expected_gas_payments = messages_expected;
// if gas_payments_scraped != expected_gas_payments {
// log!(
// "Scraper has scraped {} gas payments, expected {}",
// gas_payments_scraped,
// expected_gas_payments
// );
// return Ok(false);
// }
// let delivered_messages_scraped = fetch_metric(
// "9093",
// "hyperlane_operations_processed_count",
// &hashmap! {"phase" => "confirmed"},
// )?
// .iter()
// .sum::<u32>();
// if delivered_messages_scraped != messages_expected {
// log!(
// "Relayer confirmed {} submitted messages, expected {}",
// delivered_messages_scraped,
// messages_expected
// );
// return Ok(false);
// }
// log!("Termination invariants have been meet");
// Ok(true)
}
#[cfg(test)]

@ -119,7 +119,6 @@ pub struct AgentConfig {
pub validator_announce: String,
pub merkle_tree_hook: String,
pub protocol: String,
pub finality_blocks: u32,
pub chain_id: String,
pub rpc_urls: Vec<AgentUrl>,
pub grpc_url: String,
@ -151,7 +150,6 @@ impl AgentConfig {
validator_announce: to_hex_addr(&network.deployments.va),
merkle_tree_hook: to_hex_addr(&network.deployments.hook_merkle),
protocol: "cosmos".to_string(),
finality_blocks: 1,
chain_id: format!("cosmos-test-{}", network.domain),
rpc_urls: vec![AgentUrl {
http: format!(

@ -1,6 +1,7 @@
// use std::path::Path;
use crate::config::Config;
use crate::metrics::agent_balance_sum;
use maplit::hashmap;
use crate::logging::log;
@ -15,6 +16,7 @@ pub const SOL_MESSAGES_EXPECTED: u32 = 0;
/// number of messages have been sent.
pub fn termination_invariants_met(
config: &Config,
starting_relayer_balance: f64,
// solana_cli_tools_path: &Path,
// solana_config_path: &Path,
) -> eyre::Result<bool> {
@ -129,6 +131,17 @@ pub fn termination_invariants_met(
return Ok(false);
}
let ending_relayer_balance: f64 = agent_balance_sum(9092).unwrap();
// Make sure the balance was correctly updated in the metrics.
if starting_relayer_balance <= ending_relayer_balance {
log!(
"Expected starting relayer balance to be greater than ending relayer balance, but got {} <= {}",
starting_relayer_balance,
ending_relayer_balance
);
return Ok(false);
}
log!("Termination invariants have been meet");
Ok(true)
}

@ -30,6 +30,7 @@ use crate::{
config::Config,
ethereum::start_anvil,
invariants::termination_invariants_met,
metrics::agent_balance_sum,
solana::*,
utils::{concat_path, make_static, stop_child, AgentHandles, ArbitraryData, TaskHandle},
};
@ -388,12 +389,13 @@ fn main() -> ExitCode {
let loop_start = Instant::now();
// give things a chance to fully start.
sleep(Duration::from_secs(5));
sleep(Duration::from_secs(10));
let mut failure_occurred = false;
let starting_relayer_balance: f64 = agent_balance_sum(9092).unwrap();
while !SHUTDOWN.load(Ordering::Relaxed) {
if config.ci_mode {
// for CI we have to look for the end condition.
if termination_invariants_met(&config)
if termination_invariants_met(&config, starting_relayer_balance)
// if termination_invariants_met(&config, &solana_path, &solana_config_path)
.unwrap_or(false)
{

@ -1,8 +1,14 @@
use std::collections::HashMap;
use std::{collections::HashMap, error::Error as StdError, str::FromStr};
use eyre::{eyre, Result};
use eyre::{eyre, ErrReport, Result};
use maplit::hashmap;
pub fn fetch_metric(port: &str, metric: &str, labels: &HashMap<&str, &str>) -> Result<Vec<u32>> {
/// Fetch a prometheus format metric, filtering by labels.
pub fn fetch_metric<T, E>(port: &str, metric: &str, labels: &HashMap<&str, &str>) -> Result<Vec<T>>
where
T: FromStr<Err = E>,
E: Into<ErrReport> + StdError + Send + Sync + 'static,
{
let resp = ureq::get(&format!("http://127.0.0.1:{}/metrics", port));
resp.call()?
.into_string()?
@ -16,10 +22,19 @@ pub fn fetch_metric(port: &str, metric: &str, labels: &HashMap<&str, &str>) -> R
.all(|(k, v)| l.contains(&format!("{k}=\"{v}")))
})
.map(|l| {
Ok(l.rsplit_once(' ')
.ok_or(eyre!("Unknown metric format"))?
.1
.parse::<u32>()?)
let value = l.rsplit_once(' ').ok_or(eyre!("Unknown metric format"))?.1;
Ok(value.parse::<T>()?)
})
.collect()
}
pub fn agent_balance_sum(metrics_port: u32) -> eyre::Result<f64> {
let balance = fetch_metric(
&metrics_port.to_string(),
"hyperlane_wallet_balance",
&hashmap! {},
)?
.iter()
.sum();
Ok(balance)
}

Loading…
Cancel
Save