diff --git a/rust/README.md b/rust/README.md index 22bf8b54e..289d1dc1d 100644 --- a/rust/README.md +++ b/rust/README.md @@ -90,7 +90,7 @@ env $(cat ./config/validator.fuji.env | grep -v "#" | xargs) ./target/debug/vali Clone `hyperlane-registry` repo next to `hyperlane-monorepo` repo. -To perform an automated e2e test of the agents locally, from within the `hyperlane-monorepo/rust` directory, run: +To perform an automated e2e test of the agents locally, from within the `hyperlane-monorepo/rust/main` directory, run: ```bash cargo run --release --bin run-locally diff --git a/rust/main/chains/hyperlane-cosmos/src/aggregation_ism.rs b/rust/main/chains/hyperlane-cosmos/src/aggregation_ism.rs index 835b2cf04..d9a1f5391 100644 --- a/rust/main/chains/hyperlane-cosmos/src/aggregation_ism.rs +++ b/rust/main/chains/hyperlane-cosmos/src/aggregation_ism.rs @@ -33,7 +33,7 @@ impl CosmosAggregationIsm { let provider = CosmosProvider::new( locator.domain.clone(), conf.clone(), - Some(locator.clone()), + locator.clone(), signer, )?; diff --git a/rust/main/chains/hyperlane-cosmos/src/interchain_gas.rs b/rust/main/chains/hyperlane-cosmos/src/interchain_gas.rs index 0ab0a707e..1e1cb2b32 100644 --- a/rust/main/chains/hyperlane-cosmos/src/interchain_gas.rs +++ b/rust/main/chains/hyperlane-cosmos/src/interchain_gas.rs @@ -1,25 +1,24 @@ +use std::ops::RangeInclusive; + use async_trait::async_trait; use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; +use once_cell::sync::Lazy; +use tendermint::abci::EventAttribute; +use tracing::instrument; + use hyperlane_core::{ ChainCommunicationError, ChainResult, ContractLocator, HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneProvider, Indexed, Indexer, InterchainGasPaymaster, - InterchainGasPayment, LogMeta, SequenceAwareIndexer, H256, U256, + InterchainGasPayment, LogMeta, SequenceAwareIndexer, H256, H512, U256, }; -use once_cell::sync::Lazy; -use std::ops::RangeInclusive; -use tendermint::abci::EventAttribute; -use tracing::instrument; -use crate::utils::parse_logs_in_range; -use crate::{ - rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}, - signers::Signer, - utils::{ - execute_and_parse_log_futures, CONTRACT_ADDRESS_ATTRIBUTE_KEY, - CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64, - }, - ConnectionConf, CosmosProvider, HyperlaneCosmosError, +use crate::rpc::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider}; +use crate::signers::Signer; +use crate::utils::{ + execute_and_parse_log_futures, parse_logs_in_range, parse_logs_in_tx, + CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64, }; +use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError}; /// A reference to a InterchainGasPaymaster contract on some Cosmos chain #[derive(Debug)] @@ -57,7 +56,7 @@ impl CosmosInterchainGasPaymaster { let provider = CosmosProvider::new( locator.domain.clone(), conf.clone(), - Some(locator.clone()), + locator.clone(), signer, )?; @@ -90,7 +89,7 @@ static DESTINATION_ATTRIBUTE_KEY_BASE64: Lazy = /// A reference to a InterchainGasPaymasterIndexer contract on some Cosmos chain #[derive(Debug, Clone)] pub struct CosmosInterchainGasPaymasterIndexer { - indexer: Box, + provider: Box, } impl CosmosInterchainGasPaymasterIndexer { @@ -103,7 +102,7 @@ impl CosmosInterchainGasPaymasterIndexer { locator: ContractLocator, reorg_period: u32, ) -> ChainResult { - let indexer = CosmosWasmIndexer::new( + let provider = CosmosWasmRpcProvider::new( conf, locator, Self::INTERCHAIN_GAS_PAYMENT_EVENT_TYPE.into(), @@ -111,7 +110,7 @@ impl CosmosInterchainGasPaymasterIndexer { )?; Ok(Self { - indexer: Box::new(indexer), + provider: Box::new(provider), }) } @@ -211,7 +210,7 @@ impl Indexer for CosmosInterchainGasPaymasterIndexer { ) -> ChainResult, LogMeta)>> { let logs_futures = parse_logs_in_range( range, - self.indexer.clone(), + self.provider.clone(), Self::interchain_gas_payment_parser, "InterchainGasPaymentCursor", ); @@ -220,7 +219,21 @@ impl Indexer for CosmosInterchainGasPaymasterIndexer { } async fn get_finalized_block_number(&self) -> ChainResult { - self.indexer.get_finalized_block_number().await + self.provider.get_finalized_block_number().await + } + + async fn fetch_logs_by_tx_hash( + &self, + tx_hash: H512, + ) -> ChainResult, LogMeta)>> { + parse_logs_in_tx( + &tx_hash.into(), + self.provider.clone(), + Self::interchain_gas_payment_parser, + "InterchainGasPaymentReceiver", + ) + .await + .map(|v| v.into_iter().map(|(m, l)| (m.into(), l)).collect()) } } @@ -269,10 +282,12 @@ impl TryInto for IncompleteInterchainGasPayment { #[cfg(test)] mod tests { - use hyperlane_core::{InterchainGasPayment, H256, U256}; use std::str::FromStr; - use crate::{rpc::ParsedEvent, utils::event_attributes_from_str}; + use hyperlane_core::{InterchainGasPayment, H256, U256}; + + use crate::providers::rpc::ParsedEvent; + use crate::utils::event_attributes_from_str; use super::*; diff --git a/rust/main/chains/hyperlane-cosmos/src/interchain_security_module.rs b/rust/main/chains/hyperlane-cosmos/src/interchain_security_module.rs index dd495be89..e23807a58 100644 --- a/rust/main/chains/hyperlane-cosmos/src/interchain_security_module.rs +++ b/rust/main/chains/hyperlane-cosmos/src/interchain_security_module.rs @@ -38,7 +38,7 @@ impl CosmosInterchainSecurityModule { let provider = CosmosProvider::new( locator.domain.clone(), conf.clone(), - Some(locator.clone()), + locator.clone(), signer, )?; diff --git a/rust/main/chains/hyperlane-cosmos/src/mailbox/contract.rs b/rust/main/chains/hyperlane-cosmos/src/mailbox/contract.rs index 754a84ce7..9d9f39c54 100644 --- a/rust/main/chains/hyperlane-cosmos/src/mailbox/contract.rs +++ b/rust/main/chains/hyperlane-cosmos/src/mailbox/contract.rs @@ -41,7 +41,7 @@ impl CosmosMailbox { let provider = CosmosProvider::new( locator.domain.clone(), conf.clone(), - Some(locator.clone()), + locator.clone(), signer, )?; diff --git a/rust/main/chains/hyperlane-cosmos/src/mailbox/delivery_indexer.rs b/rust/main/chains/hyperlane-cosmos/src/mailbox/delivery_indexer.rs index 7279c67fe..ccc92df70 100644 --- a/rust/main/chains/hyperlane-cosmos/src/mailbox/delivery_indexer.rs +++ b/rust/main/chains/hyperlane-cosmos/src/mailbox/delivery_indexer.rs @@ -9,14 +9,14 @@ use tendermint::abci::EventAttribute; use tracing::instrument; use hyperlane_core::{ - ChainCommunicationError, ChainResult, ContractLocator, Delivery, Indexed, Indexer, LogMeta, - SequenceAwareIndexer, H256, + ChainCommunicationError, ChainResult, ContractLocator, Delivery, HyperlaneMessage, Indexed, + Indexer, LogMeta, SequenceAwareIndexer, H256, H512, }; -use crate::rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}; +use crate::rpc::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider}; use crate::utils::{ - execute_and_parse_log_futures, parse_logs_in_range, CONTRACT_ADDRESS_ATTRIBUTE_KEY, - CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64, + execute_and_parse_log_futures, parse_logs_in_range, parse_logs_in_tx, + CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64, }; use crate::{ConnectionConf, HyperlaneCosmosError, Signer}; @@ -28,7 +28,7 @@ static MESSAGE_ID_ATTRIBUTE_KEY_BASE64: Lazy = /// Struct that retrieves delivery event data for a Cosmos Mailbox contract pub struct CosmosMailboxDeliveryIndexer { - indexer: Box, + provider: Box, } impl CosmosMailboxDeliveryIndexer { @@ -40,7 +40,7 @@ impl CosmosMailboxDeliveryIndexer { signer: Option, reorg_period: u32, ) -> ChainResult { - let indexer = CosmosWasmIndexer::new( + let provider = CosmosWasmRpcProvider::new( conf, locator, MESSAGE_DELIVERY_EVENT_TYPE.to_owned(), @@ -48,7 +48,7 @@ impl CosmosMailboxDeliveryIndexer { )?; Ok(Self { - indexer: Box::new(indexer), + provider: Box::new(provider), }) } @@ -114,7 +114,7 @@ impl Indexer for CosmosMailboxDeliveryIndexer { ) -> ChainResult, LogMeta)>> { let logs_futures = parse_logs_in_range( range, - self.indexer.clone(), + self.provider.clone(), Self::hyperlane_delivery_parser, "DeliveryCursor", ); @@ -123,7 +123,21 @@ impl Indexer for CosmosMailboxDeliveryIndexer { } async fn get_finalized_block_number(&self) -> ChainResult { - self.indexer.get_finalized_block_number().await + self.provider.get_finalized_block_number().await + } + + async fn fetch_logs_by_tx_hash( + &self, + tx_hash: H512, + ) -> ChainResult, LogMeta)>> { + parse_logs_in_tx( + &tx_hash.into(), + self.provider.clone(), + Self::hyperlane_delivery_parser, + "DeliveryReceiver", + ) + .await + .map(|v| v.into_iter().map(|(m, l)| (m.into(), l)).collect()) } } diff --git a/rust/main/chains/hyperlane-cosmos/src/mailbox/dispatch_indexer.rs b/rust/main/chains/hyperlane-cosmos/src/mailbox/dispatch_indexer.rs index 77fc6579c..433ea661e 100644 --- a/rust/main/chains/hyperlane-cosmos/src/mailbox/dispatch_indexer.rs +++ b/rust/main/chains/hyperlane-cosmos/src/mailbox/dispatch_indexer.rs @@ -9,13 +9,13 @@ use tracing::instrument; use hyperlane_core::{ ChainCommunicationError, ChainResult, ContractLocator, Decode, HyperlaneMessage, Indexed, - Indexer, LogMeta, SequenceAwareIndexer, + Indexer, LogMeta, SequenceAwareIndexer, H512, }; -use crate::rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}; +use crate::rpc::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider}; use crate::utils::{ - execute_and_parse_log_futures, parse_logs_in_range, CONTRACT_ADDRESS_ATTRIBUTE_KEY, - CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64, + execute_and_parse_log_futures, parse_logs_in_range, parse_logs_in_tx, + CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64, }; use crate::{ConnectionConf, CosmosMailbox, HyperlaneCosmosError, Signer}; @@ -29,7 +29,7 @@ static MESSAGE_ATTRIBUTE_KEY_BASE64: Lazy = #[derive(Debug, Clone)] pub struct CosmosMailboxDispatchIndexer { mailbox: CosmosMailbox, - indexer: Box, + provider: Box, } impl CosmosMailboxDispatchIndexer { @@ -42,7 +42,7 @@ impl CosmosMailboxDispatchIndexer { reorg_period: u32, ) -> ChainResult { let mailbox = CosmosMailbox::new(conf.clone(), locator.clone(), signer.clone())?; - let indexer = CosmosWasmIndexer::new( + let provider = CosmosWasmRpcProvider::new( conf, locator, MESSAGE_DISPATCH_EVENT_TYPE.into(), @@ -51,7 +51,7 @@ impl CosmosMailboxDispatchIndexer { Ok(Self { mailbox, - indexer: Box::new(indexer), + provider: Box::new(provider), }) } @@ -116,7 +116,7 @@ impl Indexer for CosmosMailboxDispatchIndexer { ) -> ChainResult, LogMeta)>> { let logs_futures = parse_logs_in_range( range, - self.indexer.clone(), + self.provider.clone(), Self::hyperlane_message_parser, "HyperlaneMessageCursor", ); @@ -125,7 +125,21 @@ impl Indexer for CosmosMailboxDispatchIndexer { } async fn get_finalized_block_number(&self) -> ChainResult { - self.indexer.get_finalized_block_number().await + self.provider.get_finalized_block_number().await + } + + async fn fetch_logs_by_tx_hash( + &self, + tx_hash: H512, + ) -> ChainResult, LogMeta)>> { + parse_logs_in_tx( + &tx_hash.into(), + self.provider.clone(), + Self::hyperlane_message_parser, + "HyperlaneMessageReceiver", + ) + .await + .map(|v| v.into_iter().map(|(m, l)| (m.into(), l)).collect()) } } @@ -144,7 +158,8 @@ impl SequenceAwareIndexer for CosmosMailboxDispatchIndexer { mod tests { use hyperlane_core::HyperlaneMessage; - use crate::{rpc::ParsedEvent, utils::event_attributes_from_str}; + use crate::providers::rpc::ParsedEvent; + use crate::utils::event_attributes_from_str; use super::*; diff --git a/rust/main/chains/hyperlane-cosmos/src/merkle_tree_hook.rs b/rust/main/chains/hyperlane-cosmos/src/merkle_tree_hook.rs index ce9283fe2..c9e48c59f 100644 --- a/rust/main/chains/hyperlane-cosmos/src/merkle_tree_hook.rs +++ b/rust/main/chains/hyperlane-cosmos/src/merkle_tree_hook.rs @@ -2,27 +2,26 @@ use std::{fmt::Debug, num::NonZeroU64, ops::RangeInclusive, str::FromStr}; use async_trait::async_trait; use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; -use hyperlane_core::{ - accumulator::incremental::IncrementalMerkle, ChainCommunicationError, ChainResult, Checkpoint, - ContractLocator, HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneProvider, - Indexed, Indexer, LogMeta, MerkleTreeHook, MerkleTreeInsertion, SequenceAwareIndexer, H256, -}; use once_cell::sync::Lazy; use tendermint::abci::EventAttribute; use tracing::instrument; -use crate::utils::parse_logs_in_range; -use crate::{ - grpc::WasmProvider, - payloads::{general, merkle_tree_hook}, - rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}, - utils::{ - execute_and_parse_log_futures, get_block_height_for_lag, CONTRACT_ADDRESS_ATTRIBUTE_KEY, - CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64, - }, - ConnectionConf, CosmosProvider, HyperlaneCosmosError, Signer, +use hyperlane_core::accumulator::incremental::IncrementalMerkle; +use hyperlane_core::{ + ChainCommunicationError, ChainResult, Checkpoint, ContractLocator, HyperlaneChain, + HyperlaneContract, HyperlaneDomain, HyperlaneProvider, Indexed, Indexer, LogMeta, + MerkleTreeHook, MerkleTreeInsertion, SequenceAwareIndexer, H256, H512, }; +use crate::grpc::WasmProvider; +use crate::payloads::{general, merkle_tree_hook}; +use crate::rpc::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider}; +use crate::utils::{ + execute_and_parse_log_futures, get_block_height_for_lag, parse_logs_in_range, parse_logs_in_tx, + CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64, +}; +use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError, Signer}; + #[derive(Debug, Clone)] /// A reference to a MerkleTreeHook contract on some Cosmos chain pub struct CosmosMerkleTreeHook { @@ -44,7 +43,7 @@ impl CosmosMerkleTreeHook { let provider = CosmosProvider::new( locator.domain.clone(), conf.clone(), - Some(locator.clone()), + locator.clone(), signer, )?; @@ -188,8 +187,8 @@ pub(crate) static MESSAGE_ID_ATTRIBUTE_KEY_BASE64: Lazy = pub struct CosmosMerkleTreeHookIndexer { /// The CosmosMerkleTreeHook merkle_tree_hook: CosmosMerkleTreeHook, - /// Cosmwasm indexer instance - indexer: Box, + /// Cosmwasm RPC provider instance + provider: Box, } impl CosmosMerkleTreeHookIndexer { @@ -203,7 +202,7 @@ impl CosmosMerkleTreeHookIndexer { signer: Option, reorg_period: u32, ) -> ChainResult { - let indexer = CosmosWasmIndexer::new( + let provider = CosmosWasmRpcProvider::new( conf.clone(), locator.clone(), Self::MERKLE_TREE_INSERTION_EVENT_TYPE.into(), @@ -212,7 +211,7 @@ impl CosmosMerkleTreeHookIndexer { Ok(Self { merkle_tree_hook: CosmosMerkleTreeHook::new(conf, locator, signer)?, - indexer: Box::new(indexer), + provider: Box::new(provider), }) } @@ -287,7 +286,7 @@ impl Indexer for CosmosMerkleTreeHookIndexer { ) -> ChainResult, LogMeta)>> { let logs_futures = parse_logs_in_range( range, - self.indexer.clone(), + self.provider.clone(), Self::merkle_tree_insertion_parser, "MerkleTreeInsertionCursor", ); @@ -297,7 +296,21 @@ impl Indexer for CosmosMerkleTreeHookIndexer { /// Get the chain's latest block number that has reached finality async fn get_finalized_block_number(&self) -> ChainResult { - self.indexer.get_finalized_block_number().await + self.provider.get_finalized_block_number().await + } + + async fn fetch_logs_by_tx_hash( + &self, + tx_hash: H512, + ) -> ChainResult, LogMeta)>> { + parse_logs_in_tx( + &tx_hash.into(), + self.provider.clone(), + Self::merkle_tree_insertion_parser, + "MerkleTreeInsertionReceiver", + ) + .await + .map(|v| v.into_iter().map(|(m, l)| (m.into(), l)).collect()) } } @@ -337,10 +350,12 @@ impl TryInto for IncompleteMerkleTreeInsertion { #[cfg(test)] mod tests { - use hyperlane_core::H256; use std::str::FromStr; - use crate::{rpc::ParsedEvent, utils::event_attributes_from_str}; + use hyperlane_core::H256; + + use crate::providers::rpc::ParsedEvent; + use crate::utils::event_attributes_from_str; use super::*; diff --git a/rust/main/chains/hyperlane-cosmos/src/multisig_ism.rs b/rust/main/chains/hyperlane-cosmos/src/multisig_ism.rs index d558acfa3..0b7f49234 100644 --- a/rust/main/chains/hyperlane-cosmos/src/multisig_ism.rs +++ b/rust/main/chains/hyperlane-cosmos/src/multisig_ism.rs @@ -30,7 +30,7 @@ impl CosmosMultisigIsm { let provider = CosmosProvider::new( locator.domain.clone(), conf.clone(), - Some(locator.clone()), + locator.clone(), signer, )?; diff --git a/rust/main/chains/hyperlane-cosmos/src/providers.rs b/rust/main/chains/hyperlane-cosmos/src/providers.rs new file mode 100644 index 000000000..3c3a79fd8 --- /dev/null +++ b/rust/main/chains/hyperlane-cosmos/src/providers.rs @@ -0,0 +1,8 @@ +pub use cosmos::CosmosProvider; + +/// cosmos provider +mod cosmos; +/// cosmos grpc provider +pub mod grpc; +/// cosmos rpc provider +pub mod rpc; diff --git a/rust/main/chains/hyperlane-cosmos/src/providers/cosmos.rs b/rust/main/chains/hyperlane-cosmos/src/providers/cosmos.rs new file mode 100644 index 000000000..9fea74725 --- /dev/null +++ b/rust/main/chains/hyperlane-cosmos/src/providers/cosmos.rs @@ -0,0 +1,3 @@ +pub use provider::CosmosProvider; + +mod provider; diff --git a/rust/main/chains/hyperlane-cosmos/src/providers/mod.rs b/rust/main/chains/hyperlane-cosmos/src/providers/cosmos/provider.rs similarity index 87% rename from rust/main/chains/hyperlane-cosmos/src/providers/mod.rs rename to rust/main/chains/hyperlane-cosmos/src/providers/cosmos/provider.rs index 0d3d2a854..5f207cc8a 100644 --- a/rust/main/chains/hyperlane-cosmos/src/providers/mod.rs +++ b/rust/main/chains/hyperlane-cosmos/src/providers/cosmos/provider.rs @@ -15,24 +15,18 @@ use hyperlane_core::{ }; use crate::address::CosmosAddress; -use crate::grpc::WasmProvider; +use crate::grpc::{WasmGrpcProvider, WasmProvider}; use crate::libs::account::CosmosAccountId; +use crate::providers::rpc::CosmosRpcClient; use crate::{ConnectionConf, CosmosAmount, HyperlaneCosmosError, Signer}; -use self::grpc::WasmGrpcProvider; - -/// cosmos grpc provider -pub mod grpc; -/// cosmos rpc provider -pub mod rpc; - /// Abstraction over a connection to a Cosmos chain #[derive(Debug, Clone)] pub struct CosmosProvider { domain: HyperlaneDomain, connection_conf: ConnectionConf, - grpc_client: WasmGrpcProvider, - rpc_client: HttpClient, + grpc_provider: WasmGrpcProvider, + rpc_client: CosmosRpcClient, } impl CosmosProvider { @@ -40,43 +34,30 @@ impl CosmosProvider { pub fn new( domain: HyperlaneDomain, conf: ConnectionConf, - locator: Option, + locator: ContractLocator, signer: Option, ) -> ChainResult { let gas_price = CosmosAmount::try_from(conf.get_minimum_gas_price().clone())?; - let grpc_client = WasmGrpcProvider::new( + let grpc_provider = WasmGrpcProvider::new( domain.clone(), conf.clone(), gas_price.clone(), locator, signer, )?; - let rpc_client = HttpClient::builder( - conf.get_rpc_url() - .parse() - .map_err(Into::::into)?, - ) - // Consider supporting different compatibility modes. - .compat_mode(CompatMode::latest()) - .build() - .map_err(Into::::into)?; + let rpc_client = CosmosRpcClient::new(&conf)?; Ok(Self { domain, connection_conf: conf, + grpc_provider, rpc_client, - grpc_client, }) } /// Get a grpc client pub fn grpc(&self) -> &WasmGrpcProvider { - &self.grpc_client - } - - /// Get an rpc client - pub fn rpc(&self) -> &HttpClient { - &self.rpc_client + &self.grpc_provider } fn search_payer_in_signer_infos( @@ -182,11 +163,7 @@ impl HyperlaneProvider for CosmosProvider { let tendermint_hash = Hash::from_bytes(Algorithm::Sha256, hash.as_bytes()) .expect("block hash should be of correct size"); - let response = self - .rpc_client - .block_by_hash(tendermint_hash) - .await - .map_err(ChainCommunicationError::from_other)?; + let response = self.rpc_client.get_block_by_hash(tendermint_hash).await?; let received_hash = H256::from_slice(response.block_id.hash.as_bytes()); @@ -218,11 +195,7 @@ impl HyperlaneProvider for CosmosProvider { let tendermint_hash = Hash::from_bytes(Algorithm::Sha256, hash.as_bytes()) .expect("transaction hash should be of correct size"); - let response = self - .rpc_client - .tx(tendermint_hash, false) - .await - .map_err(Into::::into)?; + let response = self.rpc_client.get_tx_by_hash(tendermint_hash).await?; let received_hash = H256::from_slice(response.hash.as_bytes()); @@ -269,7 +242,7 @@ impl HyperlaneProvider for CosmosProvider { } async fn is_contract(&self, address: &H256) -> ChainResult { - match self.grpc_client.wasm_contract_info().await { + match self.grpc_provider.wasm_contract_info().await { Ok(c) => Ok(true), Err(e) => Ok(false), } @@ -277,7 +250,7 @@ impl HyperlaneProvider for CosmosProvider { async fn get_balance(&self, address: String) -> ChainResult { Ok(self - .grpc_client + .grpc_provider .get_balance(address, self.connection_conf.get_canonical_asset()) .await?) } diff --git a/rust/main/chains/hyperlane-cosmos/src/providers/grpc.rs b/rust/main/chains/hyperlane-cosmos/src/providers/grpc.rs index e0a6388a4..4b521658a 100644 --- a/rust/main/chains/hyperlane-cosmos/src/providers/grpc.rs +++ b/rust/main/chains/hyperlane-cosmos/src/providers/grpc.rs @@ -123,7 +123,7 @@ pub struct WasmGrpcProvider { conf: ConnectionConf, /// A contract address that can be used as the default /// for queries / sends / estimates. - contract_address: Option, + contract_address: CosmosAddress, /// Signer for transactions. signer: Option, /// GRPC Channel that can be cheaply cloned. @@ -138,7 +138,7 @@ impl WasmGrpcProvider { domain: HyperlaneDomain, conf: ConnectionConf, gas_price: CosmosAmount, - locator: Option, + locator: ContractLocator, signer: Option, ) -> ChainResult { // get all the configured grpc urls and convert them to a Vec @@ -156,15 +156,11 @@ impl WasmGrpcProvider { let fallback_provider = builder.build(); let provider = CosmosFallbackProvider::new(fallback_provider); - let contract_address = locator - .map(|l| { - CosmosAddress::from_h256( - l.address, - &conf.get_bech32_prefix(), - conf.get_contract_address_bytes(), - ) - }) - .transpose()?; + let contract_address = CosmosAddress::from_h256( + locator.address, + &conf.get_bech32_prefix(), + conf.get_contract_address_bytes(), + )?; Ok(Self { domain, @@ -446,11 +442,8 @@ impl WasmGrpcProvider { }) } - fn get_contract_address(&self) -> Result<&CosmosAddress, ChainCommunicationError> { - let contract_address = self.contract_address.as_ref().ok_or_else(|| { - ChainCommunicationError::from_other_str("No contract address available") - })?; - Ok(contract_address) + fn get_contract_address(&self) -> &CosmosAddress { + &self.contract_address } } @@ -488,7 +481,7 @@ impl WasmProvider for WasmGrpcProvider { where T: Serialize + Send + Sync + Clone + Debug, { - let contract_address = self.get_contract_address()?; + let contract_address = self.get_contract_address(); let query_data = serde_json::to_string(&payload)?.as_bytes().to_vec(); let response = self .provider @@ -522,7 +515,7 @@ impl WasmProvider for WasmGrpcProvider { } async fn wasm_contract_info(&self) -> ChainResult { - let contract_address = self.get_contract_address()?; + let contract_address = self.get_contract_address(); let response = self .provider .call(move |provider| { @@ -557,7 +550,7 @@ impl WasmProvider for WasmGrpcProvider { T: Serialize + Send + Sync + Clone + Debug, { let signer = self.get_signer()?; - let contract_address = self.get_contract_address()?; + let contract_address = self.get_contract_address(); let msgs = vec![MsgExecuteContract { sender: signer.address.clone(), contract: contract_address.address(), @@ -625,7 +618,7 @@ impl WasmProvider for WasmGrpcProvider { // Estimating gas requires a signer, which we can reasonably expect to have // since we need one to send a tx with the estimated gas anyways. let signer = self.get_signer()?; - let contract_address = self.get_contract_address()?; + let contract_address = self.get_contract_address(); let msg = MsgExecuteContract { sender: signer.address.clone(), contract: contract_address.address(), diff --git a/rust/main/chains/hyperlane-cosmos/src/providers/grpc/tests.rs b/rust/main/chains/hyperlane-cosmos/src/providers/grpc/tests.rs index 9a59b6007..451c67b22 100644 --- a/rust/main/chains/hyperlane-cosmos/src/providers/grpc/tests.rs +++ b/rust/main/chains/hyperlane-cosmos/src/providers/grpc/tests.rs @@ -49,7 +49,7 @@ async fn test_wasm_contract_info_no_contract() { fn provider(address: &str) -> WasmGrpcProvider { let domain = HyperlaneDomain::Known(KnownHyperlaneDomain::Neutron); let address = CosmosAddress::from_str(address).unwrap(); - let locator = Some(ContractLocator::new(&domain, address.digest())); + let locator = ContractLocator::new(&domain, address.digest()); WasmGrpcProvider::new( domain.clone(), diff --git a/rust/main/chains/hyperlane-cosmos/src/providers/rpc.rs b/rust/main/chains/hyperlane-cosmos/src/providers/rpc.rs index fac93e008..08864b26c 100644 --- a/rust/main/chains/hyperlane-cosmos/src/providers/rpc.rs +++ b/rust/main/chains/hyperlane-cosmos/src/providers/rpc.rs @@ -1,251 +1,5 @@ -use async_trait::async_trait; -use cosmrs::rpc::client::Client; -use hyperlane_core::{ChainCommunicationError, ChainResult, ContractLocator, LogMeta, H256, U256}; -use sha256::digest; -use std::fmt::Debug; -use tendermint::abci::{Event, EventAttribute}; -use tendermint::hash::Algorithm; -use tendermint::Hash; -use tendermint_rpc::endpoint::block::Response as BlockResponse; -use tendermint_rpc::endpoint::block_results::Response as BlockResultsResponse; -use tendermint_rpc::HttpClient; -use tracing::{debug, instrument, trace}; +pub use client::CosmosRpcClient; +pub use provider::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider}; -use crate::address::CosmosAddress; -use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError}; - -#[async_trait] -/// Trait for wasm indexer. Use rpc provider -pub trait WasmIndexer: Send + Sync { - /// Get the finalized block height. - async fn get_finalized_block_number(&self) -> ChainResult; - - /// Get logs for the given block using the given parser. - async fn get_logs_in_block( - &self, - block_number: u32, - parser: for<'a> fn(&'a Vec) -> ChainResult>, - cursor_label: &'static str, - ) -> ChainResult> - where - T: Send + Sync + PartialEq + Debug + 'static; -} - -#[derive(Debug, Eq, PartialEq)] -/// An event parsed from the RPC response. -pub struct ParsedEvent { - contract_address: String, - event: T, -} - -impl ParsedEvent { - /// Create a new ParsedEvent. - pub fn new(contract_address: String, event: T) -> Self { - Self { - contract_address, - event, - } - } - - /// Get the inner event - pub fn inner(self) -> T { - self.event - } -} - -#[derive(Debug, Clone)] -/// Cosmwasm RPC Provider -pub struct CosmosWasmIndexer { - provider: CosmosProvider, - contract_address: CosmosAddress, - target_event_kind: String, - reorg_period: u32, -} - -impl CosmosWasmIndexer { - const WASM_TYPE: &'static str = "wasm"; - - /// create new Cosmwasm RPC Provider - pub fn new( - conf: ConnectionConf, - locator: ContractLocator, - event_type: String, - reorg_period: u32, - ) -> ChainResult { - let provider = CosmosProvider::new( - locator.domain.clone(), - conf.clone(), - Some(locator.clone()), - None, - )?; - Ok(Self { - provider, - contract_address: CosmosAddress::from_h256( - locator.address, - conf.get_bech32_prefix().as_str(), - conf.get_contract_address_bytes(), - )?, - target_event_kind: format!("{}-{}", Self::WASM_TYPE, event_type), - reorg_period, - }) - } - - async fn get_block(client: HttpClient, block_number: u32) -> ChainResult { - Ok(client - .block(block_number) - .await - .map_err(Into::::into)?) - } - - async fn get_block_results( - client: HttpClient, - block_number: u32, - ) -> ChainResult { - Ok(client - .block_results(block_number) - .await - .map_err(Into::::into)?) - } - - async fn get_latest_block(client: HttpClient) -> ChainResult { - Ok(client - .latest_block() - .await - .map_err(Into::::into)?) - } -} - -impl CosmosWasmIndexer { - // Iterate through all txs, filter out failed txs, find target events - // in successful txs, and parse them. - fn handle_txs( - &self, - block: BlockResponse, - block_results: BlockResultsResponse, - parser: for<'a> fn(&'a Vec) -> ChainResult>, - cursor_label: &'static str, - ) -> Vec<(T, LogMeta)> - where - T: PartialEq + Debug + 'static, - { - let Some(tx_results) = block_results.txs_results else { - return vec![]; - }; - - let tx_hashes: Vec = block - .clone() - .block - .data - .into_iter() - .filter_map(|tx| hex::decode(digest(tx.as_slice())).ok()) - .filter_map(|hash| { - Hash::from_bytes(Algorithm::Sha256, hash.as_slice()) - .ok() - .map(|hash| H256::from_slice(hash.as_bytes())) - }) - .collect(); - - tx_results - .into_iter() - .enumerate() - .filter_map(move |(idx, tx)| { - let Some(tx_hash) = tx_hashes.get(idx) else { - debug!(?tx, "No tx hash found for tx"); - return None; - }; - if tx.code.is_err() { - debug!(?tx_hash, "Not indexing failed transaction"); - return None; - } - Some(self.handle_tx(block.clone(), tx.events, *tx_hash, idx, parser)) - }) - .flatten() - .collect() - } - - // Iter through all events in the tx, looking for any target events - // made by the contract we are indexing. - fn handle_tx( - &self, - block: BlockResponse, - tx_events: Vec, - tx_hash: H256, - transaction_index: usize, - parser: for<'a> fn(&'a Vec) -> ChainResult>, - ) -> impl Iterator + '_ - where - T: PartialEq + 'static, - { - tx_events.into_iter().enumerate().filter_map(move |(log_idx, event)| { - if event.kind.as_str() != self.target_event_kind { - return None; - } - - parser(&event.attributes) - .map_err(|err| { - // This can happen if we attempt to parse an event that just happens - // to have the same name but a different structure. - tracing::trace!(?err, tx_hash=?tx_hash, log_idx, ?event, "Failed to parse event attributes"); - }) - .ok() - .and_then(|parsed_event| { - // This is crucial! We need to make sure that the contract address - // in the event matches the contract address we are indexing. - // Otherwise, we might index events from other contracts that happen - // to have the same target event name. - if parsed_event.contract_address != self.contract_address.address() { - trace!(tx_hash=?tx_hash, log_idx, ?event, "Event contract address does not match indexer contract address"); - return None; - } - - Some((parsed_event.event, LogMeta { - address: self.contract_address.digest(), - block_number: block.block.header.height.into(), - block_hash: H256::from_slice(block.block_id.hash.as_bytes()), - transaction_id: H256::from_slice(tx_hash.as_bytes()).into(), - transaction_index: transaction_index as u64, - log_index: U256::from(log_idx), - })) - }) - }) - } -} - -#[async_trait] -impl WasmIndexer for CosmosWasmIndexer { - #[instrument(err, skip(self))] - #[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue - async fn get_finalized_block_number(&self) -> ChainResult { - let latest_block = Self::get_latest_block(self.provider.rpc().clone()).await?; - let latest_height: u32 = latest_block - .block - .header - .height - .value() - .try_into() - .map_err(ChainCommunicationError::from_other)?; - Ok(latest_height.saturating_sub(self.reorg_period)) - } - - #[instrument(err, skip(self, parser))] - #[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue - async fn get_logs_in_block( - &self, - block_number: u32, - parser: for<'a> fn(&'a Vec) -> ChainResult>, - cursor_label: &'static str, - ) -> ChainResult> - where - T: Send + Sync + PartialEq + Debug + 'static, - { - let client = self.provider.rpc().clone(); - debug!(?block_number, cursor_label, domain=?self.provider.domain, "Getting logs in block"); - - // The two calls below could be made in parallel, but on cosmos rate limiting is a bigger problem - // than indexing latency, so we do them sequentially. - let block = Self::get_block(client.clone(), block_number).await?; - let block_results = Self::get_block_results(client.clone(), block_number).await?; - - Ok(self.handle_txs(block, block_results, parser, cursor_label)) - } -} +mod client; +mod provider; diff --git a/rust/main/chains/hyperlane-cosmos/src/providers/rpc/client.rs b/rust/main/chains/hyperlane-cosmos/src/providers/rpc/client.rs new file mode 100644 index 000000000..f3c3930b1 --- /dev/null +++ b/rust/main/chains/hyperlane-cosmos/src/providers/rpc/client.rs @@ -0,0 +1,77 @@ +use cosmrs::proto::tendermint::blocksync::BlockResponse; +use tendermint::Hash; +use tendermint_rpc::client::CompatMode; +use tendermint_rpc::endpoint::{block, block_by_hash, block_results, tx}; +use tendermint_rpc::{Client, HttpClient}; + +use hyperlane_core::ChainResult; + +use crate::{ConnectionConf, HyperlaneCosmosError}; + +/// Thin wrapper around Cosmos RPC client with error mapping +#[derive(Clone, Debug)] +pub struct CosmosRpcClient { + client: HttpClient, +} + +impl CosmosRpcClient { + /// Create new `CosmosRpcClient` + pub fn new(conf: &ConnectionConf) -> ChainResult { + let client = HttpClient::builder( + conf.get_rpc_url() + .parse() + .map_err(Into::::into)?, + ) + // Consider supporting different compatibility modes. + .compat_mode(CompatMode::latest()) + .build() + .map_err(Into::::into)?; + + Ok(Self { client }) + } + + /// Request block by block height + pub async fn get_block(&self, height: u32) -> ChainResult { + Ok(self + .client + .block(height) + .await + .map_err(Into::::into)?) + } + + /// Request block results by block height + pub async fn get_block_results(&self, height: u32) -> ChainResult { + Ok(self + .client + .block_results(height) + .await + .map_err(Into::::into)?) + } + + /// Request block by block hash + pub async fn get_block_by_hash(&self, hash: Hash) -> ChainResult { + Ok(self + .client + .block_by_hash(hash) + .await + .map_err(Into::::into)?) + } + + /// Request the latest block + pub async fn get_latest_block(&self) -> ChainResult { + Ok(self + .client + .latest_block() + .await + .map_err(Into::::into)?) + } + + /// Request transaction by transaction hash + pub async fn get_tx_by_hash(&self, hash: Hash) -> ChainResult { + Ok(self + .client + .tx(hash, false) + .await + .map_err(Into::::into)?) + } +} diff --git a/rust/main/chains/hyperlane-cosmos/src/providers/rpc/provider.rs b/rust/main/chains/hyperlane-cosmos/src/providers/rpc/provider.rs new file mode 100644 index 000000000..0184aff96 --- /dev/null +++ b/rust/main/chains/hyperlane-cosmos/src/providers/rpc/provider.rs @@ -0,0 +1,277 @@ +use std::fmt::Debug; + +use async_trait::async_trait; +use cosmrs::cosmwasm::MsgExecuteContract; +use cosmrs::rpc::client::Client; +use futures::StreamExt; +use sha256::digest; +use tendermint::abci::{Event, EventAttribute}; +use tendermint::hash::Algorithm; +use tendermint::Hash; +use tendermint_rpc::client::CompatMode; +use tendermint_rpc::endpoint::block::Response as BlockResponse; +use tendermint_rpc::endpoint::block_results::Response as BlockResultsResponse; +use tendermint_rpc::endpoint::tx; +use tendermint_rpc::HttpClient; +use time::OffsetDateTime; +use tracing::{debug, instrument, trace}; + +use hyperlane_core::{ + ChainCommunicationError, ChainResult, ContractLocator, HyperlaneDomain, LogMeta, H256, U256, +}; + +use crate::address::CosmosAddress; +use crate::rpc::CosmosRpcClient; +use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError}; + +#[async_trait] +/// Trait for wasm indexer. Use rpc provider +pub trait WasmRpcProvider: Send + Sync { + /// Get the finalized block height. + async fn get_finalized_block_number(&self) -> ChainResult; + + /// Get logs for the given block using the given parser. + async fn get_logs_in_block( + &self, + block_number: u32, + parser: for<'a> fn(&'a Vec) -> ChainResult>, + cursor_label: &'static str, + ) -> ChainResult> + where + T: Send + Sync + PartialEq + Debug + 'static; + + /// Get logs for the given transaction using the given parser. + async fn get_logs_in_tx( + &self, + tx_hash: Hash, + parser: for<'a> fn(&'a Vec) -> ChainResult>, + cursor_label: &'static str, + ) -> ChainResult> + where + T: Send + Sync + PartialEq + Debug + 'static; +} + +#[derive(Debug, Eq, PartialEq)] +/// An event parsed from the RPC response. +pub struct ParsedEvent { + contract_address: String, + event: T, +} + +impl ParsedEvent { + /// Create a new ParsedEvent. + pub fn new(contract_address: String, event: T) -> Self { + Self { + contract_address, + event, + } + } + + /// Get the inner event + pub fn inner(self) -> T { + self.event + } +} + +#[derive(Debug, Clone)] +/// Cosmwasm RPC Provider +pub struct CosmosWasmRpcProvider { + domain: HyperlaneDomain, + contract_address: CosmosAddress, + target_event_kind: String, + reorg_period: u32, + rpc_client: CosmosRpcClient, +} + +impl CosmosWasmRpcProvider { + const WASM_TYPE: &'static str = "wasm"; + + /// create new Cosmwasm RPC Provider + pub fn new( + conf: ConnectionConf, + locator: ContractLocator, + event_type: String, + reorg_period: u32, + ) -> ChainResult { + let rpc_client = CosmosRpcClient::new(&conf)?; + + Ok(Self { + domain: locator.domain.clone(), + contract_address: CosmosAddress::from_h256( + locator.address, + conf.get_bech32_prefix().as_str(), + conf.get_contract_address_bytes(), + )?, + target_event_kind: format!("{}-{}", Self::WASM_TYPE, event_type), + reorg_period, + rpc_client, + }) + } +} + +impl CosmosWasmRpcProvider { + // Iterate through all txs, filter out failed txs, find target events + // in successful txs, and parse them. + fn handle_txs( + &self, + block: BlockResponse, + block_results: BlockResultsResponse, + parser: for<'a> fn(&'a Vec) -> ChainResult>, + cursor_label: &'static str, + ) -> Vec<(T, LogMeta)> + where + T: PartialEq + Debug + 'static, + { + let Some(tx_results) = block_results.txs_results else { + return vec![]; + }; + + let tx_hashes: Vec = block + .clone() + .block + .data + .into_iter() + .filter_map(|tx| hex::decode(digest(tx.as_slice())).ok()) + .filter_map(|hash| Hash::from_bytes(Algorithm::Sha256, hash.as_slice()).ok()) + .collect(); + + tx_results + .into_iter() + .enumerate() + .filter_map(move |(idx, tx)| { + let Some(tx_hash) = tx_hashes.get(idx) else { + debug!(?tx, "No tx hash found for tx"); + return None; + }; + if tx.code.is_err() { + debug!(?tx_hash, "Not indexing failed transaction"); + return None; + } + + // We construct a simplified structure `tx::Response` here so that we can + // reuse `handle_tx` method below. + let tx_response = tx::Response { + hash: *tx_hash, + height: block_results.height, + index: idx as u32, + tx_result: tx, + tx: vec![], + proof: None, + }; + + let block_hash = H256::from_slice(block.block_id.hash.as_bytes()); + + Some(self.handle_tx(tx_response, block_hash, parser)) + }) + .flatten() + .collect() + } + + // Iter through all events in the tx, looking for any target events + // made by the contract we are indexing. + fn handle_tx( + &self, + tx: tx::Response, + block_hash: H256, + parser: for<'a> fn(&'a Vec) -> ChainResult>, + ) -> impl Iterator + '_ + where + T: PartialEq + 'static, + { + let tx_events = tx.tx_result.events; + let tx_hash = tx.hash; + let tx_index = tx.index; + let block_height = tx.height; + + tx_events.into_iter().enumerate().filter_map(move |(log_idx, event)| { + if event.kind.as_str() != self.target_event_kind { + return None; + } + + parser(&event.attributes) + .map_err(|err| { + // This can happen if we attempt to parse an event that just happens + // to have the same name but a different structure. + trace!(?err, tx_hash=?tx_hash, log_idx, ?event, "Failed to parse event attributes"); + }) + .ok() + .and_then(|parsed_event| { + // This is crucial! We need to make sure that the contract address + // in the event matches the contract address we are indexing. + // Otherwise, we might index events from other contracts that happen + // to have the same target event name. + if parsed_event.contract_address != self.contract_address.address() { + trace!(tx_hash=?tx_hash, log_idx, ?event, "Event contract address does not match indexer contract address"); + return None; + } + + Some((parsed_event.event, LogMeta { + address: self.contract_address.digest(), + block_number: block_height.value(), + block_hash, + transaction_id: H256::from_slice(tx_hash.as_bytes()).into(), + transaction_index: tx_index as u64, + log_index: U256::from(log_idx), + })) + }) + }) + } +} + +#[async_trait] +impl WasmRpcProvider for CosmosWasmRpcProvider { + #[instrument(err, skip(self))] + #[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue + async fn get_finalized_block_number(&self) -> ChainResult { + let latest_block = self.rpc_client.get_latest_block().await?; + let latest_height: u32 = latest_block + .block + .header + .height + .value() + .try_into() + .map_err(ChainCommunicationError::from_other)?; + Ok(latest_height.saturating_sub(self.reorg_period)) + } + + #[instrument(err, skip(self, parser))] + #[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue + async fn get_logs_in_block( + &self, + block_number: u32, + parser: for<'a> fn(&'a Vec) -> ChainResult>, + cursor_label: &'static str, + ) -> ChainResult> + where + T: Send + Sync + PartialEq + Debug + 'static, + { + debug!(?block_number, cursor_label, domain=?self.domain, "Getting logs in block"); + + // The two calls below could be made in parallel, but on cosmos rate limiting is a bigger problem + // than indexing latency, so we do them sequentially. + let block = self.rpc_client.get_block(block_number).await?; + let block_results = self.rpc_client.get_block_results(block_number).await?; + + Ok(self.handle_txs(block, block_results, parser, cursor_label)) + } + + #[instrument(err, skip(self, parser))] + #[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue + async fn get_logs_in_tx( + &self, + hash: Hash, + parser: for<'a> fn(&'a Vec) -> ChainResult>, + cursor_label: &'static str, + ) -> ChainResult> + where + T: Send + Sync + PartialEq + Debug + 'static, + { + debug!(?hash, cursor_label, domain=?self.domain, "Getting logs in transaction"); + + let tx = self.rpc_client.get_tx_by_hash(hash).await?; + let block = self.rpc_client.get_block(tx.height.value() as u32).await?; + let block_hash = H256::from_slice(block.block_id.hash.as_bytes()); + + Ok(self.handle_tx(tx, block_hash, parser).collect()) + } +} diff --git a/rust/main/chains/hyperlane-cosmos/src/routing_ism.rs b/rust/main/chains/hyperlane-cosmos/src/routing_ism.rs index 63b759f1b..f7d3620eb 100644 --- a/rust/main/chains/hyperlane-cosmos/src/routing_ism.rs +++ b/rust/main/chains/hyperlane-cosmos/src/routing_ism.rs @@ -35,7 +35,7 @@ impl CosmosRoutingIsm { let provider = CosmosProvider::new( locator.domain.clone(), conf.clone(), - Some(locator.clone()), + locator.clone(), signer, )?; diff --git a/rust/main/chains/hyperlane-cosmos/src/utils.rs b/rust/main/chains/hyperlane-cosmos/src/utils.rs index 032700785..74cb75a27 100644 --- a/rust/main/chains/hyperlane-cosmos/src/utils.rs +++ b/rust/main/chains/hyperlane-cosmos/src/utils.rs @@ -6,13 +6,15 @@ use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; use futures::future; use once_cell::sync::Lazy; use tendermint::abci::EventAttribute; +use tendermint::hash::Algorithm; +use tendermint::Hash; use tokio::task::JoinHandle; use tracing::warn; -use hyperlane_core::{ChainCommunicationError, ChainResult, Indexed, LogMeta}; +use hyperlane_core::{ChainCommunicationError, ChainResult, Indexed, LogMeta, H256}; use crate::grpc::{WasmGrpcProvider, WasmProvider}; -use crate::rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}; +use crate::rpc::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider}; type FutureChainResults = Vec>, u32)>>; @@ -43,21 +45,37 @@ pub(crate) async fn get_block_height_for_lag( pub(crate) fn parse_logs_in_range( range: RangeInclusive, - indexer: Box, + provider: Box, parser: for<'a> fn(&'a Vec) -> ChainResult>, label: &'static str, ) -> FutureChainResults { range .map(|block_number| { - let indexer = indexer.clone(); + let provider = provider.clone(); tokio::spawn(async move { - let logs = indexer.get_logs_in_block(block_number, parser, label).await; + let logs = provider + .get_logs_in_block(block_number, parser, label) + .await; (logs, block_number) }) }) .collect() } +pub(crate) async fn parse_logs_in_tx( + hash: &H256, + provider: Box, + parser: for<'a> fn(&'a Vec) -> ChainResult>, + label: &'static str, +) -> ChainResult> { + let tendermint_hash = Hash::from_bytes(Algorithm::Sha256, hash.as_bytes()) + .expect("transaction hash should be of correct size"); + + provider + .get_logs_in_tx(tendermint_hash, parser, label) + .await +} + #[allow(clippy::type_complexity)] pub(crate) async fn execute_and_parse_log_futures>>( logs_futures: Vec, ChainCommunicationError>, u32)>>, diff --git a/rust/main/chains/hyperlane-cosmos/src/validator_announce.rs b/rust/main/chains/hyperlane-cosmos/src/validator_announce.rs index 9ead17f8f..d82b0e829 100644 --- a/rust/main/chains/hyperlane-cosmos/src/validator_announce.rs +++ b/rust/main/chains/hyperlane-cosmos/src/validator_announce.rs @@ -35,7 +35,7 @@ impl CosmosValidatorAnnounce { let provider = CosmosProvider::new( locator.domain.clone(), conf.clone(), - Some(locator.clone()), + locator.clone(), signer, )?; diff --git a/rust/main/hyperlane-base/src/settings/chains.rs b/rust/main/hyperlane-base/src/settings/chains.rs index 34de1192a..8dcb2b9c5 100644 --- a/rust/main/hyperlane-base/src/settings/chains.rs +++ b/rust/main/hyperlane-base/src/settings/chains.rs @@ -188,7 +188,7 @@ impl ChainConf { let provider = CosmosProvider::new( locator.domain.clone(), conf.clone(), - Some(locator.clone()), + locator.clone(), None, )?; Ok(Box::new(provider) as Box)