diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 496628d70..d5077d678 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -347,7 +347,9 @@ impl BaseAgent for Relayer { Self::AGENT_NAME.to_string(), ) .await - .unwrap(); + .unwrap_or_else(|_| { + panic!("Error creating metrics updater for destination {dest_domain}") + }); tasks.push(metrics_updater.spawn()); } @@ -417,7 +419,10 @@ impl Relayer { ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index_settings(); let contract_sync = self.message_syncs.get(origin).unwrap().clone(); - let cursor = contract_sync.cursor(index_settings).await; + let cursor = contract_sync + .cursor(index_settings) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}")); tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() @@ -439,7 +444,10 @@ impl Relayer { .get(origin) .unwrap() .clone(); - let cursor = contract_sync.cursor(index_settings).await; + let cursor = contract_sync + .cursor(index_settings) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}")); tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() @@ -460,7 +468,10 @@ impl Relayer { ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index.clone(); let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone(); - let cursor = contract_sync.cursor(index_settings).await; + let cursor = contract_sync + .cursor(index_settings) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}")); tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() diff --git a/rust/agents/scraper/src/agent.rs b/rust/agents/scraper/src/agent.rs index 3febc4092..fd9afc6d7 100644 --- a/rust/agents/scraper/src/agent.rs +++ b/rust/agents/scraper/src/agent.rs @@ -200,7 +200,10 @@ impl Scraper { ) .await .unwrap(); - let cursor = sync.cursor(index_settings.clone()).await; + let cursor = sync + .cursor(index_settings.clone()) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}")); let maybe_broadcaser = sync.get_broadcaster(); let task = tokio::spawn(async move { sync.sync("message_dispatch", cursor.into()).await }) .instrument( @@ -230,7 +233,10 @@ impl Scraper { .unwrap(); let label = "message_delivery"; - let cursor = sync.cursor(index_settings.clone()).await; + let cursor = sync + .cursor(index_settings.clone()) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}")); // there is no txid receiver for delivery indexing, since delivery txs aren't batched with // other types of indexed txs / events tokio::spawn(async move { sync.sync(label, SyncOptions::new(Some(cursor), None)).await }) @@ -259,7 +265,10 @@ impl Scraper { .unwrap(); let label = "gas_payment"; - let cursor = sync.cursor(index_settings.clone()).await; + let cursor = sync + .cursor(index_settings.clone()) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}")); tokio::spawn(async move { sync.sync(label, SyncOptions::new(Some(cursor), tx_id_receiver)) .await diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index bb7339e6b..a4f9a9e00 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -216,7 +216,15 @@ impl Validator { let index_settings = self.as_ref().settings.chains[self.origin_chain.name()].index_settings(); let contract_sync = self.merkle_tree_hook_sync.clone(); - let cursor = contract_sync.cursor(index_settings).await; + let cursor = contract_sync + .cursor(index_settings) + .await + .unwrap_or_else(|err| { + panic!( + "Error getting merkle tree hook cursor for origin {0}: {err}", + self.origin_chain + ) + }); tokio::spawn(async move { contract_sync .clone() diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index b4116d14b..50d0eac0f 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -6,6 +6,7 @@ use axum::async_trait; use broadcast::BroadcastMpscSender; use cursors::*; use derive_new::new; +use eyre::Result; use hyperlane_core::{ utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore, HyperlaneSequenceAwareIndexerStore, HyperlaneWatermarkedLogStore, Indexer, @@ -240,7 +241,8 @@ pub type WatermarkContractSync = #[async_trait] pub trait ContractSyncer: Send + Sync { /// Returns a new cursor to be used for syncing events from the indexer - async fn cursor(&self, index_settings: IndexSettings) -> Box>; + async fn cursor(&self, index_settings: IndexSettings) + -> Result>>; /// Syncs events from the indexer using the provided cursor async fn sync(&self, label: &'static str, opts: SyncOptions); @@ -277,23 +279,25 @@ where T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static, { /// Returns a new cursor to be used for syncing events from the indexer based on time - async fn cursor(&self, index_settings: IndexSettings) -> Box> { + async fn cursor( + &self, + index_settings: IndexSettings, + ) -> Result>> { let watermark = self.db.retrieve_high_watermark().await.unwrap(); let index_settings = IndexSettings { from: watermark.unwrap_or(index_settings.from), chunk_size: index_settings.chunk_size, mode: index_settings.mode, }; - Box::new( + Ok(Box::new( RateLimitedContractSyncCursor::new( Arc::new(self.indexer.clone()), self.db.clone(), index_settings.chunk_size, index_settings.from, ) - .await - .unwrap(), - ) + .await?, + )) } async fn sync(&self, label: &'static str, opts: SyncOptions) { @@ -322,17 +326,19 @@ where T: Indexable + Send + Sync + Debug + Clone + Eq + Hash + 'static, { /// Returns a new cursor to be used for syncing dispatched messages from the indexer - async fn cursor(&self, index_settings: IndexSettings) -> Box> { - Box::new( + async fn cursor( + &self, + index_settings: IndexSettings, + ) -> Result>> { + Ok(Box::new( ForwardBackwardSequenceAwareSyncCursor::new( self.indexer.clone(), Arc::new(self.db.clone()), index_settings.chunk_size, index_settings.mode, ) - .await - .unwrap(), - ) + .await?, + )) } async fn sync(&self, label: &'static str, opts: SyncOptions) {