From 57bbb16cc173a14fa4bbf1eb63bb87412d099a38 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Fri, 16 Aug 2024 14:02:34 +0100 Subject: [PATCH] feat: panics upon startup when RPC issues occur include the offending chain (#4323) ### Description Some quick low hanging fruit to make debugging easier in the future. Today (https://discord.com/channels/935678348330434570/1273582453889699843) some chains were having RPC issues and it caused an agent deploy to panic upon startup. Debugging this was hard because the panic message just indicates an error is occurring, but doesn't say what the offending chain is! Examples: - https://cloudlogging.app.goo.gl/puQk7hm4fXyJhYKs8 - https://cloudlogging.app.goo.gl/5qZrbc2Ww4LWQXWC6 - https://cloudlogging.app.goo.gl/ZL7mKbBzzJaePtty9 All these panic happen when constructing the cursor, which isn't yet done in an indexing task. The quick and easy fix is to just use expect instead of unwrap We still unwrap in a few places, but mostly in places that aren't susceptible to flaky RPC issues, e.g. reading from a HashMap that we know should always have an entry, etc ### Drive-by changes ### Related issues ### Backward compatibility ### Testing --- rust/agents/relayer/src/relayer.rs | 19 ++++++++++--- rust/agents/scraper/src/agent.rs | 15 ++++++++--- rust/agents/validator/src/validator.rs | 10 ++++++- rust/hyperlane-base/src/contract_sync/mod.rs | 28 ++++++++++++-------- 4 files changed, 53 insertions(+), 19 deletions(-) diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 496628d70..d5077d678 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -347,7 +347,9 @@ impl BaseAgent for Relayer { Self::AGENT_NAME.to_string(), ) .await - .unwrap(); + .unwrap_or_else(|_| { + panic!("Error creating metrics updater for destination {dest_domain}") + }); tasks.push(metrics_updater.spawn()); } @@ -417,7 +419,10 @@ impl Relayer { ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index_settings(); let contract_sync = self.message_syncs.get(origin).unwrap().clone(); - let cursor = contract_sync.cursor(index_settings).await; + let cursor = contract_sync + .cursor(index_settings) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}")); tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() @@ -439,7 +444,10 @@ impl Relayer { .get(origin) .unwrap() .clone(); - let cursor = contract_sync.cursor(index_settings).await; + let cursor = contract_sync + .cursor(index_settings) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}")); tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() @@ -460,7 +468,10 @@ impl Relayer { ) -> Instrumented> { let index_settings = self.as_ref().settings.chains[origin.name()].index.clone(); let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone(); - let cursor = contract_sync.cursor(index_settings).await; + let cursor = contract_sync + .cursor(index_settings) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}")); tokio::spawn(TaskMonitor::instrument(&task_monitor, async move { contract_sync .clone() diff --git a/rust/agents/scraper/src/agent.rs b/rust/agents/scraper/src/agent.rs index 3febc4092..fd9afc6d7 100644 --- a/rust/agents/scraper/src/agent.rs +++ b/rust/agents/scraper/src/agent.rs @@ -200,7 +200,10 @@ impl Scraper { ) .await .unwrap(); - let cursor = sync.cursor(index_settings.clone()).await; + let cursor = sync + .cursor(index_settings.clone()) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}")); let maybe_broadcaser = sync.get_broadcaster(); let task = tokio::spawn(async move { sync.sync("message_dispatch", cursor.into()).await }) .instrument( @@ -230,7 +233,10 @@ impl Scraper { .unwrap(); let label = "message_delivery"; - let cursor = sync.cursor(index_settings.clone()).await; + let cursor = sync + .cursor(index_settings.clone()) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}")); // there is no txid receiver for delivery indexing, since delivery txs aren't batched with // other types of indexed txs / events tokio::spawn(async move { sync.sync(label, SyncOptions::new(Some(cursor), None)).await }) @@ -259,7 +265,10 @@ impl Scraper { .unwrap(); let label = "gas_payment"; - let cursor = sync.cursor(index_settings.clone()).await; + let cursor = sync + .cursor(index_settings.clone()) + .await + .unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}")); tokio::spawn(async move { sync.sync(label, SyncOptions::new(Some(cursor), tx_id_receiver)) .await diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index bb7339e6b..a4f9a9e00 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -216,7 +216,15 @@ impl Validator { let index_settings = self.as_ref().settings.chains[self.origin_chain.name()].index_settings(); let contract_sync = self.merkle_tree_hook_sync.clone(); - let cursor = contract_sync.cursor(index_settings).await; + let cursor = contract_sync + .cursor(index_settings) + .await + .unwrap_or_else(|err| { + panic!( + "Error getting merkle tree hook cursor for origin {0}: {err}", + self.origin_chain + ) + }); tokio::spawn(async move { contract_sync .clone() diff --git a/rust/hyperlane-base/src/contract_sync/mod.rs b/rust/hyperlane-base/src/contract_sync/mod.rs index b4116d14b..50d0eac0f 100644 --- a/rust/hyperlane-base/src/contract_sync/mod.rs +++ b/rust/hyperlane-base/src/contract_sync/mod.rs @@ -6,6 +6,7 @@ use axum::async_trait; use broadcast::BroadcastMpscSender; use cursors::*; use derive_new::new; +use eyre::Result; use hyperlane_core::{ utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore, HyperlaneSequenceAwareIndexerStore, HyperlaneWatermarkedLogStore, Indexer, @@ -240,7 +241,8 @@ pub type WatermarkContractSync = #[async_trait] pub trait ContractSyncer: Send + Sync { /// Returns a new cursor to be used for syncing events from the indexer - async fn cursor(&self, index_settings: IndexSettings) -> Box>; + async fn cursor(&self, index_settings: IndexSettings) + -> Result>>; /// Syncs events from the indexer using the provided cursor async fn sync(&self, label: &'static str, opts: SyncOptions); @@ -277,23 +279,25 @@ where T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static, { /// Returns a new cursor to be used for syncing events from the indexer based on time - async fn cursor(&self, index_settings: IndexSettings) -> Box> { + async fn cursor( + &self, + index_settings: IndexSettings, + ) -> Result>> { let watermark = self.db.retrieve_high_watermark().await.unwrap(); let index_settings = IndexSettings { from: watermark.unwrap_or(index_settings.from), chunk_size: index_settings.chunk_size, mode: index_settings.mode, }; - Box::new( + Ok(Box::new( RateLimitedContractSyncCursor::new( Arc::new(self.indexer.clone()), self.db.clone(), index_settings.chunk_size, index_settings.from, ) - .await - .unwrap(), - ) + .await?, + )) } async fn sync(&self, label: &'static str, opts: SyncOptions) { @@ -322,17 +326,19 @@ where T: Indexable + Send + Sync + Debug + Clone + Eq + Hash + 'static, { /// Returns a new cursor to be used for syncing dispatched messages from the indexer - async fn cursor(&self, index_settings: IndexSettings) -> Box> { - Box::new( + async fn cursor( + &self, + index_settings: IndexSettings, + ) -> Result>> { + Ok(Box::new( ForwardBackwardSequenceAwareSyncCursor::new( self.indexer.clone(), Arc::new(self.db.clone()), index_settings.chunk_size, index_settings.mode, ) - .await - .unwrap(), - ) + .await?, + )) } async fn sync(&self, label: &'static str, opts: SyncOptions) {