feat: Fetch logs by transaction hash (#4510)

### Description

Cosmos indexers now can fetch logs by transaction hash. It allows to
reprocess transaction which may be missed by range based indexers.

### Drive-by changes

Quite a bit of refactoring

### Related issues

Contributes into issue #4300

### Backward compatibility

Yes

### Testing

E2E Tests

---------

Co-authored-by: Danil Nemirovsky <4614623+ameten@users.noreply.github.com>
pull/4537/head
Danil Nemirovsky 2 months ago committed by GitHub
parent 5ecc529e14
commit 4151317b00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      rust/README.md
  2. 2
      rust/main/chains/hyperlane-cosmos/src/aggregation_ism.rs
  3. 59
      rust/main/chains/hyperlane-cosmos/src/interchain_gas.rs
  4. 2
      rust/main/chains/hyperlane-cosmos/src/interchain_security_module.rs
  5. 2
      rust/main/chains/hyperlane-cosmos/src/mailbox/contract.rs
  6. 34
      rust/main/chains/hyperlane-cosmos/src/mailbox/delivery_indexer.rs
  7. 35
      rust/main/chains/hyperlane-cosmos/src/mailbox/dispatch_indexer.rs
  8. 63
      rust/main/chains/hyperlane-cosmos/src/merkle_tree_hook.rs
  9. 2
      rust/main/chains/hyperlane-cosmos/src/multisig_ism.rs
  10. 8
      rust/main/chains/hyperlane-cosmos/src/providers.rs
  11. 3
      rust/main/chains/hyperlane-cosmos/src/providers/cosmos.rs
  12. 53
      rust/main/chains/hyperlane-cosmos/src/providers/cosmos/provider.rs
  13. 33
      rust/main/chains/hyperlane-cosmos/src/providers/grpc.rs
  14. 2
      rust/main/chains/hyperlane-cosmos/src/providers/grpc/tests.rs
  15. 254
      rust/main/chains/hyperlane-cosmos/src/providers/rpc.rs
  16. 77
      rust/main/chains/hyperlane-cosmos/src/providers/rpc/client.rs
  17. 277
      rust/main/chains/hyperlane-cosmos/src/providers/rpc/provider.rs
  18. 2
      rust/main/chains/hyperlane-cosmos/src/routing_ism.rs
  19. 28
      rust/main/chains/hyperlane-cosmos/src/utils.rs
  20. 2
      rust/main/chains/hyperlane-cosmos/src/validator_announce.rs
  21. 2
      rust/main/hyperlane-base/src/settings/chains.rs

@ -90,7 +90,7 @@ env $(cat ./config/validator.fuji.env | grep -v "#" | xargs) ./target/debug/vali
Clone `hyperlane-registry` repo next to `hyperlane-monorepo` repo. Clone `hyperlane-registry` repo next to `hyperlane-monorepo` repo.
To perform an automated e2e test of the agents locally, from within the `hyperlane-monorepo/rust` directory, run: To perform an automated e2e test of the agents locally, from within the `hyperlane-monorepo/rust/main` directory, run:
```bash ```bash
cargo run --release --bin run-locally cargo run --release --bin run-locally

@ -33,7 +33,7 @@ impl CosmosAggregationIsm {
let provider = CosmosProvider::new( let provider = CosmosProvider::new(
locator.domain.clone(), locator.domain.clone(),
conf.clone(), conf.clone(),
Some(locator.clone()), locator.clone(),
signer, signer,
)?; )?;

@ -1,25 +1,24 @@
use std::ops::RangeInclusive;
use async_trait::async_trait; use async_trait::async_trait;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use once_cell::sync::Lazy;
use tendermint::abci::EventAttribute;
use tracing::instrument;
use hyperlane_core::{ use hyperlane_core::{
ChainCommunicationError, ChainResult, ContractLocator, HyperlaneChain, HyperlaneContract, ChainCommunicationError, ChainResult, ContractLocator, HyperlaneChain, HyperlaneContract,
HyperlaneDomain, HyperlaneProvider, Indexed, Indexer, InterchainGasPaymaster, HyperlaneDomain, HyperlaneProvider, Indexed, Indexer, InterchainGasPaymaster,
InterchainGasPayment, LogMeta, SequenceAwareIndexer, H256, U256, InterchainGasPayment, LogMeta, SequenceAwareIndexer, H256, H512, U256,
}; };
use once_cell::sync::Lazy;
use std::ops::RangeInclusive;
use tendermint::abci::EventAttribute;
use tracing::instrument;
use crate::utils::parse_logs_in_range; use crate::rpc::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider};
use crate::{ use crate::signers::Signer;
rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}, use crate::utils::{
signers::Signer, execute_and_parse_log_futures, parse_logs_in_range, parse_logs_in_tx,
utils::{ CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64,
execute_and_parse_log_futures, CONTRACT_ADDRESS_ATTRIBUTE_KEY,
CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64,
},
ConnectionConf, CosmosProvider, HyperlaneCosmosError,
}; };
use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError};
/// A reference to a InterchainGasPaymaster contract on some Cosmos chain /// A reference to a InterchainGasPaymaster contract on some Cosmos chain
#[derive(Debug)] #[derive(Debug)]
@ -57,7 +56,7 @@ impl CosmosInterchainGasPaymaster {
let provider = CosmosProvider::new( let provider = CosmosProvider::new(
locator.domain.clone(), locator.domain.clone(),
conf.clone(), conf.clone(),
Some(locator.clone()), locator.clone(),
signer, signer,
)?; )?;
@ -90,7 +89,7 @@ static DESTINATION_ATTRIBUTE_KEY_BASE64: Lazy<String> =
/// A reference to a InterchainGasPaymasterIndexer contract on some Cosmos chain /// A reference to a InterchainGasPaymasterIndexer contract on some Cosmos chain
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CosmosInterchainGasPaymasterIndexer { pub struct CosmosInterchainGasPaymasterIndexer {
indexer: Box<CosmosWasmIndexer>, provider: Box<CosmosWasmRpcProvider>,
} }
impl CosmosInterchainGasPaymasterIndexer { impl CosmosInterchainGasPaymasterIndexer {
@ -103,7 +102,7 @@ impl CosmosInterchainGasPaymasterIndexer {
locator: ContractLocator, locator: ContractLocator,
reorg_period: u32, reorg_period: u32,
) -> ChainResult<Self> { ) -> ChainResult<Self> {
let indexer = CosmosWasmIndexer::new( let provider = CosmosWasmRpcProvider::new(
conf, conf,
locator, locator,
Self::INTERCHAIN_GAS_PAYMENT_EVENT_TYPE.into(), Self::INTERCHAIN_GAS_PAYMENT_EVENT_TYPE.into(),
@ -111,7 +110,7 @@ impl CosmosInterchainGasPaymasterIndexer {
)?; )?;
Ok(Self { Ok(Self {
indexer: Box::new(indexer), provider: Box::new(provider),
}) })
} }
@ -211,7 +210,7 @@ impl Indexer<InterchainGasPayment> for CosmosInterchainGasPaymasterIndexer {
) -> ChainResult<Vec<(Indexed<InterchainGasPayment>, LogMeta)>> { ) -> ChainResult<Vec<(Indexed<InterchainGasPayment>, LogMeta)>> {
let logs_futures = parse_logs_in_range( let logs_futures = parse_logs_in_range(
range, range,
self.indexer.clone(), self.provider.clone(),
Self::interchain_gas_payment_parser, Self::interchain_gas_payment_parser,
"InterchainGasPaymentCursor", "InterchainGasPaymentCursor",
); );
@ -220,7 +219,21 @@ impl Indexer<InterchainGasPayment> for CosmosInterchainGasPaymasterIndexer {
} }
async fn get_finalized_block_number(&self) -> ChainResult<u32> { async fn get_finalized_block_number(&self) -> ChainResult<u32> {
self.indexer.get_finalized_block_number().await self.provider.get_finalized_block_number().await
}
async fn fetch_logs_by_tx_hash(
&self,
tx_hash: H512,
) -> ChainResult<Vec<(Indexed<InterchainGasPayment>, LogMeta)>> {
parse_logs_in_tx(
&tx_hash.into(),
self.provider.clone(),
Self::interchain_gas_payment_parser,
"InterchainGasPaymentReceiver",
)
.await
.map(|v| v.into_iter().map(|(m, l)| (m.into(), l)).collect())
} }
} }
@ -269,10 +282,12 @@ impl TryInto<InterchainGasPayment> for IncompleteInterchainGasPayment {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use hyperlane_core::{InterchainGasPayment, H256, U256};
use std::str::FromStr; use std::str::FromStr;
use crate::{rpc::ParsedEvent, utils::event_attributes_from_str}; use hyperlane_core::{InterchainGasPayment, H256, U256};
use crate::providers::rpc::ParsedEvent;
use crate::utils::event_attributes_from_str;
use super::*; use super::*;

@ -38,7 +38,7 @@ impl CosmosInterchainSecurityModule {
let provider = CosmosProvider::new( let provider = CosmosProvider::new(
locator.domain.clone(), locator.domain.clone(),
conf.clone(), conf.clone(),
Some(locator.clone()), locator.clone(),
signer, signer,
)?; )?;

@ -41,7 +41,7 @@ impl CosmosMailbox {
let provider = CosmosProvider::new( let provider = CosmosProvider::new(
locator.domain.clone(), locator.domain.clone(),
conf.clone(), conf.clone(),
Some(locator.clone()), locator.clone(),
signer, signer,
)?; )?;

@ -9,14 +9,14 @@ use tendermint::abci::EventAttribute;
use tracing::instrument; use tracing::instrument;
use hyperlane_core::{ use hyperlane_core::{
ChainCommunicationError, ChainResult, ContractLocator, Delivery, Indexed, Indexer, LogMeta, ChainCommunicationError, ChainResult, ContractLocator, Delivery, HyperlaneMessage, Indexed,
SequenceAwareIndexer, H256, Indexer, LogMeta, SequenceAwareIndexer, H256, H512,
}; };
use crate::rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}; use crate::rpc::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider};
use crate::utils::{ use crate::utils::{
execute_and_parse_log_futures, parse_logs_in_range, CONTRACT_ADDRESS_ATTRIBUTE_KEY, execute_and_parse_log_futures, parse_logs_in_range, parse_logs_in_tx,
CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64, CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64,
}; };
use crate::{ConnectionConf, HyperlaneCosmosError, Signer}; use crate::{ConnectionConf, HyperlaneCosmosError, Signer};
@ -28,7 +28,7 @@ static MESSAGE_ID_ATTRIBUTE_KEY_BASE64: Lazy<String> =
/// Struct that retrieves delivery event data for a Cosmos Mailbox contract /// Struct that retrieves delivery event data for a Cosmos Mailbox contract
pub struct CosmosMailboxDeliveryIndexer { pub struct CosmosMailboxDeliveryIndexer {
indexer: Box<CosmosWasmIndexer>, provider: Box<CosmosWasmRpcProvider>,
} }
impl CosmosMailboxDeliveryIndexer { impl CosmosMailboxDeliveryIndexer {
@ -40,7 +40,7 @@ impl CosmosMailboxDeliveryIndexer {
signer: Option<Signer>, signer: Option<Signer>,
reorg_period: u32, reorg_period: u32,
) -> ChainResult<Self> { ) -> ChainResult<Self> {
let indexer = CosmosWasmIndexer::new( let provider = CosmosWasmRpcProvider::new(
conf, conf,
locator, locator,
MESSAGE_DELIVERY_EVENT_TYPE.to_owned(), MESSAGE_DELIVERY_EVENT_TYPE.to_owned(),
@ -48,7 +48,7 @@ impl CosmosMailboxDeliveryIndexer {
)?; )?;
Ok(Self { Ok(Self {
indexer: Box::new(indexer), provider: Box::new(provider),
}) })
} }
@ -114,7 +114,7 @@ impl Indexer<H256> for CosmosMailboxDeliveryIndexer {
) -> ChainResult<Vec<(Indexed<H256>, LogMeta)>> { ) -> ChainResult<Vec<(Indexed<H256>, LogMeta)>> {
let logs_futures = parse_logs_in_range( let logs_futures = parse_logs_in_range(
range, range,
self.indexer.clone(), self.provider.clone(),
Self::hyperlane_delivery_parser, Self::hyperlane_delivery_parser,
"DeliveryCursor", "DeliveryCursor",
); );
@ -123,7 +123,21 @@ impl Indexer<H256> for CosmosMailboxDeliveryIndexer {
} }
async fn get_finalized_block_number(&self) -> ChainResult<u32> { async fn get_finalized_block_number(&self) -> ChainResult<u32> {
self.indexer.get_finalized_block_number().await self.provider.get_finalized_block_number().await
}
async fn fetch_logs_by_tx_hash(
&self,
tx_hash: H512,
) -> ChainResult<Vec<(Indexed<H256>, LogMeta)>> {
parse_logs_in_tx(
&tx_hash.into(),
self.provider.clone(),
Self::hyperlane_delivery_parser,
"DeliveryReceiver",
)
.await
.map(|v| v.into_iter().map(|(m, l)| (m.into(), l)).collect())
} }
} }

@ -9,13 +9,13 @@ use tracing::instrument;
use hyperlane_core::{ use hyperlane_core::{
ChainCommunicationError, ChainResult, ContractLocator, Decode, HyperlaneMessage, Indexed, ChainCommunicationError, ChainResult, ContractLocator, Decode, HyperlaneMessage, Indexed,
Indexer, LogMeta, SequenceAwareIndexer, Indexer, LogMeta, SequenceAwareIndexer, H512,
}; };
use crate::rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}; use crate::rpc::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider};
use crate::utils::{ use crate::utils::{
execute_and_parse_log_futures, parse_logs_in_range, CONTRACT_ADDRESS_ATTRIBUTE_KEY, execute_and_parse_log_futures, parse_logs_in_range, parse_logs_in_tx,
CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64, CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64,
}; };
use crate::{ConnectionConf, CosmosMailbox, HyperlaneCosmosError, Signer}; use crate::{ConnectionConf, CosmosMailbox, HyperlaneCosmosError, Signer};
@ -29,7 +29,7 @@ static MESSAGE_ATTRIBUTE_KEY_BASE64: Lazy<String> =
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CosmosMailboxDispatchIndexer { pub struct CosmosMailboxDispatchIndexer {
mailbox: CosmosMailbox, mailbox: CosmosMailbox,
indexer: Box<CosmosWasmIndexer>, provider: Box<CosmosWasmRpcProvider>,
} }
impl CosmosMailboxDispatchIndexer { impl CosmosMailboxDispatchIndexer {
@ -42,7 +42,7 @@ impl CosmosMailboxDispatchIndexer {
reorg_period: u32, reorg_period: u32,
) -> ChainResult<Self> { ) -> ChainResult<Self> {
let mailbox = CosmosMailbox::new(conf.clone(), locator.clone(), signer.clone())?; let mailbox = CosmosMailbox::new(conf.clone(), locator.clone(), signer.clone())?;
let indexer = CosmosWasmIndexer::new( let provider = CosmosWasmRpcProvider::new(
conf, conf,
locator, locator,
MESSAGE_DISPATCH_EVENT_TYPE.into(), MESSAGE_DISPATCH_EVENT_TYPE.into(),
@ -51,7 +51,7 @@ impl CosmosMailboxDispatchIndexer {
Ok(Self { Ok(Self {
mailbox, mailbox,
indexer: Box::new(indexer), provider: Box::new(provider),
}) })
} }
@ -116,7 +116,7 @@ impl Indexer<HyperlaneMessage> for CosmosMailboxDispatchIndexer {
) -> ChainResult<Vec<(Indexed<HyperlaneMessage>, LogMeta)>> { ) -> ChainResult<Vec<(Indexed<HyperlaneMessage>, LogMeta)>> {
let logs_futures = parse_logs_in_range( let logs_futures = parse_logs_in_range(
range, range,
self.indexer.clone(), self.provider.clone(),
Self::hyperlane_message_parser, Self::hyperlane_message_parser,
"HyperlaneMessageCursor", "HyperlaneMessageCursor",
); );
@ -125,7 +125,21 @@ impl Indexer<HyperlaneMessage> for CosmosMailboxDispatchIndexer {
} }
async fn get_finalized_block_number(&self) -> ChainResult<u32> { async fn get_finalized_block_number(&self) -> ChainResult<u32> {
self.indexer.get_finalized_block_number().await self.provider.get_finalized_block_number().await
}
async fn fetch_logs_by_tx_hash(
&self,
tx_hash: H512,
) -> ChainResult<Vec<(Indexed<HyperlaneMessage>, LogMeta)>> {
parse_logs_in_tx(
&tx_hash.into(),
self.provider.clone(),
Self::hyperlane_message_parser,
"HyperlaneMessageReceiver",
)
.await
.map(|v| v.into_iter().map(|(m, l)| (m.into(), l)).collect())
} }
} }
@ -144,7 +158,8 @@ impl SequenceAwareIndexer<HyperlaneMessage> for CosmosMailboxDispatchIndexer {
mod tests { mod tests {
use hyperlane_core::HyperlaneMessage; use hyperlane_core::HyperlaneMessage;
use crate::{rpc::ParsedEvent, utils::event_attributes_from_str}; use crate::providers::rpc::ParsedEvent;
use crate::utils::event_attributes_from_str;
use super::*; use super::*;

@ -2,27 +2,26 @@ use std::{fmt::Debug, num::NonZeroU64, ops::RangeInclusive, str::FromStr};
use async_trait::async_trait; use async_trait::async_trait;
use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use hyperlane_core::{
accumulator::incremental::IncrementalMerkle, ChainCommunicationError, ChainResult, Checkpoint,
ContractLocator, HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneProvider,
Indexed, Indexer, LogMeta, MerkleTreeHook, MerkleTreeInsertion, SequenceAwareIndexer, H256,
};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tendermint::abci::EventAttribute; use tendermint::abci::EventAttribute;
use tracing::instrument; use tracing::instrument;
use crate::utils::parse_logs_in_range; use hyperlane_core::accumulator::incremental::IncrementalMerkle;
use crate::{ use hyperlane_core::{
grpc::WasmProvider, ChainCommunicationError, ChainResult, Checkpoint, ContractLocator, HyperlaneChain,
payloads::{general, merkle_tree_hook}, HyperlaneContract, HyperlaneDomain, HyperlaneProvider, Indexed, Indexer, LogMeta,
rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}, MerkleTreeHook, MerkleTreeInsertion, SequenceAwareIndexer, H256, H512,
utils::{
execute_and_parse_log_futures, get_block_height_for_lag, CONTRACT_ADDRESS_ATTRIBUTE_KEY,
CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64,
},
ConnectionConf, CosmosProvider, HyperlaneCosmosError, Signer,
}; };
use crate::grpc::WasmProvider;
use crate::payloads::{general, merkle_tree_hook};
use crate::rpc::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider};
use crate::utils::{
execute_and_parse_log_futures, get_block_height_for_lag, parse_logs_in_range, parse_logs_in_tx,
CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64,
};
use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError, Signer};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
/// A reference to a MerkleTreeHook contract on some Cosmos chain /// A reference to a MerkleTreeHook contract on some Cosmos chain
pub struct CosmosMerkleTreeHook { pub struct CosmosMerkleTreeHook {
@ -44,7 +43,7 @@ impl CosmosMerkleTreeHook {
let provider = CosmosProvider::new( let provider = CosmosProvider::new(
locator.domain.clone(), locator.domain.clone(),
conf.clone(), conf.clone(),
Some(locator.clone()), locator.clone(),
signer, signer,
)?; )?;
@ -188,8 +187,8 @@ pub(crate) static MESSAGE_ID_ATTRIBUTE_KEY_BASE64: Lazy<String> =
pub struct CosmosMerkleTreeHookIndexer { pub struct CosmosMerkleTreeHookIndexer {
/// The CosmosMerkleTreeHook /// The CosmosMerkleTreeHook
merkle_tree_hook: CosmosMerkleTreeHook, merkle_tree_hook: CosmosMerkleTreeHook,
/// Cosmwasm indexer instance /// Cosmwasm RPC provider instance
indexer: Box<CosmosWasmIndexer>, provider: Box<CosmosWasmRpcProvider>,
} }
impl CosmosMerkleTreeHookIndexer { impl CosmosMerkleTreeHookIndexer {
@ -203,7 +202,7 @@ impl CosmosMerkleTreeHookIndexer {
signer: Option<Signer>, signer: Option<Signer>,
reorg_period: u32, reorg_period: u32,
) -> ChainResult<Self> { ) -> ChainResult<Self> {
let indexer = CosmosWasmIndexer::new( let provider = CosmosWasmRpcProvider::new(
conf.clone(), conf.clone(),
locator.clone(), locator.clone(),
Self::MERKLE_TREE_INSERTION_EVENT_TYPE.into(), Self::MERKLE_TREE_INSERTION_EVENT_TYPE.into(),
@ -212,7 +211,7 @@ impl CosmosMerkleTreeHookIndexer {
Ok(Self { Ok(Self {
merkle_tree_hook: CosmosMerkleTreeHook::new(conf, locator, signer)?, merkle_tree_hook: CosmosMerkleTreeHook::new(conf, locator, signer)?,
indexer: Box::new(indexer), provider: Box::new(provider),
}) })
} }
@ -287,7 +286,7 @@ impl Indexer<MerkleTreeInsertion> for CosmosMerkleTreeHookIndexer {
) -> ChainResult<Vec<(Indexed<MerkleTreeInsertion>, LogMeta)>> { ) -> ChainResult<Vec<(Indexed<MerkleTreeInsertion>, LogMeta)>> {
let logs_futures = parse_logs_in_range( let logs_futures = parse_logs_in_range(
range, range,
self.indexer.clone(), self.provider.clone(),
Self::merkle_tree_insertion_parser, Self::merkle_tree_insertion_parser,
"MerkleTreeInsertionCursor", "MerkleTreeInsertionCursor",
); );
@ -297,7 +296,21 @@ impl Indexer<MerkleTreeInsertion> for CosmosMerkleTreeHookIndexer {
/// Get the chain's latest block number that has reached finality /// Get the chain's latest block number that has reached finality
async fn get_finalized_block_number(&self) -> ChainResult<u32> { async fn get_finalized_block_number(&self) -> ChainResult<u32> {
self.indexer.get_finalized_block_number().await self.provider.get_finalized_block_number().await
}
async fn fetch_logs_by_tx_hash(
&self,
tx_hash: H512,
) -> ChainResult<Vec<(Indexed<MerkleTreeInsertion>, LogMeta)>> {
parse_logs_in_tx(
&tx_hash.into(),
self.provider.clone(),
Self::merkle_tree_insertion_parser,
"MerkleTreeInsertionReceiver",
)
.await
.map(|v| v.into_iter().map(|(m, l)| (m.into(), l)).collect())
} }
} }
@ -337,10 +350,12 @@ impl TryInto<MerkleTreeInsertion> for IncompleteMerkleTreeInsertion {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use hyperlane_core::H256;
use std::str::FromStr; use std::str::FromStr;
use crate::{rpc::ParsedEvent, utils::event_attributes_from_str}; use hyperlane_core::H256;
use crate::providers::rpc::ParsedEvent;
use crate::utils::event_attributes_from_str;
use super::*; use super::*;

@ -30,7 +30,7 @@ impl CosmosMultisigIsm {
let provider = CosmosProvider::new( let provider = CosmosProvider::new(
locator.domain.clone(), locator.domain.clone(),
conf.clone(), conf.clone(),
Some(locator.clone()), locator.clone(),
signer, signer,
)?; )?;

@ -0,0 +1,8 @@
pub use cosmos::CosmosProvider;
/// cosmos provider
mod cosmos;
/// cosmos grpc provider
pub mod grpc;
/// cosmos rpc provider
pub mod rpc;

@ -0,0 +1,3 @@
pub use provider::CosmosProvider;
mod provider;

@ -15,24 +15,18 @@ use hyperlane_core::{
}; };
use crate::address::CosmosAddress; use crate::address::CosmosAddress;
use crate::grpc::WasmProvider; use crate::grpc::{WasmGrpcProvider, WasmProvider};
use crate::libs::account::CosmosAccountId; use crate::libs::account::CosmosAccountId;
use crate::providers::rpc::CosmosRpcClient;
use crate::{ConnectionConf, CosmosAmount, HyperlaneCosmosError, Signer}; use crate::{ConnectionConf, CosmosAmount, HyperlaneCosmosError, Signer};
use self::grpc::WasmGrpcProvider;
/// cosmos grpc provider
pub mod grpc;
/// cosmos rpc provider
pub mod rpc;
/// Abstraction over a connection to a Cosmos chain /// Abstraction over a connection to a Cosmos chain
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CosmosProvider { pub struct CosmosProvider {
domain: HyperlaneDomain, domain: HyperlaneDomain,
connection_conf: ConnectionConf, connection_conf: ConnectionConf,
grpc_client: WasmGrpcProvider, grpc_provider: WasmGrpcProvider,
rpc_client: HttpClient, rpc_client: CosmosRpcClient,
} }
impl CosmosProvider { impl CosmosProvider {
@ -40,43 +34,30 @@ impl CosmosProvider {
pub fn new( pub fn new(
domain: HyperlaneDomain, domain: HyperlaneDomain,
conf: ConnectionConf, conf: ConnectionConf,
locator: Option<ContractLocator>, locator: ContractLocator,
signer: Option<Signer>, signer: Option<Signer>,
) -> ChainResult<Self> { ) -> ChainResult<Self> {
let gas_price = CosmosAmount::try_from(conf.get_minimum_gas_price().clone())?; let gas_price = CosmosAmount::try_from(conf.get_minimum_gas_price().clone())?;
let grpc_client = WasmGrpcProvider::new( let grpc_provider = WasmGrpcProvider::new(
domain.clone(), domain.clone(),
conf.clone(), conf.clone(),
gas_price.clone(), gas_price.clone(),
locator, locator,
signer, signer,
)?; )?;
let rpc_client = HttpClient::builder( let rpc_client = CosmosRpcClient::new(&conf)?;
conf.get_rpc_url()
.parse()
.map_err(Into::<HyperlaneCosmosError>::into)?,
)
// Consider supporting different compatibility modes.
.compat_mode(CompatMode::latest())
.build()
.map_err(Into::<HyperlaneCosmosError>::into)?;
Ok(Self { Ok(Self {
domain, domain,
connection_conf: conf, connection_conf: conf,
grpc_provider,
rpc_client, rpc_client,
grpc_client,
}) })
} }
/// Get a grpc client /// Get a grpc client
pub fn grpc(&self) -> &WasmGrpcProvider { pub fn grpc(&self) -> &WasmGrpcProvider {
&self.grpc_client &self.grpc_provider
}
/// Get an rpc client
pub fn rpc(&self) -> &HttpClient {
&self.rpc_client
} }
fn search_payer_in_signer_infos( fn search_payer_in_signer_infos(
@ -182,11 +163,7 @@ impl HyperlaneProvider for CosmosProvider {
let tendermint_hash = Hash::from_bytes(Algorithm::Sha256, hash.as_bytes()) let tendermint_hash = Hash::from_bytes(Algorithm::Sha256, hash.as_bytes())
.expect("block hash should be of correct size"); .expect("block hash should be of correct size");
let response = self let response = self.rpc_client.get_block_by_hash(tendermint_hash).await?;
.rpc_client
.block_by_hash(tendermint_hash)
.await
.map_err(ChainCommunicationError::from_other)?;
let received_hash = H256::from_slice(response.block_id.hash.as_bytes()); let received_hash = H256::from_slice(response.block_id.hash.as_bytes());
@ -218,11 +195,7 @@ impl HyperlaneProvider for CosmosProvider {
let tendermint_hash = Hash::from_bytes(Algorithm::Sha256, hash.as_bytes()) let tendermint_hash = Hash::from_bytes(Algorithm::Sha256, hash.as_bytes())
.expect("transaction hash should be of correct size"); .expect("transaction hash should be of correct size");
let response = self let response = self.rpc_client.get_tx_by_hash(tendermint_hash).await?;
.rpc_client
.tx(tendermint_hash, false)
.await
.map_err(Into::<HyperlaneCosmosError>::into)?;
let received_hash = H256::from_slice(response.hash.as_bytes()); let received_hash = H256::from_slice(response.hash.as_bytes());
@ -269,7 +242,7 @@ impl HyperlaneProvider for CosmosProvider {
} }
async fn is_contract(&self, address: &H256) -> ChainResult<bool> { async fn is_contract(&self, address: &H256) -> ChainResult<bool> {
match self.grpc_client.wasm_contract_info().await { match self.grpc_provider.wasm_contract_info().await {
Ok(c) => Ok(true), Ok(c) => Ok(true),
Err(e) => Ok(false), Err(e) => Ok(false),
} }
@ -277,7 +250,7 @@ impl HyperlaneProvider for CosmosProvider {
async fn get_balance(&self, address: String) -> ChainResult<U256> { async fn get_balance(&self, address: String) -> ChainResult<U256> {
Ok(self Ok(self
.grpc_client .grpc_provider
.get_balance(address, self.connection_conf.get_canonical_asset()) .get_balance(address, self.connection_conf.get_canonical_asset())
.await?) .await?)
} }

@ -123,7 +123,7 @@ pub struct WasmGrpcProvider {
conf: ConnectionConf, conf: ConnectionConf,
/// A contract address that can be used as the default /// A contract address that can be used as the default
/// for queries / sends / estimates. /// for queries / sends / estimates.
contract_address: Option<CosmosAddress>, contract_address: CosmosAddress,
/// Signer for transactions. /// Signer for transactions.
signer: Option<Signer>, signer: Option<Signer>,
/// GRPC Channel that can be cheaply cloned. /// GRPC Channel that can be cheaply cloned.
@ -138,7 +138,7 @@ impl WasmGrpcProvider {
domain: HyperlaneDomain, domain: HyperlaneDomain,
conf: ConnectionConf, conf: ConnectionConf,
gas_price: CosmosAmount, gas_price: CosmosAmount,
locator: Option<ContractLocator>, locator: ContractLocator,
signer: Option<Signer>, signer: Option<Signer>,
) -> ChainResult<Self> { ) -> ChainResult<Self> {
// get all the configured grpc urls and convert them to a Vec<Endpoint> // get all the configured grpc urls and convert them to a Vec<Endpoint>
@ -156,15 +156,11 @@ impl WasmGrpcProvider {
let fallback_provider = builder.build(); let fallback_provider = builder.build();
let provider = CosmosFallbackProvider::new(fallback_provider); let provider = CosmosFallbackProvider::new(fallback_provider);
let contract_address = locator let contract_address = CosmosAddress::from_h256(
.map(|l| { locator.address,
CosmosAddress::from_h256( &conf.get_bech32_prefix(),
l.address, conf.get_contract_address_bytes(),
&conf.get_bech32_prefix(), )?;
conf.get_contract_address_bytes(),
)
})
.transpose()?;
Ok(Self { Ok(Self {
domain, domain,
@ -446,11 +442,8 @@ impl WasmGrpcProvider {
}) })
} }
fn get_contract_address(&self) -> Result<&CosmosAddress, ChainCommunicationError> { fn get_contract_address(&self) -> &CosmosAddress {
let contract_address = self.contract_address.as_ref().ok_or_else(|| { &self.contract_address
ChainCommunicationError::from_other_str("No contract address available")
})?;
Ok(contract_address)
} }
} }
@ -488,7 +481,7 @@ impl WasmProvider for WasmGrpcProvider {
where where
T: Serialize + Send + Sync + Clone + Debug, T: Serialize + Send + Sync + Clone + Debug,
{ {
let contract_address = self.get_contract_address()?; let contract_address = self.get_contract_address();
let query_data = serde_json::to_string(&payload)?.as_bytes().to_vec(); let query_data = serde_json::to_string(&payload)?.as_bytes().to_vec();
let response = self let response = self
.provider .provider
@ -522,7 +515,7 @@ impl WasmProvider for WasmGrpcProvider {
} }
async fn wasm_contract_info(&self) -> ChainResult<ContractInfo> { async fn wasm_contract_info(&self) -> ChainResult<ContractInfo> {
let contract_address = self.get_contract_address()?; let contract_address = self.get_contract_address();
let response = self let response = self
.provider .provider
.call(move |provider| { .call(move |provider| {
@ -557,7 +550,7 @@ impl WasmProvider for WasmGrpcProvider {
T: Serialize + Send + Sync + Clone + Debug, T: Serialize + Send + Sync + Clone + Debug,
{ {
let signer = self.get_signer()?; let signer = self.get_signer()?;
let contract_address = self.get_contract_address()?; let contract_address = self.get_contract_address();
let msgs = vec![MsgExecuteContract { let msgs = vec![MsgExecuteContract {
sender: signer.address.clone(), sender: signer.address.clone(),
contract: contract_address.address(), contract: contract_address.address(),
@ -625,7 +618,7 @@ impl WasmProvider for WasmGrpcProvider {
// Estimating gas requires a signer, which we can reasonably expect to have // Estimating gas requires a signer, which we can reasonably expect to have
// since we need one to send a tx with the estimated gas anyways. // since we need one to send a tx with the estimated gas anyways.
let signer = self.get_signer()?; let signer = self.get_signer()?;
let contract_address = self.get_contract_address()?; let contract_address = self.get_contract_address();
let msg = MsgExecuteContract { let msg = MsgExecuteContract {
sender: signer.address.clone(), sender: signer.address.clone(),
contract: contract_address.address(), contract: contract_address.address(),

@ -49,7 +49,7 @@ async fn test_wasm_contract_info_no_contract() {
fn provider(address: &str) -> WasmGrpcProvider { fn provider(address: &str) -> WasmGrpcProvider {
let domain = HyperlaneDomain::Known(KnownHyperlaneDomain::Neutron); let domain = HyperlaneDomain::Known(KnownHyperlaneDomain::Neutron);
let address = CosmosAddress::from_str(address).unwrap(); let address = CosmosAddress::from_str(address).unwrap();
let locator = Some(ContractLocator::new(&domain, address.digest())); let locator = ContractLocator::new(&domain, address.digest());
WasmGrpcProvider::new( WasmGrpcProvider::new(
domain.clone(), domain.clone(),

@ -1,251 +1,5 @@
use async_trait::async_trait; pub use client::CosmosRpcClient;
use cosmrs::rpc::client::Client; pub use provider::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider};
use hyperlane_core::{ChainCommunicationError, ChainResult, ContractLocator, LogMeta, H256, U256};
use sha256::digest;
use std::fmt::Debug;
use tendermint::abci::{Event, EventAttribute};
use tendermint::hash::Algorithm;
use tendermint::Hash;
use tendermint_rpc::endpoint::block::Response as BlockResponse;
use tendermint_rpc::endpoint::block_results::Response as BlockResultsResponse;
use tendermint_rpc::HttpClient;
use tracing::{debug, instrument, trace};
use crate::address::CosmosAddress; mod client;
use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError}; mod provider;
#[async_trait]
/// Trait for wasm indexer. Use rpc provider
pub trait WasmIndexer: Send + Sync {
/// Get the finalized block height.
async fn get_finalized_block_number(&self) -> ChainResult<u32>;
/// Get logs for the given block using the given parser.
async fn get_logs_in_block<T>(
&self,
block_number: u32,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
cursor_label: &'static str,
) -> ChainResult<Vec<(T, LogMeta)>>
where
T: Send + Sync + PartialEq + Debug + 'static;
}
#[derive(Debug, Eq, PartialEq)]
/// An event parsed from the RPC response.
pub struct ParsedEvent<T: PartialEq> {
contract_address: String,
event: T,
}
impl<T: PartialEq> ParsedEvent<T> {
/// Create a new ParsedEvent.
pub fn new(contract_address: String, event: T) -> Self {
Self {
contract_address,
event,
}
}
/// Get the inner event
pub fn inner(self) -> T {
self.event
}
}
#[derive(Debug, Clone)]
/// Cosmwasm RPC Provider
pub struct CosmosWasmIndexer {
provider: CosmosProvider,
contract_address: CosmosAddress,
target_event_kind: String,
reorg_period: u32,
}
impl CosmosWasmIndexer {
const WASM_TYPE: &'static str = "wasm";
/// create new Cosmwasm RPC Provider
pub fn new(
conf: ConnectionConf,
locator: ContractLocator,
event_type: String,
reorg_period: u32,
) -> ChainResult<Self> {
let provider = CosmosProvider::new(
locator.domain.clone(),
conf.clone(),
Some(locator.clone()),
None,
)?;
Ok(Self {
provider,
contract_address: CosmosAddress::from_h256(
locator.address,
conf.get_bech32_prefix().as_str(),
conf.get_contract_address_bytes(),
)?,
target_event_kind: format!("{}-{}", Self::WASM_TYPE, event_type),
reorg_period,
})
}
async fn get_block(client: HttpClient, block_number: u32) -> ChainResult<BlockResponse> {
Ok(client
.block(block_number)
.await
.map_err(Into::<HyperlaneCosmosError>::into)?)
}
async fn get_block_results(
client: HttpClient,
block_number: u32,
) -> ChainResult<BlockResultsResponse> {
Ok(client
.block_results(block_number)
.await
.map_err(Into::<HyperlaneCosmosError>::into)?)
}
async fn get_latest_block(client: HttpClient) -> ChainResult<BlockResponse> {
Ok(client
.latest_block()
.await
.map_err(Into::<HyperlaneCosmosError>::into)?)
}
}
impl CosmosWasmIndexer {
// Iterate through all txs, filter out failed txs, find target events
// in successful txs, and parse them.
fn handle_txs<T>(
&self,
block: BlockResponse,
block_results: BlockResultsResponse,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
cursor_label: &'static str,
) -> Vec<(T, LogMeta)>
where
T: PartialEq + Debug + 'static,
{
let Some(tx_results) = block_results.txs_results else {
return vec![];
};
let tx_hashes: Vec<H256> = block
.clone()
.block
.data
.into_iter()
.filter_map(|tx| hex::decode(digest(tx.as_slice())).ok())
.filter_map(|hash| {
Hash::from_bytes(Algorithm::Sha256, hash.as_slice())
.ok()
.map(|hash| H256::from_slice(hash.as_bytes()))
})
.collect();
tx_results
.into_iter()
.enumerate()
.filter_map(move |(idx, tx)| {
let Some(tx_hash) = tx_hashes.get(idx) else {
debug!(?tx, "No tx hash found for tx");
return None;
};
if tx.code.is_err() {
debug!(?tx_hash, "Not indexing failed transaction");
return None;
}
Some(self.handle_tx(block.clone(), tx.events, *tx_hash, idx, parser))
})
.flatten()
.collect()
}
// Iter through all events in the tx, looking for any target events
// made by the contract we are indexing.
fn handle_tx<T>(
&self,
block: BlockResponse,
tx_events: Vec<Event>,
tx_hash: H256,
transaction_index: usize,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
) -> impl Iterator<Item = (T, LogMeta)> + '_
where
T: PartialEq + 'static,
{
tx_events.into_iter().enumerate().filter_map(move |(log_idx, event)| {
if event.kind.as_str() != self.target_event_kind {
return None;
}
parser(&event.attributes)
.map_err(|err| {
// This can happen if we attempt to parse an event that just happens
// to have the same name but a different structure.
tracing::trace!(?err, tx_hash=?tx_hash, log_idx, ?event, "Failed to parse event attributes");
})
.ok()
.and_then(|parsed_event| {
// This is crucial! We need to make sure that the contract address
// in the event matches the contract address we are indexing.
// Otherwise, we might index events from other contracts that happen
// to have the same target event name.
if parsed_event.contract_address != self.contract_address.address() {
trace!(tx_hash=?tx_hash, log_idx, ?event, "Event contract address does not match indexer contract address");
return None;
}
Some((parsed_event.event, LogMeta {
address: self.contract_address.digest(),
block_number: block.block.header.height.into(),
block_hash: H256::from_slice(block.block_id.hash.as_bytes()),
transaction_id: H256::from_slice(tx_hash.as_bytes()).into(),
transaction_index: transaction_index as u64,
log_index: U256::from(log_idx),
}))
})
})
}
}
#[async_trait]
impl WasmIndexer for CosmosWasmIndexer {
#[instrument(err, skip(self))]
#[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue
async fn get_finalized_block_number(&self) -> ChainResult<u32> {
let latest_block = Self::get_latest_block(self.provider.rpc().clone()).await?;
let latest_height: u32 = latest_block
.block
.header
.height
.value()
.try_into()
.map_err(ChainCommunicationError::from_other)?;
Ok(latest_height.saturating_sub(self.reorg_period))
}
#[instrument(err, skip(self, parser))]
#[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue
async fn get_logs_in_block<T>(
&self,
block_number: u32,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
cursor_label: &'static str,
) -> ChainResult<Vec<(T, LogMeta)>>
where
T: Send + Sync + PartialEq + Debug + 'static,
{
let client = self.provider.rpc().clone();
debug!(?block_number, cursor_label, domain=?self.provider.domain, "Getting logs in block");
// The two calls below could be made in parallel, but on cosmos rate limiting is a bigger problem
// than indexing latency, so we do them sequentially.
let block = Self::get_block(client.clone(), block_number).await?;
let block_results = Self::get_block_results(client.clone(), block_number).await?;
Ok(self.handle_txs(block, block_results, parser, cursor_label))
}
}

@ -0,0 +1,77 @@
use cosmrs::proto::tendermint::blocksync::BlockResponse;
use tendermint::Hash;
use tendermint_rpc::client::CompatMode;
use tendermint_rpc::endpoint::{block, block_by_hash, block_results, tx};
use tendermint_rpc::{Client, HttpClient};
use hyperlane_core::ChainResult;
use crate::{ConnectionConf, HyperlaneCosmosError};
/// Thin wrapper around Cosmos RPC client with error mapping
#[derive(Clone, Debug)]
pub struct CosmosRpcClient {
client: HttpClient,
}
impl CosmosRpcClient {
/// Create new `CosmosRpcClient`
pub fn new(conf: &ConnectionConf) -> ChainResult<Self> {
let client = HttpClient::builder(
conf.get_rpc_url()
.parse()
.map_err(Into::<HyperlaneCosmosError>::into)?,
)
// Consider supporting different compatibility modes.
.compat_mode(CompatMode::latest())
.build()
.map_err(Into::<HyperlaneCosmosError>::into)?;
Ok(Self { client })
}
/// Request block by block height
pub async fn get_block(&self, height: u32) -> ChainResult<block::Response> {
Ok(self
.client
.block(height)
.await
.map_err(Into::<HyperlaneCosmosError>::into)?)
}
/// Request block results by block height
pub async fn get_block_results(&self, height: u32) -> ChainResult<block_results::Response> {
Ok(self
.client
.block_results(height)
.await
.map_err(Into::<HyperlaneCosmosError>::into)?)
}
/// Request block by block hash
pub async fn get_block_by_hash(&self, hash: Hash) -> ChainResult<block_by_hash::Response> {
Ok(self
.client
.block_by_hash(hash)
.await
.map_err(Into::<HyperlaneCosmosError>::into)?)
}
/// Request the latest block
pub async fn get_latest_block(&self) -> ChainResult<block::Response> {
Ok(self
.client
.latest_block()
.await
.map_err(Into::<HyperlaneCosmosError>::into)?)
}
/// Request transaction by transaction hash
pub async fn get_tx_by_hash(&self, hash: Hash) -> ChainResult<tx::Response> {
Ok(self
.client
.tx(hash, false)
.await
.map_err(Into::<HyperlaneCosmosError>::into)?)
}
}

@ -0,0 +1,277 @@
use std::fmt::Debug;
use async_trait::async_trait;
use cosmrs::cosmwasm::MsgExecuteContract;
use cosmrs::rpc::client::Client;
use futures::StreamExt;
use sha256::digest;
use tendermint::abci::{Event, EventAttribute};
use tendermint::hash::Algorithm;
use tendermint::Hash;
use tendermint_rpc::client::CompatMode;
use tendermint_rpc::endpoint::block::Response as BlockResponse;
use tendermint_rpc::endpoint::block_results::Response as BlockResultsResponse;
use tendermint_rpc::endpoint::tx;
use tendermint_rpc::HttpClient;
use time::OffsetDateTime;
use tracing::{debug, instrument, trace};
use hyperlane_core::{
ChainCommunicationError, ChainResult, ContractLocator, HyperlaneDomain, LogMeta, H256, U256,
};
use crate::address::CosmosAddress;
use crate::rpc::CosmosRpcClient;
use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError};
#[async_trait]
/// Trait for wasm indexer. Use rpc provider
pub trait WasmRpcProvider: Send + Sync {
/// Get the finalized block height.
async fn get_finalized_block_number(&self) -> ChainResult<u32>;
/// Get logs for the given block using the given parser.
async fn get_logs_in_block<T>(
&self,
block_number: u32,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
cursor_label: &'static str,
) -> ChainResult<Vec<(T, LogMeta)>>
where
T: Send + Sync + PartialEq + Debug + 'static;
/// Get logs for the given transaction using the given parser.
async fn get_logs_in_tx<T>(
&self,
tx_hash: Hash,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
cursor_label: &'static str,
) -> ChainResult<Vec<(T, LogMeta)>>
where
T: Send + Sync + PartialEq + Debug + 'static;
}
#[derive(Debug, Eq, PartialEq)]
/// An event parsed from the RPC response.
pub struct ParsedEvent<T: PartialEq> {
contract_address: String,
event: T,
}
impl<T: PartialEq> ParsedEvent<T> {
/// Create a new ParsedEvent.
pub fn new(contract_address: String, event: T) -> Self {
Self {
contract_address,
event,
}
}
/// Get the inner event
pub fn inner(self) -> T {
self.event
}
}
#[derive(Debug, Clone)]
/// Cosmwasm RPC Provider
pub struct CosmosWasmRpcProvider {
domain: HyperlaneDomain,
contract_address: CosmosAddress,
target_event_kind: String,
reorg_period: u32,
rpc_client: CosmosRpcClient,
}
impl CosmosWasmRpcProvider {
const WASM_TYPE: &'static str = "wasm";
/// create new Cosmwasm RPC Provider
pub fn new(
conf: ConnectionConf,
locator: ContractLocator,
event_type: String,
reorg_period: u32,
) -> ChainResult<Self> {
let rpc_client = CosmosRpcClient::new(&conf)?;
Ok(Self {
domain: locator.domain.clone(),
contract_address: CosmosAddress::from_h256(
locator.address,
conf.get_bech32_prefix().as_str(),
conf.get_contract_address_bytes(),
)?,
target_event_kind: format!("{}-{}", Self::WASM_TYPE, event_type),
reorg_period,
rpc_client,
})
}
}
impl CosmosWasmRpcProvider {
// Iterate through all txs, filter out failed txs, find target events
// in successful txs, and parse them.
fn handle_txs<T>(
&self,
block: BlockResponse,
block_results: BlockResultsResponse,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
cursor_label: &'static str,
) -> Vec<(T, LogMeta)>
where
T: PartialEq + Debug + 'static,
{
let Some(tx_results) = block_results.txs_results else {
return vec![];
};
let tx_hashes: Vec<Hash> = block
.clone()
.block
.data
.into_iter()
.filter_map(|tx| hex::decode(digest(tx.as_slice())).ok())
.filter_map(|hash| Hash::from_bytes(Algorithm::Sha256, hash.as_slice()).ok())
.collect();
tx_results
.into_iter()
.enumerate()
.filter_map(move |(idx, tx)| {
let Some(tx_hash) = tx_hashes.get(idx) else {
debug!(?tx, "No tx hash found for tx");
return None;
};
if tx.code.is_err() {
debug!(?tx_hash, "Not indexing failed transaction");
return None;
}
// We construct a simplified structure `tx::Response` here so that we can
// reuse `handle_tx` method below.
let tx_response = tx::Response {
hash: *tx_hash,
height: block_results.height,
index: idx as u32,
tx_result: tx,
tx: vec![],
proof: None,
};
let block_hash = H256::from_slice(block.block_id.hash.as_bytes());
Some(self.handle_tx(tx_response, block_hash, parser))
})
.flatten()
.collect()
}
// Iter through all events in the tx, looking for any target events
// made by the contract we are indexing.
fn handle_tx<T>(
&self,
tx: tx::Response,
block_hash: H256,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
) -> impl Iterator<Item = (T, LogMeta)> + '_
where
T: PartialEq + 'static,
{
let tx_events = tx.tx_result.events;
let tx_hash = tx.hash;
let tx_index = tx.index;
let block_height = tx.height;
tx_events.into_iter().enumerate().filter_map(move |(log_idx, event)| {
if event.kind.as_str() != self.target_event_kind {
return None;
}
parser(&event.attributes)
.map_err(|err| {
// This can happen if we attempt to parse an event that just happens
// to have the same name but a different structure.
trace!(?err, tx_hash=?tx_hash, log_idx, ?event, "Failed to parse event attributes");
})
.ok()
.and_then(|parsed_event| {
// This is crucial! We need to make sure that the contract address
// in the event matches the contract address we are indexing.
// Otherwise, we might index events from other contracts that happen
// to have the same target event name.
if parsed_event.contract_address != self.contract_address.address() {
trace!(tx_hash=?tx_hash, log_idx, ?event, "Event contract address does not match indexer contract address");
return None;
}
Some((parsed_event.event, LogMeta {
address: self.contract_address.digest(),
block_number: block_height.value(),
block_hash,
transaction_id: H256::from_slice(tx_hash.as_bytes()).into(),
transaction_index: tx_index as u64,
log_index: U256::from(log_idx),
}))
})
})
}
}
#[async_trait]
impl WasmRpcProvider for CosmosWasmRpcProvider {
#[instrument(err, skip(self))]
#[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue
async fn get_finalized_block_number(&self) -> ChainResult<u32> {
let latest_block = self.rpc_client.get_latest_block().await?;
let latest_height: u32 = latest_block
.block
.header
.height
.value()
.try_into()
.map_err(ChainCommunicationError::from_other)?;
Ok(latest_height.saturating_sub(self.reorg_period))
}
#[instrument(err, skip(self, parser))]
#[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue
async fn get_logs_in_block<T>(
&self,
block_number: u32,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
cursor_label: &'static str,
) -> ChainResult<Vec<(T, LogMeta)>>
where
T: Send + Sync + PartialEq + Debug + 'static,
{
debug!(?block_number, cursor_label, domain=?self.domain, "Getting logs in block");
// The two calls below could be made in parallel, but on cosmos rate limiting is a bigger problem
// than indexing latency, so we do them sequentially.
let block = self.rpc_client.get_block(block_number).await?;
let block_results = self.rpc_client.get_block_results(block_number).await?;
Ok(self.handle_txs(block, block_results, parser, cursor_label))
}
#[instrument(err, skip(self, parser))]
#[allow(clippy::blocks_in_conditions)] // TODO: `rustc` 1.80.1 clippy issue
async fn get_logs_in_tx<T>(
&self,
hash: Hash,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
cursor_label: &'static str,
) -> ChainResult<Vec<(T, LogMeta)>>
where
T: Send + Sync + PartialEq + Debug + 'static,
{
debug!(?hash, cursor_label, domain=?self.domain, "Getting logs in transaction");
let tx = self.rpc_client.get_tx_by_hash(hash).await?;
let block = self.rpc_client.get_block(tx.height.value() as u32).await?;
let block_hash = H256::from_slice(block.block_id.hash.as_bytes());
Ok(self.handle_tx(tx, block_hash, parser).collect())
}
}

@ -35,7 +35,7 @@ impl CosmosRoutingIsm {
let provider = CosmosProvider::new( let provider = CosmosProvider::new(
locator.domain.clone(), locator.domain.clone(),
conf.clone(), conf.clone(),
Some(locator.clone()), locator.clone(),
signer, signer,
)?; )?;

@ -6,13 +6,15 @@ use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use futures::future; use futures::future;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tendermint::abci::EventAttribute; use tendermint::abci::EventAttribute;
use tendermint::hash::Algorithm;
use tendermint::Hash;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::warn; use tracing::warn;
use hyperlane_core::{ChainCommunicationError, ChainResult, Indexed, LogMeta}; use hyperlane_core::{ChainCommunicationError, ChainResult, Indexed, LogMeta, H256};
use crate::grpc::{WasmGrpcProvider, WasmProvider}; use crate::grpc::{WasmGrpcProvider, WasmProvider};
use crate::rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}; use crate::rpc::{CosmosWasmRpcProvider, ParsedEvent, WasmRpcProvider};
type FutureChainResults<T> = Vec<JoinHandle<(ChainResult<Vec<(T, LogMeta)>>, u32)>>; type FutureChainResults<T> = Vec<JoinHandle<(ChainResult<Vec<(T, LogMeta)>>, u32)>>;
@ -43,21 +45,37 @@ pub(crate) async fn get_block_height_for_lag(
pub(crate) fn parse_logs_in_range<T: PartialEq + Send + Sync + Debug + 'static>( pub(crate) fn parse_logs_in_range<T: PartialEq + Send + Sync + Debug + 'static>(
range: RangeInclusive<u32>, range: RangeInclusive<u32>,
indexer: Box<CosmosWasmIndexer>, provider: Box<CosmosWasmRpcProvider>,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>, parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
label: &'static str, label: &'static str,
) -> FutureChainResults<T> { ) -> FutureChainResults<T> {
range range
.map(|block_number| { .map(|block_number| {
let indexer = indexer.clone(); let provider = provider.clone();
tokio::spawn(async move { tokio::spawn(async move {
let logs = indexer.get_logs_in_block(block_number, parser, label).await; let logs = provider
.get_logs_in_block(block_number, parser, label)
.await;
(logs, block_number) (logs, block_number)
}) })
}) })
.collect() .collect()
} }
pub(crate) async fn parse_logs_in_tx<T: PartialEq + Send + Sync + Debug + 'static>(
hash: &H256,
provider: Box<CosmosWasmRpcProvider>,
parser: for<'a> fn(&'a Vec<EventAttribute>) -> ChainResult<ParsedEvent<T>>,
label: &'static str,
) -> ChainResult<Vec<(T, LogMeta)>> {
let tendermint_hash = Hash::from_bytes(Algorithm::Sha256, hash.as_bytes())
.expect("transaction hash should be of correct size");
provider
.get_logs_in_tx(tendermint_hash, parser, label)
.await
}
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
pub(crate) async fn execute_and_parse_log_futures<T: Into<Indexed<T>>>( pub(crate) async fn execute_and_parse_log_futures<T: Into<Indexed<T>>>(
logs_futures: Vec<JoinHandle<(Result<Vec<(T, LogMeta)>, ChainCommunicationError>, u32)>>, logs_futures: Vec<JoinHandle<(Result<Vec<(T, LogMeta)>, ChainCommunicationError>, u32)>>,

@ -35,7 +35,7 @@ impl CosmosValidatorAnnounce {
let provider = CosmosProvider::new( let provider = CosmosProvider::new(
locator.domain.clone(), locator.domain.clone(),
conf.clone(), conf.clone(),
Some(locator.clone()), locator.clone(),
signer, signer,
)?; )?;

@ -188,7 +188,7 @@ impl ChainConf {
let provider = CosmosProvider::new( let provider = CosmosProvider::new(
locator.domain.clone(), locator.domain.clone(),
conf.clone(), conf.clone(),
Some(locator.clone()), locator.clone(),
None, None,
)?; )?;
Ok(Box::new(provider) as Box<dyn HyperlaneProvider>) Ok(Box::new(provider) as Box<dyn HyperlaneProvider>)

Loading…
Cancel
Save