refactor: Refactor Hyperlane store for Scraper (#4855)

### Description

Refactor Hyperlane store for Scraper. Just moved source code into
submodules.

### Backward compatibility

Yes

### Testing

E2E test Ethereum and Sealevel

---------

Co-authored-by: Danil Nemirovsky <4614623+ameten@users.noreply.github.com>
pull/4862/head
Danil Nemirovsky 7 days ago committed by GitHub
parent 25a927de3a
commit 57346defe3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 35
      rust/main/agents/scraper/src/agent.rs
  2. 30
      rust/main/agents/scraper/src/db/message.rs
  3. 5
      rust/main/agents/scraper/src/main.rs
  4. 6
      rust/main/agents/scraper/src/store.rs
  5. 43
      rust/main/agents/scraper/src/store/deliveries.rs
  6. 64
      rust/main/agents/scraper/src/store/dispatches.rs
  7. 43
      rust/main/agents/scraper/src/store/payments.rs
  8. 164
      rust/main/agents/scraper/src/store/storage.rs
  9. 8
      rust/main/hyperlane-base/src/contract_sync/cursors/rate_limited.rs
  10. 14
      rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/backward.rs
  11. 14
      rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/forward.rs
  12. 6
      rust/main/hyperlane-base/src/contract_sync/cursors/sequence_aware/mod.rs
  13. 27
      rust/main/hyperlane-base/src/contract_sync/mod.rs
  14. 26
      rust/main/hyperlane-base/src/settings/base.rs
  15. 4
      rust/main/hyperlane-core/src/traits/db.rs

@ -3,16 +3,17 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use derive_more::AsRef; use derive_more::AsRef;
use futures::future::try_join_all; use futures::future::try_join_all;
use hyperlane_core::{Delivery, HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, H512};
use tokio::{sync::mpsc::Receiver as MpscReceiver, task::JoinHandle};
use tracing::{info_span, instrument::Instrumented, trace, Instrument};
use hyperlane_base::{ use hyperlane_base::{
broadcast::BroadcastMpscSender, metrics::AgentMetrics, settings::IndexSettings, AgentMetadata, broadcast::BroadcastMpscSender, metrics::AgentMetrics, settings::IndexSettings, AgentMetadata,
BaseAgent, ChainMetrics, ContractSyncMetrics, ContractSyncer, CoreMetrics, HyperlaneAgentCore, BaseAgent, ChainMetrics, ContractSyncMetrics, ContractSyncer, CoreMetrics, HyperlaneAgentCore,
MetricsUpdater, SyncOptions, MetricsUpdater, SyncOptions,
}; };
use hyperlane_core::{Delivery, HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, H512};
use tokio::{sync::mpsc::Receiver as MpscReceiver, task::JoinHandle};
use tracing::{info_span, instrument::Instrumented, trace, Instrument};
use crate::{chain_scraper::HyperlaneSqlDb, db::ScraperDb, settings::ScraperSettings}; use crate::{db::ScraperDb, settings::ScraperSettings, store::HyperlaneDbStore};
/// A message explorer scraper agent /// A message explorer scraper agent
#[derive(Debug, AsRef)] #[derive(Debug, AsRef)]
@ -31,7 +32,7 @@ pub struct Scraper {
#[derive(Debug)] #[derive(Debug)]
struct ChainScraper { struct ChainScraper {
index_settings: IndexSettings, index_settings: IndexSettings,
db: HyperlaneSqlDb, store: HyperlaneDbStore,
domain: HyperlaneDomain, domain: HyperlaneDomain,
} }
@ -59,7 +60,7 @@ impl BaseAgent for Scraper {
for domain in settings.chains_to_scrape.iter() { for domain in settings.chains_to_scrape.iter() {
let chain_setup = settings.chain_setup(domain).expect("Missing chain config"); let chain_setup = settings.chain_setup(domain).expect("Missing chain config");
let db = HyperlaneSqlDb::new( let store = HyperlaneDbStore::new(
db.clone(), db.clone(),
chain_setup.addresses.mailbox, chain_setup.addresses.mailbox,
domain.clone(), domain.clone(),
@ -74,7 +75,7 @@ impl BaseAgent for Scraper {
domain.id(), domain.id(),
ChainScraper { ChainScraper {
domain: domain.clone(), domain: domain.clone(),
db, store,
index_settings: chain_setup.index.clone(), index_settings: chain_setup.index.clone(),
}, },
); );
@ -132,7 +133,7 @@ impl Scraper {
/// This will spawn long-running contract sync tasks /// This will spawn long-running contract sync tasks
async fn scrape(&self, domain_id: u32) -> Instrumented<JoinHandle<()>> { async fn scrape(&self, domain_id: u32) -> Instrumented<JoinHandle<()>> {
let scraper = self.scrapers.get(&domain_id).unwrap(); let scraper = self.scrapers.get(&domain_id).unwrap();
let db = scraper.db.clone(); let store = scraper.store.clone();
let index_settings = scraper.index_settings.clone(); let index_settings = scraper.index_settings.clone();
let domain = scraper.domain.clone(); let domain = scraper.domain.clone();
@ -142,7 +143,7 @@ impl Scraper {
domain.clone(), domain.clone(),
self.core_metrics.clone(), self.core_metrics.clone(),
self.contract_sync_metrics.clone(), self.contract_sync_metrics.clone(),
db.clone(), store.clone(),
index_settings.clone(), index_settings.clone(),
) )
.await; .await;
@ -152,7 +153,7 @@ impl Scraper {
domain.clone(), domain.clone(),
self.core_metrics.clone(), self.core_metrics.clone(),
self.contract_sync_metrics.clone(), self.contract_sync_metrics.clone(),
db.clone(), store.clone(),
index_settings.clone(), index_settings.clone(),
) )
.await, .await,
@ -162,7 +163,7 @@ impl Scraper {
domain, domain,
self.core_metrics.clone(), self.core_metrics.clone(),
self.contract_sync_metrics.clone(), self.contract_sync_metrics.clone(),
db, store,
index_settings.clone(), index_settings.clone(),
BroadcastMpscSender::<H512>::map_get_receiver(maybe_broadcaster.as_ref()).await, BroadcastMpscSender::<H512>::map_get_receiver(maybe_broadcaster.as_ref()).await,
) )
@ -183,7 +184,7 @@ impl Scraper {
domain: HyperlaneDomain, domain: HyperlaneDomain,
metrics: Arc<CoreMetrics>, metrics: Arc<CoreMetrics>,
contract_sync_metrics: Arc<ContractSyncMetrics>, contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb, store: HyperlaneDbStore,
index_settings: IndexSettings, index_settings: IndexSettings,
) -> ( ) -> (
Instrumented<JoinHandle<()>>, Instrumented<JoinHandle<()>>,
@ -196,7 +197,7 @@ impl Scraper {
&domain, &domain,
&metrics.clone(), &metrics.clone(),
&contract_sync_metrics.clone(), &contract_sync_metrics.clone(),
db.into(), store.into(),
) )
.await .await
.unwrap(); .unwrap();
@ -217,7 +218,7 @@ impl Scraper {
domain: HyperlaneDomain, domain: HyperlaneDomain,
metrics: Arc<CoreMetrics>, metrics: Arc<CoreMetrics>,
contract_sync_metrics: Arc<ContractSyncMetrics>, contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb, store: HyperlaneDbStore,
index_settings: IndexSettings, index_settings: IndexSettings,
) -> Instrumented<JoinHandle<()>> { ) -> Instrumented<JoinHandle<()>> {
let sync = self let sync = self
@ -227,7 +228,7 @@ impl Scraper {
&domain, &domain,
&metrics.clone(), &metrics.clone(),
&contract_sync_metrics.clone(), &contract_sync_metrics.clone(),
Arc::new(db.clone()) as _, Arc::new(store.clone()) as _,
) )
.await .await
.unwrap(); .unwrap();
@ -248,7 +249,7 @@ impl Scraper {
domain: HyperlaneDomain, domain: HyperlaneDomain,
metrics: Arc<CoreMetrics>, metrics: Arc<CoreMetrics>,
contract_sync_metrics: Arc<ContractSyncMetrics>, contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb, store: HyperlaneDbStore,
index_settings: IndexSettings, index_settings: IndexSettings,
tx_id_receiver: Option<MpscReceiver<H512>>, tx_id_receiver: Option<MpscReceiver<H512>>,
) -> Instrumented<JoinHandle<()>> { ) -> Instrumented<JoinHandle<()>> {
@ -259,7 +260,7 @@ impl Scraper {
&domain, &domain,
&metrics.clone(), &metrics.clone(),
&contract_sync_metrics.clone(), &contract_sync_metrics.clone(),
Arc::new(db.clone()), Arc::new(store.clone()),
) )
.await .await
.unwrap(); .unwrap();

@ -31,36 +31,6 @@ pub struct StorableMessage<'a> {
} }
impl ScraperDb { impl ScraperDb {
/// Get the highest message nonce that is stored in the database.
#[instrument(skip(self))]
pub async fn last_message_nonce(
&self,
origin_domain: u32,
origin_mailbox: &H256,
) -> Result<Option<u32>> {
#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
enum QueryAs {
Nonce,
}
let last_nonce = message::Entity::find()
.filter(message::Column::Origin.eq(origin_domain))
.filter(message::Column::OriginMailbox.eq(address_to_bytes(origin_mailbox)))
.select_only()
.column_as(message::Column::Nonce.max(), QueryAs::Nonce)
.into_values::<i32, QueryAs>()
.one(&self.0)
.await?
.map(|idx| idx as u32);
debug!(
?last_nonce,
origin_domain,
?origin_mailbox,
"Queried last message nonce from database"
);
Ok(last_nonce)
}
/// Get the dispatched message associated with a nonce. /// Get the dispatched message associated with a nonce.
#[instrument(skip(self))] #[instrument(skip(self))]
pub async fn retrieve_message_by_nonce( pub async fn retrieve_message_by_nonce(

@ -17,13 +17,12 @@ use agent::Scraper;
use eyre::Result; use eyre::Result;
use hyperlane_base::agent_main; use hyperlane_base::agent_main;
mod db;
mod agent; mod agent;
mod chain_scraper;
mod conversions; mod conversions;
mod date_time; mod date_time;
mod db;
mod settings; mod settings;
mod store;
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> { async fn main() -> Result<()> {

@ -0,0 +1,6 @@
pub use storage::HyperlaneDbStore;
mod deliveries;
mod dispatches;
mod payments;
mod storage;

@ -0,0 +1,43 @@
use std::collections::HashMap;
use async_trait::async_trait;
use eyre::Result;
use hyperlane_core::{Delivery, HyperlaneLogStore, Indexed, LogMeta, H512};
use crate::db::StorableDelivery;
use crate::store::storage::{HyperlaneDbStore, TxnWithId};
#[async_trait]
impl HyperlaneLogStore<Delivery> for HyperlaneDbStore {
/// Store delivered message ids from the destination mailbox into the database.
/// We store only delivered messages ids from blocks and transaction which we could successfully
/// insert into database.
async fn store_logs(&self, deliveries: &[(Indexed<Delivery>, LogMeta)]) -> Result<u32> {
if deliveries.is_empty() {
return Ok(0);
}
let txns: HashMap<H512, TxnWithId> = self
.ensure_blocks_and_txns(deliveries.iter().map(|r| &r.1))
.await?
.map(|t| (t.hash, t))
.collect();
let storable = deliveries
.iter()
.filter_map(|(message_id, meta)| {
txns.get(&meta.transaction_id)
.map(|txn| (*message_id.inner(), meta, txn.id))
})
.map(|(message_id, meta, txn_id)| StorableDelivery {
message_id,
meta,
txn_id,
});
let stored = self
.db
.store_deliveries(self.domain.id(), self.mailbox_address, storable)
.await?;
Ok(stored as u32)
}
}

@ -0,0 +1,64 @@
use std::collections::HashMap;
use async_trait::async_trait;
use eyre::Result;
use hyperlane_core::{
unwrap_or_none_result, HyperlaneLogStore, HyperlaneMessage,
HyperlaneSequenceAwareIndexerStoreReader, Indexed, LogMeta, H512,
};
use crate::db::StorableMessage;
use crate::store::storage::{HyperlaneDbStore, TxnWithId};
#[async_trait]
impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneDbStore {
/// Store dispatched messages from the origin mailbox into the database.
/// We store only messages from blocks and transaction which we could successfully insert
/// into database.
async fn store_logs(&self, messages: &[(Indexed<HyperlaneMessage>, LogMeta)]) -> Result<u32> {
if messages.is_empty() {
return Ok(0);
}
let txns: HashMap<H512, TxnWithId> = self
.ensure_blocks_and_txns(messages.iter().map(|r| &r.1))
.await?
.map(|t| (t.hash, t))
.collect();
let storable = messages
.iter()
.filter_map(|(message, meta)| {
txns.get(&meta.transaction_id)
.map(|t| (message.inner().clone(), meta, t.id))
})
.map(|(msg, meta, txn_id)| StorableMessage { msg, meta, txn_id });
let stored = self
.db
.store_dispatched_messages(self.domain.id(), &self.mailbox_address, storable)
.await?;
Ok(stored as u32)
}
}
#[async_trait]
impl HyperlaneSequenceAwareIndexerStoreReader<HyperlaneMessage> for HyperlaneDbStore {
/// Gets a message by its nonce.
async fn retrieve_by_sequence(&self, sequence: u32) -> Result<Option<HyperlaneMessage>> {
let message = self
.db
.retrieve_message_by_nonce(self.domain.id(), &self.mailbox_address, sequence)
.await?;
Ok(message)
}
/// Gets the block number at which the log occurred.
async fn retrieve_log_block_number_by_sequence(&self, sequence: u32) -> Result<Option<u64>> {
let tx_id = unwrap_or_none_result!(
self.db
.retrieve_dispatched_tx_id(self.domain.id(), &self.mailbox_address, sequence)
.await?
);
let block_id = unwrap_or_none_result!(self.db.retrieve_block_id(tx_id).await?);
Ok(self.db.retrieve_block_number(block_id).await?)
}
}

@ -0,0 +1,43 @@
use std::collections::HashMap;
use async_trait::async_trait;
use eyre::Result;
use hyperlane_core::{HyperlaneLogStore, Indexed, InterchainGasPayment, LogMeta, H512};
use crate::db::StorablePayment;
use crate::store::storage::HyperlaneDbStore;
#[async_trait]
impl HyperlaneLogStore<InterchainGasPayment> for HyperlaneDbStore {
/// Store interchain gas payments into the database.
/// We store only interchain gas payments from blocks and transaction which we could
/// successfully insert into database.
async fn store_logs(
&self,
payments: &[(Indexed<InterchainGasPayment>, LogMeta)],
) -> Result<u32> {
if payments.is_empty() {
return Ok(0);
}
let txns: HashMap<H512, crate::store::storage::TxnWithId> = self
.ensure_blocks_and_txns(payments.iter().map(|r| &r.1))
.await?
.map(|t| (t.hash, t))
.collect();
let storable = payments
.iter()
.filter_map(|(payment, meta)| {
txns.get(&meta.transaction_id)
.map(|txn| (payment.inner(), meta, txn.id))
})
.map(|(payment, meta, txn_id)| StorablePayment {
payment,
meta,
txn_id,
});
let stored = self.db.store_payments(self.domain.id(), storable).await?;
Ok(stored as u32)
}
}

@ -12,15 +12,11 @@ use tracing::{trace, warn};
use hyperlane_base::settings::IndexSettings; use hyperlane_base::settings::IndexSettings;
use hyperlane_core::{ use hyperlane_core::{
unwrap_or_none_result, BlockId, BlockInfo, Delivery, HyperlaneDomain, HyperlaneLogStore, BlockId, BlockInfo, HyperlaneDomain, HyperlaneLogStore, HyperlaneProvider,
HyperlaneMessage, HyperlaneProvider, HyperlaneSequenceAwareIndexerStoreReader, HyperlaneWatermarkedLogStore, LogMeta, H256, H512,
HyperlaneWatermarkedLogStore, Indexed, InterchainGasPayment, LogMeta, H256, H512,
}; };
use crate::db::{ use crate::db::{BasicBlock, BlockCursor, ScraperDb, StorableTxn};
BasicBlock, BlockCursor, ScraperDb, StorableDelivery, StorableMessage, StorablePayment,
StorableTxn,
};
/// Maximum number of records to query at a time. This came about because when a /// Maximum number of records to query at a time. This came about because when a
/// lot of messages are sent in a short period of time we were ending up with a /// lot of messages are sent in a short period of time we were ending up with a
@ -31,16 +27,16 @@ const CHUNK_SIZE: usize = 50;
/// A chain scraper is comprised of all the information and contract/provider /// A chain scraper is comprised of all the information and contract/provider
/// connections needed to scrape the contracts on a single blockchain. /// connections needed to scrape the contracts on a single blockchain.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct HyperlaneSqlDb { pub struct HyperlaneDbStore {
mailbox_address: H256, pub(crate) mailbox_address: H256,
domain: HyperlaneDomain, pub(crate) domain: HyperlaneDomain,
db: ScraperDb, pub(crate) db: ScraperDb,
provider: Arc<dyn HyperlaneProvider>, provider: Arc<dyn HyperlaneProvider>,
cursor: Arc<BlockCursor>, cursor: Arc<BlockCursor>,
} }
#[allow(unused)] #[allow(unused)]
impl HyperlaneSqlDb { impl HyperlaneDbStore {
pub async fn new( pub async fn new(
db: ScraperDb, db: ScraperDb,
mailbox_address: H256, mailbox_address: H256,
@ -61,21 +57,11 @@ impl HyperlaneSqlDb {
}) })
} }
pub fn domain(&self) -> &HyperlaneDomain {
&self.domain
}
pub async fn last_message_nonce(&self) -> Result<Option<u32>> {
self.db
.last_message_nonce(self.domain.id(), &self.mailbox_address)
.await
}
/// Takes a list of txn and block hashes and ensure they are all in the /// Takes a list of txn and block hashes and ensure they are all in the
/// database. If any are not it will fetch the data and insert them. /// database. If any are not it will fetch the data and insert them.
/// ///
/// Returns the relevant transaction info. /// Returns the relevant transaction info.
async fn ensure_blocks_and_txns( pub(crate) async fn ensure_blocks_and_txns(
&self, &self,
log_meta: impl Iterator<Item = &LogMeta>, log_meta: impl Iterator<Item = &LogMeta>,
) -> Result<impl Iterator<Item = TxnWithId>> { ) -> Result<impl Iterator<Item = TxnWithId>> {
@ -262,7 +248,7 @@ impl HyperlaneSqlDb {
self.db self.db
.store_blocks( .store_blocks(
self.domain().id(), self.domain.id(),
blocks_to_insert blocks_to_insert
.iter_mut() .iter_mut()
.map(|(_, info)| info.take().unwrap()), .map(|(_, info)| info.take().unwrap()),
@ -293,129 +279,9 @@ impl HyperlaneSqlDb {
} }
#[async_trait] #[async_trait]
impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneSqlDb { impl<T> HyperlaneWatermarkedLogStore<T> for HyperlaneDbStore
/// Store dispatched messages from the origin mailbox into the database.
/// We store only messages from blocks and transaction which we could successfully insert
/// into database.
async fn store_logs(&self, messages: &[(Indexed<HyperlaneMessage>, LogMeta)]) -> Result<u32> {
if messages.is_empty() {
return Ok(0);
}
let txns: HashMap<H512, TxnWithId> = self
.ensure_blocks_and_txns(messages.iter().map(|r| &r.1))
.await?
.map(|t| (t.hash, t))
.collect();
let storable = messages
.iter()
.filter_map(|(message, meta)| {
txns.get(&meta.transaction_id)
.map(|t| (message.inner().clone(), meta, t.id))
})
.map(|(msg, meta, txn_id)| StorableMessage { msg, meta, txn_id });
let stored = self
.db
.store_dispatched_messages(self.domain().id(), &self.mailbox_address, storable)
.await?;
Ok(stored as u32)
}
}
#[async_trait]
impl HyperlaneLogStore<Delivery> for HyperlaneSqlDb {
/// Store delivered message ids from the destination mailbox into the database.
/// We store only delivered messages ids from blocks and transaction which we could successfully
/// insert into database.
async fn store_logs(&self, deliveries: &[(Indexed<Delivery>, LogMeta)]) -> Result<u32> {
if deliveries.is_empty() {
return Ok(0);
}
let txns: HashMap<H512, TxnWithId> = self
.ensure_blocks_and_txns(deliveries.iter().map(|r| &r.1))
.await?
.map(|t| (t.hash, t))
.collect();
let storable = deliveries
.iter()
.filter_map(|(message_id, meta)| {
txns.get(&meta.transaction_id)
.map(|txn| (*message_id.inner(), meta, txn.id))
})
.map(|(message_id, meta, txn_id)| StorableDelivery {
message_id,
meta,
txn_id,
});
let stored = self
.db
.store_deliveries(self.domain().id(), self.mailbox_address, storable)
.await?;
Ok(stored as u32)
}
}
#[async_trait]
impl HyperlaneLogStore<InterchainGasPayment> for HyperlaneSqlDb {
/// Store interchain gas payments into the database.
/// We store only interchain gas payments from blocks and transaction which we could
/// successfully insert into database.
async fn store_logs(
&self,
payments: &[(Indexed<InterchainGasPayment>, LogMeta)],
) -> Result<u32> {
if payments.is_empty() {
return Ok(0);
}
let txns: HashMap<H512, TxnWithId> = self
.ensure_blocks_and_txns(payments.iter().map(|r| &r.1))
.await?
.map(|t| (t.hash, t))
.collect();
let storable = payments
.iter()
.filter_map(|(payment, meta)| {
txns.get(&meta.transaction_id)
.map(|txn| (payment.inner(), meta, txn.id))
})
.map(|(payment, meta, txn_id)| StorablePayment {
payment,
meta,
txn_id,
});
let stored = self.db.store_payments(self.domain().id(), storable).await?;
Ok(stored as u32)
}
}
#[async_trait]
impl HyperlaneSequenceAwareIndexerStoreReader<HyperlaneMessage> for HyperlaneSqlDb {
/// Gets a message by its nonce.
async fn retrieve_by_sequence(&self, sequence: u32) -> Result<Option<HyperlaneMessage>> {
let message = self
.db
.retrieve_message_by_nonce(self.domain().id(), &self.mailbox_address, sequence)
.await?;
Ok(message)
}
/// Gets the block number at which the log occurred.
async fn retrieve_log_block_number_by_sequence(&self, sequence: u32) -> Result<Option<u64>> {
let tx_id = unwrap_or_none_result!(
self.db
.retrieve_dispatched_tx_id(self.domain().id(), &self.mailbox_address, sequence)
.await?
);
let block_id = unwrap_or_none_result!(self.db.retrieve_block_id(tx_id).await?);
Ok(self.db.retrieve_block_number(block_id).await?)
}
}
#[async_trait]
impl<T> HyperlaneWatermarkedLogStore<T> for HyperlaneSqlDb
where where
HyperlaneSqlDb: HyperlaneLogStore<T>, HyperlaneDbStore: HyperlaneLogStore<T>,
{ {
/// Gets the block number high watermark /// Gets the block number high watermark
async fn retrieve_high_watermark(&self) -> Result<Option<u32>> { async fn retrieve_high_watermark(&self) -> Result<Option<u32>> {
@ -429,9 +295,9 @@ where
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct TxnWithId { pub(crate) struct TxnWithId {
hash: H512, pub hash: H512,
id: i64, pub id: i64,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]

@ -78,7 +78,7 @@ pub enum SyncDirection {
/// performed by `next_action`. /// performed by `next_action`.
pub(crate) struct RateLimitedContractSyncCursor<T> { pub(crate) struct RateLimitedContractSyncCursor<T> {
indexer: Arc<dyn Indexer<T>>, indexer: Arc<dyn Indexer<T>>,
db: Arc<dyn HyperlaneWatermarkedLogStore<T>>, store: Arc<dyn HyperlaneWatermarkedLogStore<T>>,
tip: u32, tip: u32,
last_tip_update: Instant, last_tip_update: Instant,
eta_calculator: SyncerEtaCalculator, eta_calculator: SyncerEtaCalculator,
@ -89,14 +89,14 @@ impl<T> RateLimitedContractSyncCursor<T> {
/// Construct a new contract sync helper. /// Construct a new contract sync helper.
pub async fn new( pub async fn new(
indexer: Arc<dyn Indexer<T>>, indexer: Arc<dyn Indexer<T>>,
db: Arc<dyn HyperlaneWatermarkedLogStore<T>>, store: Arc<dyn HyperlaneWatermarkedLogStore<T>>,
chunk_size: u32, chunk_size: u32,
initial_height: u32, initial_height: u32,
) -> Result<Self> { ) -> Result<Self> {
let tip = indexer.get_finalized_block_number().await?; let tip = indexer.get_finalized_block_number().await?;
Ok(Self { Ok(Self {
indexer, indexer,
db, store,
tip, tip,
last_tip_update: Instant::now(), last_tip_update: Instant::now(),
eta_calculator: SyncerEtaCalculator::new(initial_height, tip, ETA_TIME_WINDOW), eta_calculator: SyncerEtaCalculator::new(initial_height, tip, ETA_TIME_WINDOW),
@ -189,7 +189,7 @@ where
) -> Result<()> { ) -> Result<()> {
// Store a relatively conservative view of the high watermark, which should allow a single watermark to be // Store a relatively conservative view of the high watermark, which should allow a single watermark to be
// safely shared across multiple cursors, so long as they are running sufficiently in sync // safely shared across multiple cursors, so long as they are running sufficiently in sync
self.db self.store
.store_high_watermark(u32::max( .store_high_watermark(u32::max(
self.sync_state.start_block, self.sync_state.start_block,
self.sync_state self.sync_state

@ -22,8 +22,8 @@ pub(crate) struct BackwardSequenceAwareSyncCursor<T> {
/// If in sequence mode, this is the max number of sequences to query. /// If in sequence mode, this is the max number of sequences to query.
/// If in block mode, this is the max number of blocks to query. /// If in block mode, this is the max number of blocks to query.
chunk_size: u32, chunk_size: u32,
/// A DB used to check which logs have already been indexed. /// A store used to check which logs have already been indexed.
db: Arc<dyn HyperlaneSequenceAwareIndexerStoreReader<T>>, store: Arc<dyn HyperlaneSequenceAwareIndexerStoreReader<T>>,
/// A snapshot of the last log to be indexed, or if no indexing has occurred yet, /// A snapshot of the last log to be indexed, or if no indexing has occurred yet,
/// the initial log to start indexing backward from. /// the initial log to start indexing backward from.
last_indexed_snapshot: LastIndexedSnapshot, last_indexed_snapshot: LastIndexedSnapshot,
@ -48,13 +48,13 @@ impl<T> Debug for BackwardSequenceAwareSyncCursor<T> {
impl<T: Debug> BackwardSequenceAwareSyncCursor<T> { impl<T: Debug> BackwardSequenceAwareSyncCursor<T> {
#[instrument( #[instrument(
skip(db), skip(store),
fields(chunk_size, next_sequence, start_block, index_mode), fields(chunk_size, next_sequence, start_block, index_mode),
ret ret
)] )]
pub fn new( pub fn new(
chunk_size: u32, chunk_size: u32,
db: Arc<dyn HyperlaneSequenceAwareIndexerStoreReader<T>>, store: Arc<dyn HyperlaneSequenceAwareIndexerStoreReader<T>>,
current_sequence_count: u32, current_sequence_count: u32,
start_block: u32, start_block: u32,
index_mode: IndexMode, index_mode: IndexMode,
@ -69,7 +69,7 @@ impl<T: Debug> BackwardSequenceAwareSyncCursor<T> {
Self { Self {
chunk_size, chunk_size,
db, store,
current_indexing_snapshot: last_indexed_snapshot.previous_target(), current_indexing_snapshot: last_indexed_snapshot.previous_target(),
last_indexed_snapshot, last_indexed_snapshot,
index_mode, index_mode,
@ -166,10 +166,10 @@ impl<T: Debug> BackwardSequenceAwareSyncCursor<T> {
/// log for the sequence number hasn't been indexed. /// log for the sequence number hasn't been indexed.
async fn get_sequence_log_block_number(&self, sequence: u32) -> Result<Option<u32>> { async fn get_sequence_log_block_number(&self, sequence: u32) -> Result<Option<u32>> {
// Ensure there's a full entry for the sequence. // Ensure there's a full entry for the sequence.
if self.db.retrieve_by_sequence(sequence).await?.is_some() { if self.store.retrieve_by_sequence(sequence).await?.is_some() {
// And get the block number. // And get the block number.
if let Some(block_number) = self if let Some(block_number) = self
.db .store
.retrieve_log_block_number_by_sequence(sequence) .retrieve_log_block_number_by_sequence(sequence)
.await? .await?
{ {

@ -27,8 +27,8 @@ pub(crate) struct ForwardSequenceAwareSyncCursor<T> {
/// This is used to check if there are new logs to index and to /// This is used to check if there are new logs to index and to
/// establish targets to index towards. /// establish targets to index towards.
latest_sequence_querier: Arc<dyn SequenceAwareIndexer<T>>, latest_sequence_querier: Arc<dyn SequenceAwareIndexer<T>>,
/// A DB used to check which logs have already been indexed. /// A store used to check which logs have already been indexed.
db: Arc<dyn HyperlaneSequenceAwareIndexerStoreReader<T>>, store: Arc<dyn HyperlaneSequenceAwareIndexerStoreReader<T>>,
/// A snapshot of the last indexed log, or if no indexing has occurred yet, /// A snapshot of the last indexed log, or if no indexing has occurred yet,
/// the initial log to start indexing forward from. /// the initial log to start indexing forward from.
last_indexed_snapshot: LastIndexedSnapshot, last_indexed_snapshot: LastIndexedSnapshot,
@ -55,14 +55,14 @@ impl<T> Debug for ForwardSequenceAwareSyncCursor<T> {
impl<T: Debug> ForwardSequenceAwareSyncCursor<T> { impl<T: Debug> ForwardSequenceAwareSyncCursor<T> {
#[instrument( #[instrument(
skip(db, latest_sequence_querier), skip(store, latest_sequence_querier),
fields(chunk_size, next_sequence, start_block, index_mode), fields(chunk_size, next_sequence, start_block, index_mode),
ret ret
)] )]
pub fn new( pub fn new(
chunk_size: u32, chunk_size: u32,
latest_sequence_querier: Arc<dyn SequenceAwareIndexer<T>>, latest_sequence_querier: Arc<dyn SequenceAwareIndexer<T>>,
db: Arc<dyn HyperlaneSequenceAwareIndexerStoreReader<T>>, store: Arc<dyn HyperlaneSequenceAwareIndexerStoreReader<T>>,
next_sequence: u32, next_sequence: u32,
start_block: u32, start_block: u32,
index_mode: IndexMode, index_mode: IndexMode,
@ -77,7 +77,7 @@ impl<T: Debug> ForwardSequenceAwareSyncCursor<T> {
Self { Self {
chunk_size, chunk_size,
latest_sequence_querier, latest_sequence_querier,
db, store,
last_indexed_snapshot, last_indexed_snapshot,
current_indexing_snapshot: TargetSnapshot { current_indexing_snapshot: TargetSnapshot {
sequence: next_sequence, sequence: next_sequence,
@ -221,10 +221,10 @@ impl<T: Debug> ForwardSequenceAwareSyncCursor<T> {
/// log for the sequence number hasn't been indexed. /// log for the sequence number hasn't been indexed.
async fn get_sequence_log_block_number(&self, sequence: u32) -> Result<Option<u32>> { async fn get_sequence_log_block_number(&self, sequence: u32) -> Result<Option<u32>> {
// Ensure there's a full entry for the sequence. // Ensure there's a full entry for the sequence.
if self.db.retrieve_by_sequence(sequence).await?.is_some() { if self.store.retrieve_by_sequence(sequence).await?.is_some() {
// And get the block number. // And get the block number.
if let Some(block_number) = self if let Some(block_number) = self
.db .store
.retrieve_log_block_number_by_sequence(sequence) .retrieve_log_block_number_by_sequence(sequence)
.await? .await?
{ {

@ -73,7 +73,7 @@ impl<T: Debug> ForwardBackwardSequenceAwareSyncCursor<T> {
/// Construct a new contract sync helper. /// Construct a new contract sync helper.
pub async fn new( pub async fn new(
latest_sequence_querier: Arc<dyn SequenceAwareIndexer<T>>, latest_sequence_querier: Arc<dyn SequenceAwareIndexer<T>>,
db: Arc<dyn HyperlaneSequenceAwareIndexerStoreReader<T>>, store: Arc<dyn HyperlaneSequenceAwareIndexerStoreReader<T>>,
chunk_size: u32, chunk_size: u32,
mode: IndexMode, mode: IndexMode,
) -> Result<Self> { ) -> Result<Self> {
@ -86,13 +86,13 @@ impl<T: Debug> ForwardBackwardSequenceAwareSyncCursor<T> {
let forward_cursor = ForwardSequenceAwareSyncCursor::new( let forward_cursor = ForwardSequenceAwareSyncCursor::new(
chunk_size, chunk_size,
latest_sequence_querier.clone(), latest_sequence_querier.clone(),
db.clone(), store.clone(),
sequence_count, sequence_count,
tip, tip,
mode, mode,
); );
let backward_cursor = let backward_cursor =
BackwardSequenceAwareSyncCursor::new(chunk_size, db, sequence_count, tip, mode); BackwardSequenceAwareSyncCursor::new(chunk_size, store, sequence_count, tip, mode);
Ok(Self { Ok(Self {
forward: forward_cursor, forward: forward_cursor,
backward: backward_cursor, backward: backward_cursor,

@ -43,21 +43,26 @@ struct IndexedTxIdAndSequence {
/// Extracts chain-specific data (emitted checkpoints, messages, etc) from an /// Extracts chain-specific data (emitted checkpoints, messages, etc) from an
/// `indexer` and fills the agent's db with this data. /// `indexer` and fills the agent's db with this data.
#[derive(Debug)] #[derive(Debug)]
pub struct ContractSync<T: Indexable, D: HyperlaneLogStore<T>, I: Indexer<T>> { pub struct ContractSync<T: Indexable, S: HyperlaneLogStore<T>, I: Indexer<T>> {
domain: HyperlaneDomain, domain: HyperlaneDomain,
db: D, store: S,
indexer: I, indexer: I,
metrics: ContractSyncMetrics, metrics: ContractSyncMetrics,
broadcast_sender: Option<BroadcastMpscSender<H512>>, broadcast_sender: Option<BroadcastMpscSender<H512>>,
_phantom: PhantomData<T>, _phantom: PhantomData<T>,
} }
impl<T: Indexable, D: HyperlaneLogStore<T>, I: Indexer<T>> ContractSync<T, D, I> { impl<T: Indexable, S: HyperlaneLogStore<T>, I: Indexer<T>> ContractSync<T, S, I> {
/// Create a new ContractSync /// Create a new ContractSync
pub fn new(domain: HyperlaneDomain, db: D, indexer: I, metrics: ContractSyncMetrics) -> Self { pub fn new(
domain: HyperlaneDomain,
store: S,
indexer: I,
metrics: ContractSyncMetrics,
) -> Self {
Self { Self {
domain, domain,
db, store,
indexer, indexer,
metrics, metrics,
broadcast_sender: T::broadcast_channel_size().map(BroadcastMpscSender::new), broadcast_sender: T::broadcast_channel_size().map(BroadcastMpscSender::new),
@ -66,10 +71,10 @@ impl<T: Indexable, D: HyperlaneLogStore<T>, I: Indexer<T>> ContractSync<T, D, I>
} }
} }
impl<T, D, I> ContractSync<T, D, I> impl<T, S, I> ContractSync<T, S, I>
where where
T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static, T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static,
D: HyperlaneLogStore<T>, S: HyperlaneLogStore<T>,
I: Indexer<T> + 'static, I: Indexer<T> + 'static,
{ {
/// The domain that this ContractSync is running on /// The domain that this ContractSync is running on
@ -221,7 +226,7 @@ where
let logs = Vec::from_iter(deduped_logs); let logs = Vec::from_iter(deduped_logs);
// Store deliveries // Store deliveries
let stored = match self.db.store_logs(&logs).await { let stored = match self.store.store_logs(&logs).await {
Ok(stored) => stored, Ok(stored) => stored,
Err(err) => { Err(err) => {
warn!(?err, "Error storing logs in db"); warn!(?err, "Error storing logs in db");
@ -298,7 +303,7 @@ where
&self, &self,
index_settings: IndexSettings, index_settings: IndexSettings,
) -> Result<Box<dyn ContractSyncCursor<T>>> { ) -> Result<Box<dyn ContractSyncCursor<T>>> {
let watermark = self.db.retrieve_high_watermark().await.unwrap(); let watermark = self.store.retrieve_high_watermark().await.unwrap();
let index_settings = IndexSettings { let index_settings = IndexSettings {
from: watermark.unwrap_or(index_settings.from), from: watermark.unwrap_or(index_settings.from),
chunk_size: index_settings.chunk_size, chunk_size: index_settings.chunk_size,
@ -307,7 +312,7 @@ where
Ok(Box::new( Ok(Box::new(
RateLimitedContractSyncCursor::new( RateLimitedContractSyncCursor::new(
Arc::new(self.indexer.clone()), Arc::new(self.indexer.clone()),
self.db.clone(), self.store.clone(),
index_settings.chunk_size, index_settings.chunk_size,
index_settings.from, index_settings.from,
) )
@ -348,7 +353,7 @@ where
Ok(Box::new( Ok(Box::new(
ForwardBackwardSequenceAwareSyncCursor::new( ForwardBackwardSequenceAwareSyncCursor::new(
self.indexer.clone(), self.indexer.clone(),
Arc::new(self.db.clone()), Arc::new(self.store.clone()),
index_settings.chunk_size, index_settings.chunk_size,
index_settings.mode, index_settings.mode,
) )

@ -152,48 +152,48 @@ impl Settings {
build_contract_fns!(build_provider, build_providers -> dyn HyperlaneProvider); build_contract_fns!(build_provider, build_providers -> dyn HyperlaneProvider);
/// Build a contract sync for type `T` using log store `D` /// Build a contract sync for type `T` using log store `D`
pub async fn sequenced_contract_sync<T, D>( pub async fn sequenced_contract_sync<T, S>(
&self, &self,
domain: &HyperlaneDomain, domain: &HyperlaneDomain,
metrics: &CoreMetrics, metrics: &CoreMetrics,
sync_metrics: &ContractSyncMetrics, sync_metrics: &ContractSyncMetrics,
db: Arc<D>, store: Arc<S>,
) -> eyre::Result<Arc<SequencedDataContractSync<T>>> ) -> eyre::Result<Arc<SequencedDataContractSync<T>>>
where where
T: Indexable + Debug, T: Indexable + Debug,
SequenceIndexer<T>: TryFromWithMetrics<ChainConf>, SequenceIndexer<T>: TryFromWithMetrics<ChainConf>,
D: HyperlaneLogStore<T> + HyperlaneSequenceAwareIndexerStoreReader<T> + 'static, S: HyperlaneLogStore<T> + HyperlaneSequenceAwareIndexerStoreReader<T> + 'static,
{ {
let setup = self.chain_setup(domain)?; let setup = self.chain_setup(domain)?;
// Currently, all indexers are of the `SequenceIndexer` type // Currently, all indexers are of the `SequenceIndexer` type
let indexer = SequenceIndexer::<T>::try_from_with_metrics(setup, metrics).await?; let indexer = SequenceIndexer::<T>::try_from_with_metrics(setup, metrics).await?;
Ok(Arc::new(ContractSync::new( Ok(Arc::new(ContractSync::new(
domain.clone(), domain.clone(),
db.clone() as SequenceAwareLogStore<_>, store.clone() as SequenceAwareLogStore<_>,
indexer, indexer,
sync_metrics.clone(), sync_metrics.clone(),
))) )))
} }
/// Build a contract sync for type `T` using log store `D` /// Build a contract sync for type `T` using log store `D`
pub async fn watermark_contract_sync<T, D>( pub async fn watermark_contract_sync<T, S>(
&self, &self,
domain: &HyperlaneDomain, domain: &HyperlaneDomain,
metrics: &CoreMetrics, metrics: &CoreMetrics,
sync_metrics: &ContractSyncMetrics, sync_metrics: &ContractSyncMetrics,
db: Arc<D>, store: Arc<S>,
) -> eyre::Result<Arc<WatermarkContractSync<T>>> ) -> eyre::Result<Arc<WatermarkContractSync<T>>>
where where
T: Indexable + Debug, T: Indexable + Debug,
SequenceIndexer<T>: TryFromWithMetrics<ChainConf>, SequenceIndexer<T>: TryFromWithMetrics<ChainConf>,
D: HyperlaneLogStore<T> + HyperlaneWatermarkedLogStore<T> + 'static, S: HyperlaneLogStore<T> + HyperlaneWatermarkedLogStore<T> + 'static,
{ {
let setup = self.chain_setup(domain)?; let setup = self.chain_setup(domain)?;
// Currently, all indexers are of the `SequenceIndexer` type // Currently, all indexers are of the `SequenceIndexer` type
let indexer = SequenceIndexer::<T>::try_from_with_metrics(setup, metrics).await?; let indexer = SequenceIndexer::<T>::try_from_with_metrics(setup, metrics).await?;
Ok(Arc::new(ContractSync::new( Ok(Arc::new(ContractSync::new(
domain.clone(), domain.clone(),
db.clone() as WatermarkLogStore<_>, store.clone() as WatermarkLogStore<_>,
indexer, indexer,
sync_metrics.clone(), sync_metrics.clone(),
))) )))
@ -202,17 +202,17 @@ impl Settings {
/// Build multiple contract syncs. /// Build multiple contract syncs.
/// All contracts have to implement both sequenced and /// All contracts have to implement both sequenced and
/// watermark trait bounds /// watermark trait bounds
pub async fn contract_syncs<T, D>( pub async fn contract_syncs<T, S>(
&self, &self,
domains: impl Iterator<Item = &HyperlaneDomain>, domains: impl Iterator<Item = &HyperlaneDomain>,
metrics: &CoreMetrics, metrics: &CoreMetrics,
sync_metrics: &ContractSyncMetrics, sync_metrics: &ContractSyncMetrics,
dbs: HashMap<HyperlaneDomain, Arc<D>>, stores: HashMap<HyperlaneDomain, Arc<S>>,
) -> Result<HashMap<HyperlaneDomain, Arc<dyn ContractSyncer<T>>>> ) -> Result<HashMap<HyperlaneDomain, Arc<dyn ContractSyncer<T>>>>
where where
T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static, T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static,
SequenceIndexer<T>: TryFromWithMetrics<ChainConf>, SequenceIndexer<T>: TryFromWithMetrics<ChainConf>,
D: HyperlaneLogStore<T> S: HyperlaneLogStore<T>
+ HyperlaneSequenceAwareIndexerStoreReader<T> + HyperlaneSequenceAwareIndexerStoreReader<T>
+ HyperlaneWatermarkedLogStore<T> + HyperlaneWatermarkedLogStore<T>
+ 'static, + 'static,
@ -226,7 +226,7 @@ impl Settings {
domain, domain,
metrics, metrics,
sync_metrics, sync_metrics,
dbs.get(domain).unwrap().clone(), stores.get(domain).unwrap().clone(),
) )
.await .await
.map(|r| r as Arc<dyn ContractSyncer<T>>)?, .map(|r| r as Arc<dyn ContractSyncer<T>>)?,
@ -235,7 +235,7 @@ impl Settings {
domain, domain,
metrics, metrics,
sync_metrics, sync_metrics,
dbs.get(domain).unwrap().clone(), stores.get(domain).unwrap().clone(),
) )
.await .await
.map(|r| r as Arc<dyn ContractSyncer<T>>)?, .map(|r| r as Arc<dyn ContractSyncer<T>>)?,

@ -42,8 +42,8 @@ pub trait HyperlaneSequenceAwareIndexerStore<T>:
} }
/// Auto-impl for HyperlaneSequenceAwareIndexerStore /// Auto-impl for HyperlaneSequenceAwareIndexerStore
impl<T, U> HyperlaneSequenceAwareIndexerStore<T> for U where impl<T, S> HyperlaneSequenceAwareIndexerStore<T> for S where
U: HyperlaneLogStore<T> + HyperlaneSequenceAwareIndexerStoreReader<T> + Send + Sync + Debug S: HyperlaneLogStore<T> + HyperlaneSequenceAwareIndexerStoreReader<T> + Send + Sync + Debug
{ {
} }

Loading…
Cancel
Save