feat: panics upon startup when RPC issues occur include the offending chain (#4323)

### Description

Some quick low-hanging fruit to make debugging easier in the future.
Today
(https://discord.com/channels/935678348330434570/1273582453889699843)
some chains were having RPC issues, which caused an agent deploy to
panic upon startup. Debugging this was hard because the panic message
only indicated that an error had occurred, but didn't say which chain
was the offending one!

Examples:
- https://cloudlogging.app.goo.gl/puQk7hm4fXyJhYKs8
- https://cloudlogging.app.goo.gl/5qZrbc2Ww4LWQXWC6
- https://cloudlogging.app.goo.gl/ZL7mKbBzzJaePtty9

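All of these panics happen when constructing the cursor, which isn't yet
done inside an indexing task. The quick and easy fix is to replace the
bare `unwrap` with an `expect`-style panic
(`unwrap_or_else(|err| panic!(...))`) whose message names the offending
chain and includes the underlying error.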

We still unwrap in a few places, but mostly in places that aren't
susceptible to flaky RPC issues, e.g. reading from a HashMap that we
know should always have an entry.

### Drive-by changes

<!--
Are there any minor or drive-by changes also included?
-->

### Related issues

<!--
- Fixes #[issue number here]
-->

### Backward compatibility

<!--
Are these changes backward compatible? Are there any infrastructure
implications, e.g. changes that would prohibit deploying older commits
using this infra tooling?

Yes/No
-->

### Testing

<!--
What kind of testing have these changes undergone?

None/Manual/Unit Tests
-->
pull/4329/head
Trevor Porter 2 months ago committed by GitHub
parent ddf6834c7b
commit 57bbb16cc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 19
      rust/agents/relayer/src/relayer.rs
  2. 15
      rust/agents/scraper/src/agent.rs
  3. 10
      rust/agents/validator/src/validator.rs
  4. 28
      rust/hyperlane-base/src/contract_sync/mod.rs

@ -347,7 +347,9 @@ impl BaseAgent for Relayer {
Self::AGENT_NAME.to_string(),
)
.await
.unwrap();
.unwrap_or_else(|_| {
panic!("Error creating metrics updater for destination {dest_domain}")
});
tasks.push(metrics_updater.spawn());
}
@ -417,7 +419,10 @@ impl Relayer {
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index_settings();
let contract_sync = self.message_syncs.get(origin).unwrap().clone();
let cursor = contract_sync.cursor(index_settings).await;
let cursor = contract_sync
.cursor(index_settings)
.await
.unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}"));
tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {
contract_sync
.clone()
@ -439,7 +444,10 @@ impl Relayer {
.get(origin)
.unwrap()
.clone();
let cursor = contract_sync.cursor(index_settings).await;
let cursor = contract_sync
.cursor(index_settings)
.await
.unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}"));
tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {
contract_sync
.clone()
@ -460,7 +468,10 @@ impl Relayer {
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index.clone();
let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone();
let cursor = contract_sync.cursor(index_settings).await;
let cursor = contract_sync
.cursor(index_settings)
.await
.unwrap_or_else(|err| panic!("Error getting cursor for origin {origin}: {err}"));
tokio::spawn(TaskMonitor::instrument(&task_monitor, async move {
contract_sync
.clone()

@ -200,7 +200,10 @@ impl Scraper {
)
.await
.unwrap();
let cursor = sync.cursor(index_settings.clone()).await;
let cursor = sync
.cursor(index_settings.clone())
.await
.unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}"));
let maybe_broadcaser = sync.get_broadcaster();
let task = tokio::spawn(async move { sync.sync("message_dispatch", cursor.into()).await })
.instrument(
@ -230,7 +233,10 @@ impl Scraper {
.unwrap();
let label = "message_delivery";
let cursor = sync.cursor(index_settings.clone()).await;
let cursor = sync
.cursor(index_settings.clone())
.await
.unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}"));
// there is no txid receiver for delivery indexing, since delivery txs aren't batched with
// other types of indexed txs / events
tokio::spawn(async move { sync.sync(label, SyncOptions::new(Some(cursor), None)).await })
@ -259,7 +265,10 @@ impl Scraper {
.unwrap();
let label = "gas_payment";
let cursor = sync.cursor(index_settings.clone()).await;
let cursor = sync
.cursor(index_settings.clone())
.await
.unwrap_or_else(|err| panic!("Error getting cursor for domain {domain}: {err}"));
tokio::spawn(async move {
sync.sync(label, SyncOptions::new(Some(cursor), tx_id_receiver))
.await

@ -216,7 +216,15 @@ impl Validator {
let index_settings =
self.as_ref().settings.chains[self.origin_chain.name()].index_settings();
let contract_sync = self.merkle_tree_hook_sync.clone();
let cursor = contract_sync.cursor(index_settings).await;
let cursor = contract_sync
.cursor(index_settings)
.await
.unwrap_or_else(|err| {
panic!(
"Error getting merkle tree hook cursor for origin {0}: {err}",
self.origin_chain
)
});
tokio::spawn(async move {
contract_sync
.clone()

@ -6,6 +6,7 @@ use axum::async_trait;
use broadcast::BroadcastMpscSender;
use cursors::*;
use derive_new::new;
use eyre::Result;
use hyperlane_core::{
utils::fmt_sync_time, ContractSyncCursor, CursorAction, HyperlaneDomain, HyperlaneLogStore,
HyperlaneSequenceAwareIndexerStore, HyperlaneWatermarkedLogStore, Indexer,
@ -240,7 +241,8 @@ pub type WatermarkContractSync<T> =
#[async_trait]
pub trait ContractSyncer<T>: Send + Sync {
/// Returns a new cursor to be used for syncing events from the indexer
async fn cursor(&self, index_settings: IndexSettings) -> Box<dyn ContractSyncCursor<T>>;
async fn cursor(&self, index_settings: IndexSettings)
-> Result<Box<dyn ContractSyncCursor<T>>>;
/// Syncs events from the indexer using the provided cursor
async fn sync(&self, label: &'static str, opts: SyncOptions<T>);
@ -277,23 +279,25 @@ where
T: Indexable + Debug + Send + Sync + Clone + Eq + Hash + 'static,
{
/// Returns a new cursor to be used for syncing events from the indexer based on time
async fn cursor(&self, index_settings: IndexSettings) -> Box<dyn ContractSyncCursor<T>> {
async fn cursor(
&self,
index_settings: IndexSettings,
) -> Result<Box<dyn ContractSyncCursor<T>>> {
let watermark = self.db.retrieve_high_watermark().await.unwrap();
let index_settings = IndexSettings {
from: watermark.unwrap_or(index_settings.from),
chunk_size: index_settings.chunk_size,
mode: index_settings.mode,
};
Box::new(
Ok(Box::new(
RateLimitedContractSyncCursor::new(
Arc::new(self.indexer.clone()),
self.db.clone(),
index_settings.chunk_size,
index_settings.from,
)
.await
.unwrap(),
)
.await?,
))
}
async fn sync(&self, label: &'static str, opts: SyncOptions<T>) {
@ -322,17 +326,19 @@ where
T: Indexable + Send + Sync + Debug + Clone + Eq + Hash + 'static,
{
/// Returns a new cursor to be used for syncing dispatched messages from the indexer
async fn cursor(&self, index_settings: IndexSettings) -> Box<dyn ContractSyncCursor<T>> {
Box::new(
async fn cursor(
&self,
index_settings: IndexSettings,
) -> Result<Box<dyn ContractSyncCursor<T>>> {
Ok(Box::new(
ForwardBackwardSequenceAwareSyncCursor::new(
self.indexer.clone(),
Arc::new(self.db.clone()),
index_settings.chunk_size,
index_settings.mode,
)
.await
.unwrap(),
)
.await?,
))
}
async fn sync(&self, label: &'static str, opts: SyncOptions<T>) {

Loading…
Cancel
Save