diff --git a/rust/main/agents/scraper/src/agent.rs b/rust/main/agents/scraper/src/agent.rs index fd9afc6d7..97f6bd3db 100644 --- a/rust/main/agents/scraper/src/agent.rs +++ b/rust/main/agents/scraper/src/agent.rs @@ -3,16 +3,17 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use derive_more::AsRef; use futures::future::try_join_all; +use hyperlane_core::{Delivery, HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, H512}; +use tokio::{sync::mpsc::Receiver as MpscReceiver, task::JoinHandle}; +use tracing::{info_span, instrument::Instrumented, trace, Instrument}; + use hyperlane_base::{ broadcast::BroadcastMpscSender, metrics::AgentMetrics, settings::IndexSettings, AgentMetadata, BaseAgent, ChainMetrics, ContractSyncMetrics, ContractSyncer, CoreMetrics, HyperlaneAgentCore, MetricsUpdater, SyncOptions, }; -use hyperlane_core::{Delivery, HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, H512}; -use tokio::{sync::mpsc::Receiver as MpscReceiver, task::JoinHandle}; -use tracing::{info_span, instrument::Instrumented, trace, Instrument}; -use crate::{chain_scraper::HyperlaneSqlDb, db::ScraperDb, settings::ScraperSettings}; +use crate::{db::ScraperDb, settings::ScraperSettings, store::HyperlaneDbStore}; /// A message explorer scraper agent #[derive(Debug, AsRef)] @@ -31,7 +32,7 @@ pub struct Scraper { #[derive(Debug)] struct ChainScraper { index_settings: IndexSettings, - db: HyperlaneSqlDb, + store: HyperlaneDbStore, domain: HyperlaneDomain, } @@ -59,7 +60,7 @@ impl BaseAgent for Scraper { for domain in settings.chains_to_scrape.iter() { let chain_setup = settings.chain_setup(domain).expect("Missing chain config"); - let db = HyperlaneSqlDb::new( + let store = HyperlaneDbStore::new( db.clone(), chain_setup.addresses.mailbox, domain.clone(), @@ -74,7 +75,7 @@ impl BaseAgent for Scraper { domain.id(), ChainScraper { domain: domain.clone(), - db, + store, index_settings: chain_setup.index.clone(), }, ); @@ -132,7 +133,7 @@ impl Scraper { /// This will spawn long-running contract sync tasks async fn scrape(&self, domain_id: u32) -> Instrumented> { let scraper = self.scrapers.get(&domain_id).unwrap(); - let db = scraper.db.clone(); + let store = scraper.store.clone(); let index_settings = scraper.index_settings.clone(); let domain = scraper.domain.clone(); @@ -142,7 +143,7 @@ impl Scraper { domain.clone(), self.core_metrics.clone(), self.contract_sync_metrics.clone(), - db.clone(), + store.clone(), index_settings.clone(), ) .await; @@ -152,7 +153,7 @@ impl Scraper { domain.clone(), self.core_metrics.clone(), self.contract_sync_metrics.clone(), - db.clone(), + store.clone(), index_settings.clone(), ) .await, @@ -162,7 +163,7 @@ impl Scraper { domain, self.core_metrics.clone(), self.contract_sync_metrics.clone(), - db, + store, index_settings.clone(), BroadcastMpscSender::::map_get_receiver(maybe_broadcaster.as_ref()).await, ) @@ -183,7 +184,7 @@ impl Scraper { domain: HyperlaneDomain, metrics: Arc, contract_sync_metrics: Arc, - db: HyperlaneSqlDb, + store: HyperlaneDbStore, index_settings: IndexSettings, ) -> ( Instrumented>, @@ -196,7 +197,7 @@ impl Scraper { &domain, &metrics.clone(), &contract_sync_metrics.clone(), - db.into(), + store.into(), ) .await .unwrap(); @@ -217,7 +218,7 @@ impl Scraper { domain: HyperlaneDomain, metrics: Arc, contract_sync_metrics: Arc, - db: HyperlaneSqlDb, + store: HyperlaneDbStore, index_settings: IndexSettings, ) -> Instrumented> { let sync = self @@ -227,7 +228,7 @@ impl Scraper { &domain, &metrics.clone(), &contract_sync_metrics.clone(), - Arc::new(db.clone()) as _, + Arc::new(store.clone()) as _, ) .await .unwrap(); @@ -248,7 +249,7 @@ impl Scraper { domain: HyperlaneDomain, metrics: Arc, contract_sync_metrics: Arc, - db: HyperlaneSqlDb, + store: HyperlaneDbStore, index_settings: IndexSettings, tx_id_receiver: Option>, ) -> Instrumented> { @@ -259,7 +260,7 @@ impl Scraper { &domain, &metrics.clone(), &contract_sync_metrics.clone(), - Arc::new(db.clone()), + Arc::new(store.clone()), ) .await .unwrap(); diff --git a/rust/main/agents/scraper/src/db/message.rs b/rust/main/agents/scraper/src/db/message.rs index 423c24058..0796400f6 100644 --- a/rust/main/agents/scraper/src/db/message.rs +++ b/rust/main/agents/scraper/src/db/message.rs @@ -31,36 +31,6 @@ pub struct StorableMessage<'a> { } impl ScraperDb { - /// Get the highest message nonce that is stored in the database. - #[instrument(skip(self))] - pub async fn last_message_nonce( - &self, - origin_domain: u32, - origin_mailbox: &H256, - ) -> Result> { - #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] - enum QueryAs { - Nonce, - } - - let last_nonce = message::Entity::find() - .filter(message::Column::Origin.eq(origin_domain)) - .filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox))) - .select_only() - .column_as(message::Column::Nonce.max(), QueryAs::Nonce) - .into_values::() - .one(&self.0) - .await? - .map(|idx| idx as u32); - debug!( - ?last_nonce, - origin_domain, - ?origin_mailbox, - "Queried last message nonce from database" - ); - Ok(last_nonce) - } - /// Get the dispatched message associated with a nonce. #[instrument(skip(self))] pub async fn retrieve_message_by_nonce( diff --git a/rust/main/agents/scraper/src/main.rs b/rust/main/agents/scraper/src/main.rs index cab1a0963..f9b0d5971 100644 --- a/rust/main/agents/scraper/src/main.rs +++ b/rust/main/agents/scraper/src/main.rs @@ -17,13 +17,12 @@ use agent::Scraper; use eyre::Result; use hyperlane_base::agent_main; -mod db; - mod agent; -mod chain_scraper; mod conversions; mod date_time; +mod db; mod settings; +mod store; #[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { diff --git a/rust/main/agents/scraper/src/store.rs b/rust/main/agents/scraper/src/store.rs new file mode 100644 index 000000000..58c55a244 --- /dev/null +++ b/rust/main/agents/scraper/src/store.rs @@ -0,0 +1,6 @@ +pub use storage::HyperlaneDbStore; + +mod deliveries; +mod dispatches; +mod payments; +mod storage; diff --git a/rust/main/agents/scraper/src/store/deliveries.rs b/rust/main/agents/scraper/src/store/deliveries.rs new file mode 100644 index 000000000..dc805baaa --- /dev/null +++ b/rust/main/agents/scraper/src/store/deliveries.rs @@ -0,0 +1,43 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use eyre::Result; + +use hyperlane_core::{Delivery, HyperlaneLogStore, Indexed, LogMeta, H512}; + +use crate::db::StorableDelivery; +use crate::store::storage::{HyperlaneDbStore, TxnWithId}; + +#[async_trait] +impl HyperlaneLogStore for HyperlaneDbStore { + /// Store delivered message ids from the destination mailbox into the database. + /// We store only delivered messages ids from blocks and transaction which we could successfully + /// insert into database. + async fn store_logs(&self, deliveries: &[(Indexed, LogMeta)]) -> Result { + if deliveries.is_empty() { + return Ok(0); + } + let txns: HashMap = self + .ensure_blocks_and_txns(deliveries.iter().map(|r| &r.1)) + .await? + .map(|t| (t.hash, t)) + .collect(); + let storable = deliveries + .iter() + .filter_map(|(message_id, meta)| { + txns.get(&meta.transaction_id) + .map(|txn| (*message_id.inner(), meta, txn.id)) + }) + .map(|(message_id, meta, txn_id)| StorableDelivery { + message_id, + meta, + txn_id, + }); + + let stored = self + .db + .store_deliveries(self.domain.id(), self.mailbox_address, storable) + .await?; + Ok(stored as u32) + } +} diff --git a/rust/main/agents/scraper/src/store/dispatches.rs b/rust/main/agents/scraper/src/store/dispatches.rs new file mode 100644 index 000000000..31aecedd9 --- /dev/null +++ b/rust/main/agents/scraper/src/store/dispatches.rs @@ -0,0 +1,64 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use eyre::Result; + +use hyperlane_core::{ + unwrap_or_none_result, HyperlaneLogStore, HyperlaneMessage, + HyperlaneSequenceAwareIndexerStoreReader, Indexed, LogMeta, H512, +}; + +use crate::db::StorableMessage; +use crate::store::storage::{HyperlaneDbStore, TxnWithId}; + +#[async_trait] +impl HyperlaneLogStore for HyperlaneDbStore { + /// Store dispatched messages from the origin mailbox into the database. + /// We store only messages from blocks and transaction which we could successfully insert + /// into database. + async fn store_logs(&self, messages: &[(Indexed, LogMeta)]) -> Result { + if messages.is_empty() { + return Ok(0); + } + let txns: HashMap = self + .ensure_blocks_and_txns(messages.iter().map(|r| &r.1)) + .await? + .map(|t| (t.hash, t)) + .collect(); + let storable = messages + .iter() + .filter_map(|(message, meta)| { + txns.get(&meta.transaction_id) + .map(|t| (message.inner().clone(), meta, t.id)) + }) + .map(|(msg, meta, txn_id)| StorableMessage { msg, meta, txn_id }); + let stored = self + .db + .store_dispatched_messages(self.domain.id(), &self.mailbox_address, storable) + .await?; + Ok(stored as u32) + } +} + +#[async_trait] +impl HyperlaneSequenceAwareIndexerStoreReader for HyperlaneDbStore { + /// Gets a message by its nonce. + async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { + let message = self + .db + .retrieve_message_by_nonce(self.domain.id(), &self.mailbox_address, sequence) + .await?; + Ok(message) + } + + /// Gets the block number at which the log occurred. + async fn retrieve_log_block_number_by_sequence(&self, sequence: u32) -> Result> { + let tx_id = unwrap_or_none_result!( + self.db + .retrieve_dispatched_tx_id(self.domain.id(), &self.mailbox_address, sequence) + .await? + ); + let block_id = unwrap_or_none_result!(self.db.retrieve_block_id(tx_id).await?); + Ok(self.db.retrieve_block_number(block_id).await?) + } +} diff --git a/rust/main/agents/scraper/src/store/payments.rs b/rust/main/agents/scraper/src/store/payments.rs new file mode 100644 index 000000000..4c5ee465f --- /dev/null +++ b/rust/main/agents/scraper/src/store/payments.rs @@ -0,0 +1,43 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use eyre::Result; + +use hyperlane_core::{HyperlaneLogStore, Indexed, InterchainGasPayment, LogMeta, H512}; + +use crate::db::StorablePayment; +use crate::store::storage::HyperlaneDbStore; + +#[async_trait] +impl HyperlaneLogStore for HyperlaneDbStore { + /// Store interchain gas payments into the database. + /// We store only interchain gas payments from blocks and transaction which we could + /// successfully insert into database. + async fn store_logs( + &self, + payments: &[(Indexed, LogMeta)], + ) -> Result { + if payments.is_empty() { + return Ok(0); + } + let txns: HashMap = self + .ensure_blocks_and_txns(payments.iter().map(|r| &r.1)) + .await? + .map(|t| (t.hash, t)) + .collect(); + let storable = payments + .iter() + .filter_map(|(payment, meta)| { + txns.get(&meta.transaction_id) + .map(|txn| (payment.inner(), meta, txn.id)) + }) + .map(|(payment, meta, txn_id)| StorablePayment { + payment, + meta, + txn_id, + }); + + let stored = self.db.store_payments(self.domain.id(), storable).await?; + Ok(stored as u32) + } +} diff --git a/rust/main/agents/scraper/src/chain_scraper/mod.rs b/rust/main/agents/scraper/src/store/storage.rs similarity index 67% rename from rust/main/agents/scraper/src/chain_scraper/mod.rs rename to rust/main/agents/scraper/src/store/storage.rs index 0be5b2ef1..5dcde4ed3 100644 --- a/rust/main/agents/scraper/src/chain_scraper/mod.rs +++ b/rust/main/agents/scraper/src/store/storage.rs @@ -12,15 +12,11 @@ use tracing::{trace, warn}; use hyperlane_base::settings::IndexSettings; use hyperlane_core::{ - unwrap_or_none_result, BlockId, BlockInfo, Delivery, HyperlaneDomain, HyperlaneLogStore, - HyperlaneMessage, HyperlaneProvider, HyperlaneSequenceAwareIndexerStoreReader, - HyperlaneWatermarkedLogStore, Indexed, InterchainGasPayment, LogMeta, H256, H512, + BlockId, BlockInfo, HyperlaneDomain, HyperlaneLogStore, HyperlaneProvider, + HyperlaneWatermarkedLogStore, LogMeta, H256, H512, }; -use crate::db::{ - BasicBlock, BlockCursor, ScraperDb, StorableDelivery, StorableMessage, StorablePayment, - StorableTxn, -}; +use crate::db::{BasicBlock, BlockCursor, ScraperDb, StorableTxn}; /// Maximum number of records to query at a time. This came about because when a /// lot of messages are sent in a short period of time we were ending up with a @@ -31,16 +27,16 @@ const CHUNK_SIZE: usize = 50; /// A chain scraper is comprised of all the information and contract/provider /// connections needed to scrape the contracts on a single blockchain. #[derive(Clone, Debug)] -pub struct HyperlaneSqlDb { - mailbox_address: H256, - domain: HyperlaneDomain, - db: ScraperDb, +pub struct HyperlaneDbStore { + pub(crate) mailbox_address: H256, + pub(crate) domain: HyperlaneDomain, + pub(crate) db: ScraperDb, provider: Arc, cursor: Arc, } #[allow(unused)] -impl HyperlaneSqlDb { +impl HyperlaneDbStore { pub async fn new( db: ScraperDb, mailbox_address: H256, @@ -61,21 +57,11 @@ impl HyperlaneSqlDb { }) } - pub fn domain(&self) -> &HyperlaneDomain { - &self.domain - } - - pub async fn last_message_nonce(&self) -> Result> { - self.db - .last_message_nonce(self.domain.id(), &self.mailbox_address) - .await - } - /// Takes a list of txn and block hashes and ensure they are all in the /// database. If any are not it will fetch the data and insert them. /// /// Returns the relevant transaction info. - async fn ensure_blocks_and_txns( + pub(crate) async fn ensure_blocks_and_txns( &self, log_meta: impl Iterator, ) -> Result> { @@ -262,7 +248,7 @@ impl HyperlaneSqlDb { self.db .store_blocks( - self.domain().id(), + self.domain.id(), blocks_to_insert .iter_mut() .map(|(_, info)| info.take().unwrap()), @@ -293,129 +279,9 @@ impl HyperlaneSqlDb { } #[async_trait] -impl HyperlaneLogStore for HyperlaneSqlDb { - /// Store dispatched messages from the origin mailbox into the database. - /// We store only messages from blocks and transaction which we could successfully insert - /// into database. - async fn store_logs(&self, messages: &[(Indexed, LogMeta)]) -> Result { - if messages.is_empty() { - return Ok(0); - } - let txns: HashMap = self - .ensure_blocks_and_txns(messages.iter().map(|r| &r.1)) - .await? - .map(|t| (t.hash, t)) - .collect(); - let storable = messages - .iter() - .filter_map(|(message, meta)| { - txns.get(&meta.transaction_id) - .map(|t| (message.inner().clone(), meta, t.id)) - }) - .map(|(msg, meta, txn_id)| StorableMessage { msg, meta, txn_id }); - let stored = self - .db - .store_dispatched_messages(self.domain().id(), &self.mailbox_address, storable) - .await?; - Ok(stored as u32) - } -} - -#[async_trait] -impl HyperlaneLogStore for HyperlaneSqlDb { - /// Store delivered message ids from the destination mailbox into the database. - /// We store only delivered messages ids from blocks and transaction which we could successfully - /// insert into database. - async fn store_logs(&self, deliveries: &[(Indexed, LogMeta)]) -> Result { - if deliveries.is_empty() { - return Ok(0); - } - let txns: HashMap = self - .ensure_blocks_and_txns(deliveries.iter().map(|r| &r.1)) - .await? - .map(|t| (t.hash, t)) - .collect(); - let storable = deliveries - .iter() - .filter_map(|(message_id, meta)| { - txns.get(&meta.transaction_id) - .map(|txn| (*message_id.inner(), meta, txn.id)) - }) - .map(|(message_id, meta, txn_id)| StorableDelivery { - message_id, - meta, - txn_id, - }); - - let stored = self - .db - .store_deliveries(self.domain().id(), self.mailbox_address, storable) - .await?; - Ok(stored as u32) - } -} - -#[async_trait] -impl HyperlaneLogStore for HyperlaneSqlDb { - /// Store interchain gas payments into the database. - /// We store only interchain gas payments from blocks and transaction which we could - /// successfully insert into database. - async fn store_logs( - &self, - payments: &[(Indexed, LogMeta)], - ) -> Result { - if payments.is_empty() { - return Ok(0); - } - let txns: HashMap = self - .ensure_blocks_and_txns(payments.iter().map(|r| &r.1)) - .await? - .map(|t| (t.hash, t)) - .collect(); - let storable = payments - .iter() - .filter_map(|(payment, meta)| { - txns.get(&meta.transaction_id) - .map(|txn| (payment.inner(), meta, txn.id)) - }) - .map(|(payment, meta, txn_id)| StorablePayment { - payment, - meta, - txn_id, - }); - - let stored = self.db.store_payments(self.domain().id(), storable).await?; - Ok(stored as u32) - } -} - -#[async_trait] -impl HyperlaneSequenceAwareIndexerStoreReader for HyperlaneSqlDb { - /// Gets a message by its nonce. - async fn retrieve_by_sequence(&self, sequence: u32) -> Result> { - let message = self - .db - .retrieve_message_by_nonce(self.domain().id(), &self.mailbox_address, sequence) - .await?; - Ok(message) - } - - /// Gets the block number at which the log occurred. - async fn retrieve_log_block_number_by_sequence(&self, sequence: u32) -> Result> { - let tx_id = unwrap_or_none_result!( - self.db - .retrieve_dispatched_tx_id(self.domain().id(), &self.mailbox_address, sequence) - .await? - ); - let block_id = unwrap_or_none_result!(self.db.retrieve_block_id(tx_id).await?); - Ok(self.db.retrieve_block_number(block_id).await?) - } -} - -#[async_trait] -impl HyperlaneWatermarkedLogStore for HyperlaneSqlDb +impl HyperlaneWatermarkedLogStore for HyperlaneDbStore where - HyperlaneSqlDb: HyperlaneLogStore, + HyperlaneDbStore: HyperlaneLogStore, { /// Gets the block number high watermark async fn retrieve_high_watermark(&self) -> Result> { @@ -429,9 +295,9 @@ where } #[derive(Debug, Clone)] -struct TxnWithId { - hash: H512, - id: i64, +pub(crate) struct TxnWithId { + pub hash: H512, + pub id: i64, } #[derive(Debug, Clone)] diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs index 242028acb..9428d6bfd 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs @@ -78,7 +78,7 @@ pub enum SyncDirection { /// performed by `next_action`. pub(crate) struct RateLimitedContractSyncCursor { indexer: Arc>, - db: Arc>, + store: Arc>, tip: u32, last_tip_update: Instant, eta_calculator: SyncerEtaCalculator, @@ -89,14 +89,14 @@ impl RateLimitedContractSyncCursor { /// Construct a new contract sync helper. pub async fn new( indexer: Arc>, - db: Arc>, + store: Arc>, chunk_size: u32, initial_height: u32, ) -> Result { let tip = indexer.get_finalized_block_number().await?; Ok(Self { indexer, - db, + store, tip, last_tip_update: Instant::now(), eta_calculator: SyncerEtaCalculator::new(initial_height, tip, ETA_TIME_WINDOW), @@ -189,7 +189,7 @@ where ) -> Result<()> { // Store a relatively conservative view of the high watermark, which should allow a single watermark to be // safely shared across multiple cursors, so long as they are running sufficiently in sync - self.db + self.store .store_high_watermark(u32::max( self.sync_state.start_block, self.sync_state diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs index 75c086c24..179ae3dd3 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs @@ -22,8 +22,8 @@ pub(crate) struct BackwardSequenceAwareSyncCursor { /// If in sequence mode, this is the max number of sequences to query. /// If in block mode, this is the max number of blocks to query. chunk_size: u32, - /// A DB used to check which logs have already been indexed. - db: Arc>, + /// A store used to check which logs have already been indexed. + store: Arc>, /// A snapshot of the last log to be indexed, or if no indexing has occurred yet, /// the initial log to start indexing backward from. last_indexed_snapshot: LastIndexedSnapshot, @@ -48,13 +48,13 @@ impl Debug for BackwardSequenceAwareSyncCursor { impl BackwardSequenceAwareSyncCursor { #[instrument( - skip(db), + skip(store), fields(chunk_size, next_sequence, start_block, index_mode), ret )] pub fn new( chunk_size: u32, - db: Arc>, + store: Arc>, current_sequence_count: u32, start_block: u32, index_mode: IndexMode, @@ -69,7 +69,7 @@ impl BackwardSequenceAwareSyncCursor { Self { chunk_size, - db, + store, current_indexing_snapshot: last_indexed_snapshot.previous_target(), last_indexed_snapshot, index_mode, @@ -166,10 +166,10 @@ impl BackwardSequenceAwareSyncCursor { /// log for the sequence number hasn't been indexed. async fn get_sequence_log_block_number(&self, sequence: u32) -> Result> { // Ensure there's a full entry for the sequence. - if self.db.retrieve_by_sequence(sequence).await?.is_some() { + if self.store.retrieve_by_sequence(sequence).await?.is_some() { // And get the block number. if let Some(block_number) = self - .db + .store .retrieve_log_block_number_by_sequence(sequence) .await? { diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs index 4fcbbff8d..967f4e605 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs @@ -27,8 +27,8 @@ pub(crate) struct ForwardSequenceAwareSyncCursor { /// This is used to check if there are new logs to index and to /// establish targets to index towards. latest_sequence_querier: Arc>, - /// A DB used to check which logs have already been indexed. - db: Arc>, + /// A store used to check which logs have already been indexed. + store: Arc>, /// A snapshot of the last indexed log, or if no indexing has occurred yet, /// the initial log to start indexing forward from. last_indexed_snapshot: LastIndexedSnapshot, @@ -55,14 +55,14 @@ impl Debug for ForwardSequenceAwareSyncCursor { impl ForwardSequenceAwareSyncCursor { #[instrument( - skip(db, latest_sequence_querier), + skip(store, latest_sequence_querier), fields(chunk_size, next_sequence, start_block, index_mode), ret )] pub fn new( chunk_size: u32, latest_sequence_querier: Arc>, - db: Arc>, + store: Arc>, next_sequence: u32, start_block: u32, index_mode: IndexMode, @@ -77,7 +77,7 @@ impl ForwardSequenceAwareSyncCursor { Self { chunk_size, latest_sequence_querier, - db, + store, last_indexed_snapshot, current_indexing_snapshot: TargetSnapshot { sequence: next_sequence, @@ -221,10 +221,10 @@ impl ForwardSequenceAwareSyncCursor { /// log for the sequence number hasn't been indexed. async fn get_sequence_log_block_number(&self, sequence: u32) -> Result> { // Ensure there's a full entry for the sequence. - if self.db.retrieve_by_sequence(sequence).await?.is_some() { + if self.store.retrieve_by_sequence(sequence).await?.is_some() { // And get the block number. if let Some(block_number) = self - .db + .store .retrieve_log_block_number_by_sequence(sequence) .await? { diff --git a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs index 9303438b0..74e7ebe01 100644 --- a/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs @@ -73,7 +73,7 @@ impl ForwardBackwardSequenceAwareSyncCursor { /// Construct a new contract sync helper. pub async fn new( latest_sequence_querier: Arc>, - db: Arc>, + store: Arc>, chunk_size: u32, mode: IndexMode, ) -> Result { @@ -86,13 +86,13 @@ impl ForwardBackwardSequenceAwareSyncCursor { let forward_cursor = ForwardSequenceAwareSyncCursor::new( chunk_size, latest_sequence_querier.clone(), - db.clone(), + store.clone(), sequence_count, tip, mode, ); let backward_cursor = - BackwardSequenceAwareSyncCursor::new(chunk_size, db, sequence_count, tip, mode); + BackwardSequenceAwareSyncCursor::new(chunk_size, store, sequence_count, tip, mode); Ok(Self { forward: forward_cursor, backward: backward_cursor, diff --git a/rust/main/hyperlane-base/src/contract_sync/mod.rs b/rust/main/hyperlane-base/src/contract_sync/mod.rs index 8e6f8278f..df9563d8a 100644 --- a/rust/main/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/main/hyperlane-base/src/contract_sync/mod.rs @@ -43,21 +43,26 @@ struct IndexedTxIdAndSequence { /// Extracts chain-specific data (emitted checkpoints, messages, etc) from an /// `indexer` and fills the agent's db with this data. #[derive(Debug)] -pub struct ContractSync, I: Indexer> { +pub struct ContractSync, I: Indexer> { domain: HyperlaneDomain, - db: D, + store: S, indexer: I, metrics: ContractSyncMetrics, broadcast_sender: Option>, _phantom: PhantomData, } -impl, I: Indexer> ContractSync { +impl, I: Indexer> ContractSync { /// Create a new ContractSync - pub fn new(domain: HyperlaneDomain, db: D, indexer: I, metrics: ContractSyncMetrics) -> Self { + pub fn new( + domain: HyperlaneDomain, + store: S, + indexer: I, + metrics: ContractSyncMetrics, + ) -> Self { Self { domain, - db, + store, indexer, metrics, broadcast_sender: T::broadcast_channel_size().map(BroadcastMpscSender::new), @@ -66,10 +71,10 @@ impl, I: Indexer> ContractSync } } -impl ContractSync +impl ContractSync where T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static, - D: HyperlaneLogStore, + S: HyperlaneLogStore, I: Indexer + 'static, { /// The domain that this ContractSync is running on @@ -221,7 +226,7 @@ where let logs = Vec::from_iter(deduped_logs); // Store deliveries - let stored = match self.db.store_logs(&logs).await { + let stored = match self.store.store_logs(&logs).await { Ok(stored) => stored, Err(err) => { warn!(?err, "Error storing logs in db"); @@ -298,7 +303,7 @@ where &self, index_settings: IndexSettings, ) -> Result>> { - let watermark = self.db.retrieve_high_watermark().await.unwrap(); + let watermark = self.store.retrieve_high_watermark().await.unwrap(); let index_settings = IndexSettings { from: watermark.unwrap_or(index_settings.from), chunk_size: index_settings.chunk_size, @@ -307,7 +312,7 @@ where Ok(Box::new( RateLimitedContractSyncCursor::new( Arc::new(self.indexer.clone()), - self.db.clone(), + self.store.clone(), index_settings.chunk_size, index_settings.from, ) @@ -348,7 +353,7 @@ where Ok(Box::new( ForwardBackwardSequenceAwareSyncCursor::new( self.indexer.clone(), - Arc::new(self.db.clone()), + Arc::new(self.store.clone()), index_settings.chunk_size, index_settings.mode, ) diff --git a/rust/main/hyperlane-base/src/settings/base.rs b/rust/main/hyperlane-base/src/settings/base.rs index 6757a545e..0320e4eb7 100644 --- a/rust/main/hyperlane-base/src/settings/base.rs +++ b/rust/main/hyperlane-base/src/settings/base.rs @@ -152,48 +152,48 @@ impl Settings { build_contract_fns!(build_provider, build_providers -> dyn HyperlaneProvider); /// Build a contract sync for type `T` using log store `D` - pub async fn sequenced_contract_sync( + pub async fn sequenced_contract_sync( &self, domain: &HyperlaneDomain, metrics: &CoreMetrics, sync_metrics: &ContractSyncMetrics, - db: Arc, + store: Arc, ) -> eyre::Result>> where T: Indexable + Debug, SequenceIndexer: TryFromWithMetrics, - D: HyperlaneLogStore + HyperlaneSequenceAwareIndexerStoreReader + 'static, + S: HyperlaneLogStore + HyperlaneSequenceAwareIndexerStoreReader + 'static, { let setup = self.chain_setup(domain)?; // Currently, all indexers are of the `SequenceIndexer` type let indexer = SequenceIndexer::::try_from_with_metrics(setup, metrics).await?; Ok(Arc::new(ContractSync::new( domain.clone(), - db.clone() as SequenceAwareLogStore<_>, + store.clone() as SequenceAwareLogStore<_>, indexer, sync_metrics.clone(), ))) } /// Build a contract sync for type `T` using log store `D` - pub async fn watermark_contract_sync( + pub async fn watermark_contract_sync( &self, domain: &HyperlaneDomain, metrics: &CoreMetrics, sync_metrics: &ContractSyncMetrics, - db: Arc, + store: Arc, ) -> eyre::Result>> where T: Indexable + Debug, SequenceIndexer: TryFromWithMetrics, - D: HyperlaneLogStore + HyperlaneWatermarkedLogStore + 'static, + S: HyperlaneLogStore + HyperlaneWatermarkedLogStore + 'static, { let setup = self.chain_setup(domain)?; // Currently, all indexers are of the `SequenceIndexer` type let indexer = SequenceIndexer::::try_from_with_metrics(setup, metrics).await?; Ok(Arc::new(ContractSync::new( domain.clone(), - db.clone() as WatermarkLogStore<_>, + store.clone() as WatermarkLogStore<_>, indexer, sync_metrics.clone(), ))) @@ -202,17 +202,17 @@ impl Settings { /// Build multiple contract syncs. /// All contracts have to implement both sequenced and /// watermark trait bounds - pub async fn contract_syncs( + pub async fn contract_syncs( &self, domains: impl Iterator, metrics: &CoreMetrics, sync_metrics: &ContractSyncMetrics, - dbs: HashMap>, + stores: HashMap>, ) -> Result>>> where T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static, SequenceIndexer: TryFromWithMetrics, - D: HyperlaneLogStore + S: HyperlaneLogStore + HyperlaneSequenceAwareIndexerStoreReader + HyperlaneWatermarkedLogStore + 'static, @@ -226,7 +226,7 @@ impl Settings { domain, metrics, sync_metrics, - dbs.get(domain).unwrap().clone(), + stores.get(domain).unwrap().clone(), ) .await .map(|r| r as Arc>)?, @@ -235,7 +235,7 @@ impl Settings { domain, metrics, sync_metrics, - dbs.get(domain).unwrap().clone(), + stores.get(domain).unwrap().clone(), ) .await .map(|r| r as Arc>)?, diff --git a/rust/main/hyperlane-core/src/traits/db.rs b/rust/main/hyperlane-core/src/traits/db.rs index 92c0c15f7..6bef9781d 100644 --- a/rust/main/hyperlane-core/src/traits/db.rs +++ b/rust/main/hyperlane-core/src/traits/db.rs @@ -42,8 +42,8 @@ pub trait HyperlaneSequenceAwareIndexerStore: } /// Auto-impl for HyperlaneSequenceAwareIndexerStore -impl HyperlaneSequenceAwareIndexerStore for U where - U: HyperlaneLogStore + HyperlaneSequenceAwareIndexerStoreReader + Send + Sync + Debug +impl HyperlaneSequenceAwareIndexerStore for S where + S: HyperlaneLogStore + HyperlaneSequenceAwareIndexerStoreReader + Send + Sync + Debug { }