From a94c50f103ad33fa56953cc20626bac2fe57b1a4 Mon Sep 17 00:00:00 2001 From: Danil Nemirovsky Date: Mon, 2 Dec 2024 15:30:49 +0000 Subject: [PATCH] feat: Support SequenceAware cursors for Deliveries (#4889) ### Description Support SequenceAware cursors for Deliveries ### Related issues - Contributes into https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4271 ### Backward compatibility Yes (need adding column `sequence` to `delivered_message` table before merging ### Testing E2E Ethereum and Sealevel tests --------- Co-authored-by: Danil Nemirovsky <4614623+ameten@users.noreply.github.com> --- ...9_000004_create_table_delivered_message.rs | 3 + .../src/db/generated/delivered_message.rs | 3 + rust/main/agents/scraper/src/db/message.rs | 142 ++++++++++++------ .../agents/scraper/src/store/deliveries.rs | 41 ++++- .../agents/scraper/src/store/dispatches.rs | 2 +- .../chains/hyperlane-sealevel/src/mailbox.rs | 59 +++++--- .../hyperlane-sealevel/src/rpc/client.rs | 7 - 7 files changed, 178 insertions(+), 79 deletions(-) diff --git a/rust/main/agents/scraper/migration/src/m20230309_000004_create_table_delivered_message.rs b/rust/main/agents/scraper/migration/src/m20230309_000004_create_table_delivered_message.rs index c7e4a1dc5..45cb71ec4 100644 --- a/rust/main/agents/scraper/migration/src/m20230309_000004_create_table_delivered_message.rs +++ b/rust/main/agents/scraper/migration/src/m20230309_000004_create_table_delivered_message.rs @@ -47,6 +47,7 @@ impl MigrationTrait for Migration { .big_integer() .not_null(), ) + .col(ColumnDef::new(DeliveredMessage::Sequence).big_integer()) .foreign_key( ForeignKey::create() .from_col(DeliveredMessage::Domain) @@ -105,4 +106,6 @@ pub enum DeliveredMessage { DestinationMailbox, /// Transaction the delivery was included in DestinationTxId, + /// Sequence when message was delivered + Sequence, } diff --git a/rust/main/agents/scraper/src/db/generated/delivered_message.rs b/rust/main/agents/scraper/src/db/generated/delivered_message.rs index 542d1c8e7..a3da2fe6d 100644 --- a/rust/main/agents/scraper/src/db/generated/delivered_message.rs +++ b/rust/main/agents/scraper/src/db/generated/delivered_message.rs @@ -19,6 +19,7 @@ pub struct Model { pub domain: i32, pub destination_mailbox: Vec, pub destination_tx_id: i64, + pub sequence: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -29,6 +30,7 @@ pub enum Column { Domain, DestinationMailbox, DestinationTxId, + Sequence, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -59,6 +61,7 @@ impl ColumnTrait for Column { Self::Domain => ColumnType::Integer.def(), Self::DestinationMailbox => ColumnType::Binary(BlobSize::Blob(None)).def(), Self::DestinationTxId => ColumnType::BigInteger.def(), + Self::Sequence => ColumnType::BigInteger.def().null(), } } } diff --git a/rust/main/agents/scraper/src/db/message.rs b/rust/main/agents/scraper/src/db/message.rs index 0796400f6..fc3904904 100644 --- a/rust/main/agents/scraper/src/db/message.rs +++ b/rust/main/agents/scraper/src/db/message.rs @@ -6,7 +6,7 @@ use sea_orm::{prelude::*, ActiveValue::*, DeriveColumn, EnumIter, Insert, QueryS use tracing::{debug, instrument, trace}; use hyperlane_core::{ - address_to_bytes, bytes_to_address, h256_to_bytes, HyperlaneMessage, LogMeta, H256, + address_to_bytes, bytes_to_address, h256_to_bytes, Delivery, HyperlaneMessage, LogMeta, H256, }; use migration::OnConflict; @@ -18,6 +18,7 @@ use super::generated::{delivered_message, message}; #[derive(Debug, Clone)] pub struct StorableDelivery<'a> { pub message_id: H256, + pub sequence: Option, pub meta: &'a LogMeta, /// The database id of the transaction the delivery event occurred in pub txn_id: i64, @@ -31,64 +32,54 @@ pub struct StorableMessage<'a> { } impl ScraperDb { - /// Get the dispatched message associated with a nonce. + /// Get the delivered message associated with a sequence. #[instrument(skip(self))] - pub async fn retrieve_message_by_nonce( + pub async fn retrieve_delivery_by_sequence( &self, - origin_domain: u32, - origin_mailbox: &H256, - nonce: u32, - ) -> Result> { - #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] - enum QueryAs { - Nonce, - } - if let Some(message) = message::Entity::find() - .filter(message::Column::Origin.eq(origin_domain)) - .filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox))) - .filter(message::Column::Nonce.eq(nonce)) + destination_domain: u32, + destination_mailbox: &H256, + sequence: u32, + ) -> Result> { + if let Some(delivery) = delivered_message::Entity::find() + .filter(delivered_message::Column::Domain.eq(destination_domain)) + .filter( + delivered_message::Column::DestinationMailbox + .eq(address_to_bytes(destination_mailbox)), + ) + .filter(delivered_message::Column::Sequence.eq(sequence)) .one(&self.0) .await? { - Ok(Some(HyperlaneMessage { - // We do not write version to the DB. - version: 3, - origin: message.origin as u32, - destination: message.destination as u32, - nonce: message.nonce as u32, - sender: bytes_to_address(message.sender)?, - recipient: bytes_to_address(message.recipient)?, - body: message.msg_body.unwrap_or(Vec::new()), - })) + let delivery = H256::from_slice(&delivery.msg_id); + Ok(Some(delivery)) } else { Ok(None) } } - /// Get the tx id associated with a dispatched message. + /// Get the tx id of a delivered message associated with a sequence. #[instrument(skip(self))] - pub async fn retrieve_dispatched_tx_id( + pub async fn retrieve_delivered_message_tx_id( &self, - origin_domain: u32, - origin_mailbox: &H256, - nonce: u32, + destination_domain: u32, + destination_mailbox: &H256, + sequence: u32, ) -> Result> { - #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] - enum QueryAs { - Nonce, - } - - let tx_id = message::Entity::find() - .filter(message::Column::Origin.eq(origin_domain)) - .filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox))) - .filter(message::Column::Nonce.eq(nonce)) - .select_only() - .column_as(message::Column::OriginTxId.max(), QueryAs::Nonce) - .group_by(message::Column::Origin) - .into_values::() + if let Some(delivery) = delivered_message::Entity::find() + .filter(delivered_message::Column::Domain.eq(destination_domain)) + .filter( + delivered_message::Column::DestinationMailbox + .eq(address_to_bytes(destination_mailbox)), + ) + .filter(delivered_message::Column::Sequence.eq(sequence)) .one(&self.0) - .await?; - Ok(tx_id) + .await? + { + let txn_id = delivery.destination_tx_id; + Ok(Some(txn_id)) + } else { + Ok(None) + } } async fn latest_deliveries_id(&self, domain: u32, destination_mailbox: Vec) -> Result { @@ -147,6 +138,7 @@ impl ScraperDb { domain: Unchanged(domain as i32), destination_mailbox: Unchanged(destination_mailbox.clone()), destination_tx_id: Set(delivery.txn_id), + sequence: Set(delivery.sequence), }) .collect_vec(); @@ -180,6 +172,66 @@ impl ScraperDb { Ok(new_deliveries_count) } + /// Get the dispatched message associated with a nonce. + #[instrument(skip(self))] + pub async fn retrieve_dispatched_message_by_nonce( + &self, + origin_domain: u32, + origin_mailbox: &H256, + nonce: u32, + ) -> Result> { + #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] + enum QueryAs { + Nonce, + } + if let Some(message) = message::Entity::find() + .filter(message::Column::Origin.eq(origin_domain)) + .filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox))) + .filter(message::Column::Nonce.eq(nonce)) + .one(&self.0) + .await? + { + Ok(Some(HyperlaneMessage { + // We do not write version to the DB. + version: 3, + origin: message.origin as u32, + destination: message.destination as u32, + nonce: message.nonce as u32, + sender: bytes_to_address(message.sender)?, + recipient: bytes_to_address(message.recipient)?, + body: message.msg_body.unwrap_or(Vec::new()), + })) + } else { + Ok(None) + } + } + + /// Get the tx id associated with a dispatched message. + #[instrument(skip(self))] + pub async fn retrieve_dispatched_tx_id( + &self, + origin_domain: u32, + origin_mailbox: &H256, + nonce: u32, + ) -> Result> { + #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] + enum QueryAs { + Nonce, + } + + let tx_id = message::Entity::find() + .filter(message::Column::Origin.eq(origin_domain)) + .filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox))) + .filter(message::Column::Nonce.eq(nonce)) + .select_only() + .column_as(message::Column::OriginTxId.max(), QueryAs::Nonce) + .group_by(message::Column::Origin) + .into_values::() + .one(&self.0) + .await?; + Ok(tx_id) + } + async fn latest_dispatched_id(&self, domain: u32, origin_mailbox: Vec) -> Result { let result = message::Entity::find() .select_only() diff --git a/rust/main/agents/scraper/src/store/deliveries.rs b/rust/main/agents/scraper/src/store/deliveries.rs index dc805baaa..5238cfa96 100644 --- a/rust/main/agents/scraper/src/store/deliveries.rs +++ b/rust/main/agents/scraper/src/store/deliveries.rs @@ -3,7 +3,10 @@ use std::collections::HashMap; use async_trait::async_trait; use eyre::Result; -use hyperlane_core::{Delivery, HyperlaneLogStore, Indexed, LogMeta, H512}; +use hyperlane_core::{ + unwrap_or_none_result, Delivery, HyperlaneLogStore, HyperlaneSequenceAwareIndexerStoreReader, + Indexed, LogMeta, H512, +}; use crate::db::StorableDelivery; use crate::store::storage::{HyperlaneDbStore, TxnWithId}; @@ -25,11 +28,18 @@ impl HyperlaneLogStore for HyperlaneDbStore { let storable = deliveries .iter() .filter_map(|(message_id, meta)| { - txns.get(&meta.transaction_id) - .map(|txn| (*message_id.inner(), meta, txn.id)) + txns.get(&meta.transaction_id).map(|txn| { + ( + *message_id.inner(), + message_id.sequence.map(|v| v as i64), + meta, + txn.id, + ) + }) }) - .map(|(message_id, meta, txn_id)| StorableDelivery { + .map(|(message_id, sequence, meta, txn_id)| StorableDelivery { message_id, + sequence, meta, txn_id, }); @@ -41,3 +51,26 @@ impl HyperlaneLogStore for HyperlaneDbStore { Ok(stored as u32) } } + +#[async_trait] +impl HyperlaneSequenceAwareIndexerStoreReader for HyperlaneDbStore { + /// Gets a delivered message by its sequence. + async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { + let delivery = self + .db + .retrieve_delivery_by_sequence(self.domain.id(), &self.mailbox_address, sequence) + .await?; + Ok(delivery) + } + + /// Gets the block number at which the log occurred. + async fn retrieve_log_block_number_by_sequence(&self, sequence: u32) -> Result> { + let tx_id = unwrap_or_none_result!( + self.db + .retrieve_delivered_message_tx_id(self.domain.id(), &self.mailbox_address, sequence) + .await? + ); + let block_id = unwrap_or_none_result!(self.db.retrieve_block_id(tx_id).await?); + Ok(self.db.retrieve_block_number(block_id).await?) + } +} diff --git a/rust/main/agents/scraper/src/store/dispatches.rs b/rust/main/agents/scraper/src/store/dispatches.rs index 31aecedd9..b9c98f047 100644 --- a/rust/main/agents/scraper/src/store/dispatches.rs +++ b/rust/main/agents/scraper/src/store/dispatches.rs @@ -46,7 +46,7 @@ impl HyperlaneSequenceAwareIndexerStoreReader for HyperlaneDbS async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { let message = self .db - .retrieve_message_by_nonce(self.domain.id(), &self.mailbox_address, sequence) + .retrieve_dispatched_message_by_nonce(self.domain.id(), &self.mailbox_address, sequence) .await?; Ok(message) } diff --git a/rust/main/chains/hyperlane-sealevel/src/mailbox.rs b/rust/main/chains/hyperlane-sealevel/src/mailbox.rs index 6c87ad4fa..d64963e24 100644 --- a/rust/main/chains/hyperlane-sealevel/src/mailbox.rs +++ b/rust/main/chains/hyperlane-sealevel/src/mailbox.rs @@ -9,7 +9,7 @@ use hyperlane_sealevel_interchain_security_module_interface::{ }; use hyperlane_sealevel_mailbox::{ accounts::{ - DispatchedMessageAccount, InboxAccount, OutboxAccount, ProcessedMessage, + DispatchedMessageAccount, Inbox, InboxAccount, OutboxAccount, ProcessedMessage, ProcessedMessageAccount, DISPATCHED_MESSAGE_DISCRIMINATOR, PROCESSED_MESSAGE_DISCRIMINATOR, }, instruction, @@ -53,12 +53,12 @@ use solana_transaction_status::{ use tracing::{debug, info, instrument, warn}; use hyperlane_core::{ - accumulator::incremental::IncrementalMerkle, BatchItem, ChainCommunicationError, - ChainCommunicationError::ContractError, ChainResult, Checkpoint, ContractLocator, Decode as _, - Encode as _, FixedPointNumber, HyperlaneAbi, HyperlaneChain, HyperlaneContract, - HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, Indexed, Indexer, KnownHyperlaneDomain, - LogMeta, Mailbox, MerkleTreeHook, ReorgPeriod, SequenceAwareIndexer, TxCostEstimate, TxOutcome, - H256, H512, U256, + accumulator::incremental::IncrementalMerkle, config::StrOrIntParseError, BatchItem, + ChainCommunicationError, ChainCommunicationError::ContractError, ChainResult, Checkpoint, + ContractLocator, Decode as _, Encode as _, FixedPointNumber, HyperlaneAbi, HyperlaneChain, + HyperlaneContract, HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, Indexed, Indexer, + KnownHyperlaneDomain, LogMeta, Mailbox, MerkleTreeHook, ReorgPeriod, SequenceAwareIndexer, + TxCostEstimate, TxOutcome, H256, H512, U256, }; use crate::account::{search_accounts_by_discriminator, search_and_validate_account}; @@ -400,6 +400,17 @@ impl SealevelMailbox { ), )) } + + async fn get_inbox(&self) -> ChainResult> { + let account = self + .rpc() + .get_account_with_finalized_commitment(&self.inbox.0) + .await?; + let inbox = InboxAccount::fetch(&mut account.data.as_ref()) + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + Ok(inbox) + } } impl HyperlaneContract for SealevelMailbox { @@ -451,11 +462,7 @@ impl Mailbox for SealevelMailbox { #[instrument(err, ret, skip(self))] async fn default_ism(&self) -> ChainResult { - let inbox_account = self.rpc().get_account(&self.inbox.0).await?; - let inbox = InboxAccount::fetch(&mut inbox_account.data.as_ref()) - .map_err(ChainCommunicationError::from_other)? - .into_inner(); - + let inbox = self.get_inbox().await?; Ok(inbox.default_ism.to_bytes().into()) } @@ -783,18 +790,18 @@ impl SealevelMailboxIndexer { .map_err(Into::::into) } - async fn get_delivered_message_with_nonce( + async fn get_delivered_message_with_sequence( &self, - nonce: u32, + sequence: u32, ) -> ChainResult<(Indexed, LogMeta)> { - let nonce_bytes = nonce.to_le_bytes(); + let sequence_bytes = sequence.to_le_bytes(); let delivered_message_id_offset = 1 + 8 + 8; // the offset to get the `message_id` field let delivered_message_id_length = 32; let accounts = search_accounts_by_discriminator( self.rpc(), &self.program_id, &PROCESSED_MESSAGE_DISCRIMINATOR, - &nonce_bytes, + &sequence_bytes, delivered_message_id_offset, delivered_message_id_length, ) @@ -819,7 +826,7 @@ impl SealevelMailboxIndexer { let log_meta = if self.advanced_log_meta { self.delivered_message_log_meta( - U256::from(nonce), + U256::from(sequence), &valid_message_storage_pda_pubkey, &delivered_message_account.slot, ) @@ -837,7 +844,10 @@ impl SealevelMailboxIndexer { } }; - Ok((message_id.into(), log_meta)) + let mut indexed = Indexed::from(message_id); + indexed.sequence = Some(sequence); + + Ok((indexed, log_meta)) } fn delivered_message_account(&self, account: &Account) -> ChainResult { @@ -928,7 +938,7 @@ impl Indexer for SealevelMailboxIndexer { let message_capacity = range.end().saturating_sub(*range.start()); let mut message_ids = Vec::with_capacity(message_capacity as usize); for nonce in range { - message_ids.push(self.get_delivered_message_with_nonce(nonce).await?); + message_ids.push(self.get_delivered_message_with_sequence(nonce).await?); } Ok(message_ids) } @@ -944,10 +954,15 @@ impl Indexer for SealevelMailboxIndexer { #[async_trait] impl SequenceAwareIndexer for SealevelMailboxIndexer { async fn latest_sequence_count_and_tip(&self) -> ChainResult<(Option, u32)> { - // TODO: implement when sealevel scraper support is implemented - info!("Message delivery indexing not implemented"); + let inbox = self.mailbox.get_inbox().await?; + let sequence = inbox + .processed_count + .try_into() + .map_err(StrOrIntParseError::from)?; + let tip = self.mailbox.provider.rpc().get_slot().await?; - Ok((Some(1), tip)) + + Ok((Some(sequence), tip)) } } diff --git a/rust/main/chains/hyperlane-sealevel/src/rpc/client.rs b/rust/main/chains/hyperlane-sealevel/src/rpc/client.rs index 4d557c0d1..59adb671f 100644 --- a/rust/main/chains/hyperlane-sealevel/src/rpc/client.rs +++ b/rust/main/chains/hyperlane-sealevel/src/rpc/client.rs @@ -47,13 +47,6 @@ impl SealevelRpcClient { .map_err(Into::into) } - pub async fn get_account(&self, pubkey: &Pubkey) -> ChainResult { - self.0 - .get_account(pubkey) - .await - .map_err(ChainCommunicationError::from_other) - } - /// Simulates an Instruction that will return a list of AccountMetas. pub async fn get_account_metas( &self,