feat: Support SequenceAware cursors for Deliveries (#4889)

### Description

Support SequenceAware cursors for Deliveries

### Related issues

- Contributes into
https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4271

### Backward compatibility

Yes (need adding column `sequence` to `delivered_message` table before
merging

### Testing

E2E Ethereum and Sealevel tests

---------

Co-authored-by: Danil Nemirovsky <4614623+ameten@users.noreply.github.com>
pull/4931/head
Danil Nemirovsky 2 days ago committed by GitHub
parent 665a7b8d89
commit a94c50f103
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      rust/main/agents/scraper/migration/src/m20230309_000004_create_table_delivered_message.rs
  2. 3
      rust/main/agents/scraper/src/db/generated/delivered_message.rs
  3. 142
      rust/main/agents/scraper/src/db/message.rs
  4. 41
      rust/main/agents/scraper/src/store/deliveries.rs
  5. 2
      rust/main/agents/scraper/src/store/dispatches.rs
  6. 59
      rust/main/chains/hyperlane-sealevel/src/mailbox.rs
  7. 7
      rust/main/chains/hyperlane-sealevel/src/rpc/client.rs

@ -47,6 +47,7 @@ impl MigrationTrait for Migration {
.big_integer()
.not_null(),
)
.col(ColumnDef::new(DeliveredMessage::Sequence).big_integer())
.foreign_key(
ForeignKey::create()
.from_col(DeliveredMessage::Domain)
@ -105,4 +106,6 @@ pub enum DeliveredMessage {
DestinationMailbox,
/// Transaction the delivery was included in
DestinationTxId,
/// Sequence when message was delivered
Sequence,
}

@ -19,6 +19,7 @@ pub struct Model {
pub domain: i32,
pub destination_mailbox: Vec<u8>,
pub destination_tx_id: i64,
pub sequence: Option<i64>,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
@ -29,6 +30,7 @@ pub enum Column {
Domain,
DestinationMailbox,
DestinationTxId,
Sequence,
}
#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
@ -59,6 +61,7 @@ impl ColumnTrait for Column {
Self::Domain => ColumnType::Integer.def(),
Self::DestinationMailbox => ColumnType::Binary(BlobSize::Blob(None)).def(),
Self::DestinationTxId => ColumnType::BigInteger.def(),
Self::Sequence => ColumnType::BigInteger.def().null(),
}
}
}

@ -6,7 +6,7 @@ use sea_orm::{prelude::*, ActiveValue::*, DeriveColumn, EnumIter, Insert, QueryS
use tracing::{debug, instrument, trace};
use hyperlane_core::{
address_to_bytes, bytes_to_address, h256_to_bytes, HyperlaneMessage, LogMeta, H256,
address_to_bytes, bytes_to_address, h256_to_bytes, Delivery, HyperlaneMessage, LogMeta, H256,
};
use migration::OnConflict;
@ -18,6 +18,7 @@ use super::generated::{delivered_message, message};
#[derive(Debug, Clone)]
pub struct StorableDelivery<'a> {
pub message_id: H256,
pub sequence: Option<i64>,
pub meta: &'a LogMeta,
/// The database id of the transaction the delivery event occurred in
pub txn_id: i64,
@ -31,64 +32,54 @@ pub struct StorableMessage<'a> {
}
impl ScraperDb {
/// Get the dispatched message associated with a nonce.
/// Get the delivered message associated with a sequence.
#[instrument(skip(self))]
pub async fn retrieve_message_by_nonce(
pub async fn retrieve_delivery_by_sequence(
&self,
origin_domain: u32,
origin_mailbox: &H256,
nonce: u32,
) -> Result<Option<HyperlaneMessage>> {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
Nonce,
}
if let Some(message) = message::Entity::find()
.filter(message::Column::Origin.eq(origin_domain))
.filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox)))
.filter(message::Column::Nonce.eq(nonce))
destination_domain: u32,
destination_mailbox: &H256,
sequence: u32,
) -> Result<Option<Delivery>> {
if let Some(delivery) = delivered_message::Entity::find()
.filter(delivered_message::Column::Domain.eq(destination_domain))
.filter(
delivered_message::Column::DestinationMailbox
.eq(address_to_bytes(destination_mailbox)),
)
.filter(delivered_message::Column::Sequence.eq(sequence))
.one(&self.0)
.await?
{
Ok(Some(HyperlaneMessage {
// We do not write version to the DB.
version: 3,
origin: message.origin as u32,
destination: message.destination as u32,
nonce: message.nonce as u32,
sender: bytes_to_address(message.sender)?,
recipient: bytes_to_address(message.recipient)?,
body: message.msg_body.unwrap_or(Vec::new()),
}))
let delivery = H256::from_slice(&delivery.msg_id);
Ok(Some(delivery))
} else {
Ok(None)
}
}
/// Get the tx id associated with a dispatched message.
/// Get the tx id of a delivered message associated with a sequence.
#[instrument(skip(self))]
pub async fn retrieve_dispatched_tx_id(
pub async fn retrieve_delivered_message_tx_id(
&self,
origin_domain: u32,
origin_mailbox: &H256,
nonce: u32,
destination_domain: u32,
destination_mailbox: &H256,
sequence: u32,
) -> Result<Option<i64>> {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
Nonce,
}
let tx_id = message::Entity::find()
.filter(message::Column::Origin.eq(origin_domain))
.filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox)))
.filter(message::Column::Nonce.eq(nonce))
.select_only()
.column_as(message::Column::OriginTxId.max(), QueryAs::Nonce)
.group_by(message::Column::Origin)
.into_values::<i64, QueryAs>()
if let Some(delivery) = delivered_message::Entity::find()
.filter(delivered_message::Column::Domain.eq(destination_domain))
.filter(
delivered_message::Column::DestinationMailbox
.eq(address_to_bytes(destination_mailbox)),
)
.filter(delivered_message::Column::Sequence.eq(sequence))
.one(&self.0)
.await?;
Ok(tx_id)
.await?
{
let txn_id = delivery.destination_tx_id;
Ok(Some(txn_id))
} else {
Ok(None)
}
}
async fn latest_deliveries_id(&self, domain: u32, destination_mailbox: Vec<u8>) -> Result<i64> {
@ -147,6 +138,7 @@ impl ScraperDb {
domain: Unchanged(domain as i32),
destination_mailbox: Unchanged(destination_mailbox.clone()),
destination_tx_id: Set(delivery.txn_id),
sequence: Set(delivery.sequence),
})
.collect_vec();
@ -180,6 +172,66 @@ impl ScraperDb {
Ok(new_deliveries_count)
}
/// Get the dispatched message associated with a nonce.
#[instrument(skip(self))]
pub async fn retrieve_dispatched_message_by_nonce(
&self,
origin_domain: u32,
origin_mailbox: &H256,
nonce: u32,
) -> Result<Option<HyperlaneMessage>> {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
Nonce,
}
if let Some(message) = message::Entity::find()
.filter(message::Column::Origin.eq(origin_domain))
.filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox)))
.filter(message::Column::Nonce.eq(nonce))
.one(&self.0)
.await?
{
Ok(Some(HyperlaneMessage {
// We do not write version to the DB.
version: 3,
origin: message.origin as u32,
destination: message.destination as u32,
nonce: message.nonce as u32,
sender: bytes_to_address(message.sender)?,
recipient: bytes_to_address(message.recipient)?,
body: message.msg_body.unwrap_or(Vec::new()),
}))
} else {
Ok(None)
}
}
/// Get the tx id associated with a dispatched message.
#[instrument(skip(self))]
pub async fn retrieve_dispatched_tx_id(
&self,
origin_domain: u32,
origin_mailbox: &H256,
nonce: u32,
) -> Result<Option<i64>> {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
Nonce,
}
let tx_id = message::Entity::find()
.filter(message::Column::Origin.eq(origin_domain))
.filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox)))
.filter(message::Column::Nonce.eq(nonce))
.select_only()
.column_as(message::Column::OriginTxId.max(), QueryAs::Nonce)
.group_by(message::Column::Origin)
.into_values::<i64, QueryAs>()
.one(&self.0)
.await?;
Ok(tx_id)
}
async fn latest_dispatched_id(&self, domain: u32, origin_mailbox: Vec<u8>) -> Result<i64> {
let result = message::Entity::find()
.select_only()

@ -3,7 +3,10 @@ use std::collections::HashMap;
use async_trait::async_trait;
use eyre::Result;
use hyperlane_core::{Delivery, HyperlaneLogStore, Indexed, LogMeta, H512};
use hyperlane_core::{
unwrap_or_none_result, Delivery, HyperlaneLogStore, HyperlaneSequenceAwareIndexerStoreReader,
Indexed, LogMeta, H512,
};
use crate::db::StorableDelivery;
use crate::store::storage::{HyperlaneDbStore, TxnWithId};
@ -25,11 +28,18 @@ impl HyperlaneLogStore<Delivery> for HyperlaneDbStore {
let storable = deliveries
.iter()
.filter_map(|(message_id, meta)| {
txns.get(&meta.transaction_id)
.map(|txn| (*message_id.inner(), meta, txn.id))
txns.get(&meta.transaction_id).map(|txn| {
(
*message_id.inner(),
message_id.sequence.map(|v| v as i64),
meta,
txn.id,
)
})
})
.map(|(message_id, meta, txn_id)| StorableDelivery {
.map(|(message_id, sequence, meta, txn_id)| StorableDelivery {
message_id,
sequence,
meta,
txn_id,
});
@ -41,3 +51,26 @@ impl HyperlaneLogStore<Delivery> for HyperlaneDbStore {
Ok(stored as u32)
}
}
#[async_trait]
impl HyperlaneSequenceAwareIndexerStoreReader<Delivery> for HyperlaneDbStore {
/// Gets a delivered message by its sequence.
async fn retrieve_by_sequence(&self, sequence: u32) -> Result<Option<Delivery>> {
let delivery = self
.db
.retrieve_delivery_by_sequence(self.domain.id(), &self.mailbox_address, sequence)
.await?;
Ok(delivery)
}
/// Gets the block number at which the log occurred.
async fn retrieve_log_block_number_by_sequence(&self, sequence: u32) -> Result<Option<u64>> {
let tx_id = unwrap_or_none_result!(
self.db
.retrieve_delivered_message_tx_id(self.domain.id(), &self.mailbox_address, sequence)
.await?
);
let block_id = unwrap_or_none_result!(self.db.retrieve_block_id(tx_id).await?);
Ok(self.db.retrieve_block_number(block_id).await?)
}
}

@ -46,7 +46,7 @@ impl HyperlaneSequenceAwareIndexerStoreReader<HyperlaneMessage> for HyperlaneDbS
async fn retrieve_by_sequence(&self, sequence: u32) -> Result<Option<HyperlaneMessage>> {
let message = self
.db
.retrieve_message_by_nonce(self.domain.id(), &self.mailbox_address, sequence)
.retrieve_dispatched_message_by_nonce(self.domain.id(), &self.mailbox_address, sequence)
.await?;
Ok(message)
}

@ -9,7 +9,7 @@ use hyperlane_sealevel_interchain_security_module_interface::{
};
use hyperlane_sealevel_mailbox::{
accounts::{
DispatchedMessageAccount, InboxAccount, OutboxAccount, ProcessedMessage,
DispatchedMessageAccount, Inbox, InboxAccount, OutboxAccount, ProcessedMessage,
ProcessedMessageAccount, DISPATCHED_MESSAGE_DISCRIMINATOR, PROCESSED_MESSAGE_DISCRIMINATOR,
},
instruction,
@ -53,12 +53,12 @@ use solana_transaction_status::{
use tracing::{debug, info, instrument, warn};
use hyperlane_core::{
accumulator::incremental::IncrementalMerkle, BatchItem, ChainCommunicationError,
ChainCommunicationError::ContractError, ChainResult, Checkpoint, ContractLocator, Decode as _,
Encode as _, FixedPointNumber, HyperlaneAbi, HyperlaneChain, HyperlaneContract,
HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, Indexed, Indexer, KnownHyperlaneDomain,
LogMeta, Mailbox, MerkleTreeHook, ReorgPeriod, SequenceAwareIndexer, TxCostEstimate, TxOutcome,
H256, H512, U256,
accumulator::incremental::IncrementalMerkle, config::StrOrIntParseError, BatchItem,
ChainCommunicationError, ChainCommunicationError::ContractError, ChainResult, Checkpoint,
ContractLocator, Decode as _, Encode as _, FixedPointNumber, HyperlaneAbi, HyperlaneChain,
HyperlaneContract, HyperlaneDomain, HyperlaneMessage, HyperlaneProvider, Indexed, Indexer,
KnownHyperlaneDomain, LogMeta, Mailbox, MerkleTreeHook, ReorgPeriod, SequenceAwareIndexer,
TxCostEstimate, TxOutcome, H256, H512, U256,
};
use crate::account::{search_accounts_by_discriminator, search_and_validate_account};
@ -400,6 +400,17 @@ impl SealevelMailbox {
),
))
}
async fn get_inbox(&self) -> ChainResult<Box<Inbox>> {
let account = self
.rpc()
.get_account_with_finalized_commitment(&self.inbox.0)
.await?;
let inbox = InboxAccount::fetch(&mut account.data.as_ref())
.map_err(ChainCommunicationError::from_other)?
.into_inner();
Ok(inbox)
}
}
impl HyperlaneContract for SealevelMailbox {
@ -451,11 +462,7 @@ impl Mailbox for SealevelMailbox {
#[instrument(err, ret, skip(self))]
async fn default_ism(&self) -> ChainResult<H256> {
let inbox_account = self.rpc().get_account(&self.inbox.0).await?;
let inbox = InboxAccount::fetch(&mut inbox_account.data.as_ref())
.map_err(ChainCommunicationError::from_other)?
.into_inner();
let inbox = self.get_inbox().await?;
Ok(inbox.default_ism.to_bytes().into())
}
@ -783,18 +790,18 @@ impl SealevelMailboxIndexer {
.map_err(Into::<ChainCommunicationError>::into)
}
async fn get_delivered_message_with_nonce(
async fn get_delivered_message_with_sequence(
&self,
nonce: u32,
sequence: u32,
) -> ChainResult<(Indexed<H256>, LogMeta)> {
let nonce_bytes = nonce.to_le_bytes();
let sequence_bytes = sequence.to_le_bytes();
let delivered_message_id_offset = 1 + 8 + 8; // the offset to get the `message_id` field
let delivered_message_id_length = 32;
let accounts = search_accounts_by_discriminator(
self.rpc(),
&self.program_id,
&PROCESSED_MESSAGE_DISCRIMINATOR,
&nonce_bytes,
&sequence_bytes,
delivered_message_id_offset,
delivered_message_id_length,
)
@ -819,7 +826,7 @@ impl SealevelMailboxIndexer {
let log_meta = if self.advanced_log_meta {
self.delivered_message_log_meta(
U256::from(nonce),
U256::from(sequence),
&valid_message_storage_pda_pubkey,
&delivered_message_account.slot,
)
@ -837,7 +844,10 @@ impl SealevelMailboxIndexer {
}
};
Ok((message_id.into(), log_meta))
let mut indexed = Indexed::from(message_id);
indexed.sequence = Some(sequence);
Ok((indexed, log_meta))
}
fn delivered_message_account(&self, account: &Account) -> ChainResult<Pubkey> {
@ -928,7 +938,7 @@ impl Indexer<H256> for SealevelMailboxIndexer {
let message_capacity = range.end().saturating_sub(*range.start());
let mut message_ids = Vec::with_capacity(message_capacity as usize);
for nonce in range {
message_ids.push(self.get_delivered_message_with_nonce(nonce).await?);
message_ids.push(self.get_delivered_message_with_sequence(nonce).await?);
}
Ok(message_ids)
}
@ -944,10 +954,15 @@ impl Indexer<H256> for SealevelMailboxIndexer {
#[async_trait]
impl SequenceAwareIndexer<H256> for SealevelMailboxIndexer {
async fn latest_sequence_count_and_tip(&self) -> ChainResult<(Option<u32>, u32)> {
// TODO: implement when sealevel scraper support is implemented
info!("Message delivery indexing not implemented");
let inbox = self.mailbox.get_inbox().await?;
let sequence = inbox
.processed_count
.try_into()
.map_err(StrOrIntParseError::from)?;
let tip = self.mailbox.provider.rpc().get_slot().await?;
Ok((Some(1), tip))
Ok((Some(sequence), tip))
}
}

@ -47,13 +47,6 @@ impl SealevelRpcClient {
.map_err(Into::into)
}
pub async fn get_account(&self, pubkey: &Pubkey) -> ChainResult<Account> {
self.0
.get_account(pubkey)
.await
.map_err(ChainCommunicationError::from_other)
}
/// Simulates an Instruction that will return a list of AccountMetas.
pub async fn get_account_metas(
&self,

Loading…
Cancel
Save