Infallible relayer tasks (#3257)

### Description

The intended behavior of agents is described by @tkporter's comment
here:
https://github.com/hyperlane-xyz/hyperlane-monorepo/pull/3257#discussion_r1489233847

### Drive-by changes

<!--
Are there any minor or drive-by changes also included?
-->

### Related issues

- Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3209

### Backward compatibility

<!--
Are these changes backward compatible? Are there any infrastructure
implications, e.g. changes that would prohibit deploying older commits
using this infra tooling?

Yes/No
-->

### Testing

<!--
What kind of testing have these changes undergone?

None/Manual/Unit Tests
-->
trevor/add-injective-back
Daniel Savu 9 months ago committed by GitHub
parent 02e64c9f44
commit 155c85eda2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      rust/Cargo.lock
  2. 1
      rust/agents/relayer/Cargo.toml
  3. 2
      rust/agents/relayer/src/msg/pending_message.rs
  4. 47
      rust/agents/relayer/src/msg/pending_operation.rs
  5. 33
      rust/agents/relayer/src/msg/serial_submitter.rs
  6. 13
      rust/agents/relayer/src/processor.rs
  7. 69
      rust/agents/relayer/src/relayer.rs
  8. 25
      rust/agents/scraper/src/agent.rs
  9. 1
      rust/agents/validator/Cargo.toml
  10. 26
      rust/agents/validator/src/validator.rs
  11. 1
      rust/hyperlane-base/Cargo.toml
  12. 32
      rust/hyperlane-base/src/agent.rs
  13. 6
      rust/hyperlane-base/src/contract_sync/mod.rs
  14. 5
      rust/hyperlane-base/src/metrics/agent_metrics.rs

3
rust/Cargo.lock generated

@ -4119,6 +4119,7 @@ dependencies = [
"ethers-prometheus",
"eyre",
"fuels",
"futures",
"futures-util",
"hyperlane-core",
"hyperlane-cosmos",
@ -6937,6 +6938,7 @@ dependencies = [
"ethers",
"ethers-contract",
"eyre",
"futures",
"futures-util",
"hyperlane-base",
"hyperlane-core",
@ -10576,6 +10578,7 @@ dependencies = [
"derive_more",
"ethers",
"eyre",
"futures",
"futures-util",
"hyperlane-base",
"hyperlane-core",

@ -19,6 +19,7 @@ enum_dispatch.workspace = true
ethers-contract.workspace = true
ethers.workspace = true
eyre.workspace = true
futures.workspace = true
futures-util.workspace = true
itertools.workspace = true
num-derive.workspace = true

@ -6,7 +6,7 @@ use std::{
use async_trait::async_trait;
use derive_new::new;
use eyre::{Context, Result};
use eyre::Result;
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_core::{HyperlaneChain, HyperlaneDomain, HyperlaneMessage, Mailbox, U256};
use prometheus::{IntCounter, IntGauge};

@ -2,7 +2,6 @@ use std::{cmp::Ordering, time::Instant};
use async_trait::async_trait;
use enum_dispatch::enum_dispatch;
use eyre::Report;
use hyperlane_core::HyperlaneDomain;
#[allow(unused_imports)] // required for enum_dispatch
@ -110,9 +109,6 @@ pub enum PendingOperationResult {
Reprepare,
/// Do not attempt to run the operation again, forget about it
Drop,
/// Pass the error up the chain, this is non-recoverable and indicates a
/// system failure.
CriticalFailure(Report),
}
/// create a `op_try!` macro for the `on_retry` handler.
@ -121,30 +117,27 @@ macro_rules! make_op_try {
/// Handle a result and either return early with retry or a critical failure on
/// error.
macro_rules! op_try {
(critical: $e:expr, $ctx:literal) => {
match $e {
Ok(v) => v,
Err(e) => {
error!(error=?e, concat!("Error when ", $ctx));
return PendingOperationResult::CriticalFailure(
Err::<(), _>(e)
.context(concat!("When ", $ctx))
.unwrap_err()
);
}
}
};
($e:expr, $ctx:literal) => {
match $e {
Ok(v) => v,
Err(e) => {
warn!(error=?e, concat!("Error when ", $ctx));
#[allow(clippy::redundant_closure_call)]
return $on_retry();
}
}
};
(critical: $e:expr, $ctx:literal) => {
match $e {
Ok(v) => v,
Err(e) => {
error!(error=?e, concat!("Critical error when ", $ctx));
#[allow(clippy::redundant_closure_call)]
return $on_retry();
}
}
};
($e:expr, $ctx:literal) => {
match $e {
Ok(v) => v,
Err(e) => {
warn!(error=?e, concat!("Error when ", $ctx));
#[allow(clippy::redundant_closure_call)]
return $on_retry();
}
}
};
}
};
}

@ -4,7 +4,6 @@ use std::sync::Arc;
use std::time::Duration;
use derive_new::new;
use eyre::{bail, Result};
use futures_util::future::try_join_all;
use prometheus::{IntCounter, IntGauge};
use tokio::spawn;
@ -81,12 +80,12 @@ pub struct SerialSubmitter {
}
impl SerialSubmitter {
pub fn spawn(self) -> Instrumented<JoinHandle<Result<()>>> {
pub fn spawn(self) -> Instrumented<JoinHandle<()>> {
let span = info_span!("SerialSubmitter", destination=%self.domain);
spawn(async move { self.run().await }).instrument(span)
}
async fn run(self) -> Result<()> {
async fn run(self) {
let Self {
domain,
metrics,
@ -128,10 +127,13 @@ impl SerialSubmitter {
)),
];
for i in try_join_all(tasks).await? {
i?
if let Err(err) = try_join_all(tasks).await {
tracing::error!(
error=?err,
?domain,
"SerialSubmitter task panicked for domain"
);
}
Ok(())
}
}
@ -140,7 +142,7 @@ async fn receive_task(
domain: HyperlaneDomain,
mut rx: mpsc::UnboundedReceiver<Box<DynPendingOperation>>,
prepare_queue: OpQueue,
) -> Result<()> {
) {
// Pull any messages sent to this submitter
while let Some(op) = rx.recv().await {
trace!(?op, "Received new operation");
@ -149,7 +151,6 @@ async fn receive_task(
debug_assert_eq!(*op.domain(), domain);
prepare_queue.lock().await.push(Reverse(op));
}
bail!("Submitter receive channel was closed")
}
#[instrument(skip_all, fields(%domain))]
@ -158,7 +159,7 @@ async fn prepare_task(
prepare_queue: OpQueue,
tx_submit: mpsc::Sender<Box<DynPendingOperation>>,
metrics: SerialSubmitterMetrics,
) -> Result<()> {
) {
loop {
// Pick the next message to try preparing.
let next = {
@ -179,7 +180,9 @@ async fn prepare_task(
debug!(?op, "Operation prepared");
metrics.ops_prepared.inc();
// this send will pause this task if the submitter is not ready to accept yet
tx_submit.send(op).await?;
if let Err(err) = tx_submit.send(op).await {
tracing::error!(error=?err, "Failed to send prepared operation to submitter");
}
}
PendingOperationResult::NotReady => {
// none of the operations are ready yet, so wait for a little bit
@ -193,9 +196,6 @@ async fn prepare_task(
PendingOperationResult::Drop => {
metrics.ops_dropped.inc();
}
PendingOperationResult::CriticalFailure(e) => {
return Err(e);
}
}
}
}
@ -207,7 +207,7 @@ async fn submit_task(
prepare_queue: OpQueue,
confirm_queue: OpQueue,
metrics: SerialSubmitterMetrics,
) -> Result<()> {
) {
while let Some(mut op) = rx_submit.recv().await {
trace!(?op, "Submitting operation");
debug_assert_eq!(*op.domain(), domain);
@ -228,10 +228,8 @@ async fn submit_task(
PendingOperationResult::Drop => {
metrics.ops_dropped.inc();
}
PendingOperationResult::CriticalFailure(e) => return Err(e),
}
}
bail!("Internal submitter channel was closed");
}
#[instrument(skip_all, fields(%domain))]
@ -240,7 +238,7 @@ async fn confirm_task(
prepare_queue: OpQueue,
confirm_queue: OpQueue,
metrics: SerialSubmitterMetrics,
) -> Result<()> {
) {
loop {
// Pick the next message to try confirming.
let next = {
@ -272,7 +270,6 @@ async fn confirm_task(
PendingOperationResult::Drop => {
metrics.ops_dropped.inc();
}
PendingOperationResult::CriticalFailure(e) => return Err(e),
}
}
}

@ -5,7 +5,7 @@ use derive_new::new;
use eyre::Result;
use hyperlane_core::HyperlaneDomain;
use tokio::task::JoinHandle;
use tracing::instrument;
use tracing::{instrument, warn};
#[async_trait]
pub trait ProcessorExt: Send + Debug {
@ -23,14 +23,17 @@ pub struct Processor {
}
impl Processor {
pub fn spawn(self) -> JoinHandle<Result<()>> {
pub fn spawn(self) -> JoinHandle<()> {
tokio::spawn(async move { self.main_loop().await })
}
#[instrument(ret, err, skip(self), level = "info", fields(domain=%self.ticker.domain()))]
async fn main_loop(mut self) -> Result<()> {
#[instrument(ret, skip(self), level = "info", fields(domain=%self.ticker.domain()))]
async fn main_loop(mut self) {
loop {
self.ticker.tick().await?;
if let Err(err) = self.ticker.tick().await {
warn!(error=%err, "Error in processor tick");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
}

@ -7,10 +7,10 @@ use std::{
use async_trait::async_trait;
use derive_more::AsRef;
use eyre::Result;
use futures_util::future::try_join_all;
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::{AgentMetrics, MetricsUpdater},
run_all,
settings::ChainConf,
BaseAgent, ChainMetrics, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
SequencedDataContractSync, WatermarkContractSync,
@ -25,10 +25,9 @@ use tokio::{
},
task::JoinHandle,
};
use tracing::{info, info_span, instrument::Instrumented, Instrument};
use tracing::{info, info_span, instrument::Instrumented, warn, Instrument};
use crate::merkle_tree::processor::{MerkleTreeProcessor, MerkleTreeProcessorMetrics};
use crate::processor::{Processor, ProcessorExt};
use crate::processor::Processor;
use crate::{
merkle_tree::builder::MerkleTreeBuilder,
msg::{
@ -41,6 +40,10 @@ use crate::{
},
settings::{matching_list::MatchingList, RelayerSettings},
};
use crate::{
merkle_tree::processor::{MerkleTreeProcessor, MerkleTreeProcessorMetrics},
processor::ProcessorExt,
};
#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)]
struct ContextKey {
@ -268,7 +271,7 @@ impl BaseAgent for Relayer {
}
#[allow(clippy::async_yields_async)]
async fn run(self) -> Instrumented<JoinHandle<Result<()>>> {
async fn run(self) {
let mut tasks = vec![];
// send channels by destination chain
@ -304,15 +307,17 @@ impl BaseAgent for Relayer {
tasks.push(self.run_merkle_tree_processor(origin));
}
run_all(tasks)
if let Err(err) = try_join_all(tasks).await {
tracing::error!(
error=?err,
"Relayer task panicked"
);
}
}
}
impl Relayer {
async fn run_message_sync(
&self,
origin: &HyperlaneDomain,
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
async fn run_message_sync(&self, origin: &HyperlaneDomain) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index_settings();
let contract_sync = self.message_syncs.get(origin).unwrap().clone();
let cursor = contract_sync
@ -330,7 +335,7 @@ impl Relayer {
async fn run_interchain_gas_payment_sync(
&self,
origin: &HyperlaneDomain,
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index_settings();
let contract_sync = self
.interchain_gas_payment_syncs
@ -345,7 +350,7 @@ impl Relayer {
async fn run_merkle_tree_hook_syncs(
&self,
origin: &HyperlaneDomain,
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index.clone();
let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone();
let cursor = contract_sync
@ -359,13 +364,13 @@ impl Relayer {
&self,
origin: &HyperlaneDomain,
send_channels: HashMap<u32, UnboundedSender<Box<DynPendingOperation>>>,
) -> Instrumented<JoinHandle<Result<()>>> {
) -> Instrumented<JoinHandle<()>> {
let metrics = MessageProcessorMetrics::new(
&self.core.metrics,
origin,
self.destination_chains.keys(),
);
let destination_ctxs = self
let destination_ctxs: HashMap<_, _> = self
.destination_chains
.keys()
.filter(|&destination| destination != origin)
@ -380,6 +385,7 @@ impl Relayer {
)
})
.collect();
let message_processor = MessageProcessor::new(
self.dbs.get(origin).unwrap().clone(),
self.whitelist.clone(),
@ -391,18 +397,11 @@ impl Relayer {
let span = info_span!("MessageProcessor", origin=%message_processor.domain());
let processor = Processor::new(Box::new(message_processor));
tokio::spawn(async move {
let res = tokio::try_join!(processor.spawn())?;
info!(?res, "try_join finished for message processor");
Ok(())
})
.instrument(span)
processor.spawn().instrument(span)
}
fn run_merkle_tree_processor(
&self,
origin: &HyperlaneDomain,
) -> Instrumented<JoinHandle<Result<()>>> {
fn run_merkle_tree_processor(&self, origin: &HyperlaneDomain) -> Instrumented<JoinHandle<()>> {
let metrics = MerkleTreeProcessorMetrics::new();
let merkle_tree_processor = MerkleTreeProcessor::new(
self.dbs.get(origin).unwrap().clone(),
@ -412,12 +411,7 @@ impl Relayer {
let span = info_span!("MerkleTreeProcessor", origin=%merkle_tree_processor.domain());
let processor = Processor::new(Box::new(merkle_tree_processor));
tokio::spawn(async move {
let res = tokio::try_join!(processor.spawn())?;
info!(?res, "try_join finished for merkle tree processor");
Ok(())
})
.instrument(span)
processor.spawn().instrument(span)
}
#[allow(clippy::too_many_arguments)]
@ -426,19 +420,22 @@ impl Relayer {
&self,
destination: &HyperlaneDomain,
receiver: UnboundedReceiver<Box<DynPendingOperation>>,
) -> Instrumented<JoinHandle<Result<()>>> {
) -> Instrumented<JoinHandle<()>> {
let serial_submitter = SerialSubmitter::new(
destination.clone(),
receiver,
SerialSubmitterMetrics::new(&self.core.metrics, destination),
);
let span = info_span!("SerialSubmitter", destination=%destination);
let submit_fut = serial_submitter.spawn();
let destination = destination.clone();
tokio::spawn(async move {
let res = tokio::try_join!(submit_fut)?;
info!(?res, "try_join finished for submitter");
Ok(())
// Propagate task panics
serial_submitter.spawn().await.unwrap_or_else(|err| {
panic!(
"destination submitter panicked for destination {}: {:?}",
destination, err
)
});
})
.instrument(span)
}

@ -2,9 +2,10 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use derive_more::AsRef;
use futures::future::try_join_all;
use hyperlane_base::{
metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ChainMetrics,
ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, MetricsUpdater,
metrics::AgentMetrics, settings::IndexSettings, BaseAgent, ChainMetrics, ContractSyncMetrics,
CoreMetrics, HyperlaneAgentCore, MetricsUpdater,
};
use hyperlane_core::{HyperlaneDomain, KnownHyperlaneDomain};
use num_traits::cast::FromPrimitive;
@ -91,7 +92,7 @@ impl BaseAgent for Scraper {
}
#[allow(clippy::async_yields_async)]
async fn run(self) -> Instrumented<JoinHandle<eyre::Result<()>>> {
async fn run(self) {
let mut tasks = Vec::with_capacity(self.scrapers.len());
for domain in self.scrapers.keys() {
tasks.push(self.scrape(*domain).await);
@ -109,14 +110,16 @@ impl BaseAgent for Scraper {
.unwrap();
tasks.push(metrics_updater.spawn());
}
run_all(tasks)
if let Err(err) = try_join_all(tasks).await {
tracing::error!(error = ?err, "Scraper task panicked");
}
}
}
impl Scraper {
/// Sync contract data and other blockchain with the current chain state.
/// This will spawn long-running contract sync tasks
async fn scrape(&self, domain_id: u32) -> Instrumented<JoinHandle<eyre::Result<()>>> {
async fn scrape(&self, domain_id: u32) -> Instrumented<JoinHandle<()>> {
let scraper = self.scrapers.get(&domain_id).unwrap();
let db = scraper.db.clone();
let index_settings = scraper.index_settings.clone();
@ -153,7 +156,12 @@ impl Scraper {
)
.await,
);
run_all(tasks)
tokio::spawn(async move {
// If any of the tasks panic, we want to propagate it, so we unwrap
try_join_all(tasks).await.unwrap();
})
.instrument(info_span!("Scraper Tasks"))
}
}
@ -167,7 +175,7 @@ macro_rules! spawn_sync_task {
contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb,
index_settings: IndexSettings,
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
) -> Instrumented<JoinHandle<()>> {
let sync = self
.as_ref()
.settings
@ -191,6 +199,7 @@ macro_rules! spawn_sync_task {
}
}
}
impl Scraper {
async fn build_message_indexer(
&self,
@ -199,7 +208,7 @@ impl Scraper {
contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb,
index_settings: IndexSettings,
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
) -> Instrumented<JoinHandle<()>> {
let sync = self
.as_ref()
.settings

@ -15,6 +15,7 @@ config.workspace = true
derive_more.workspace = true
ethers.workspace = true
eyre.workspace = true
futures.workspace = true
futures-util.workspace = true
prometheus.workspace = true
serde.workspace = true

@ -3,15 +3,14 @@ use std::{num::NonZeroU64, sync::Arc, time::Duration};
use async_trait::async_trait;
use derive_more::AsRef;
use eyre::Result;
use futures_util::future::ready;
use futures_util::future::try_join_all;
use tokio::{task::JoinHandle, time::sleep};
use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument};
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
metrics::AgentMetrics,
run_all,
settings::ChainConf,
BaseAgent, ChainMetrics, CheckpointSyncer, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore, MetricsUpdater, SequencedDataContractSync,
@ -127,7 +126,7 @@ impl BaseAgent for Validator {
}
#[allow(clippy::async_yields_async)]
async fn run(mut self) -> Instrumented<JoinHandle<Result<()>>> {
async fn run(mut self) {
let mut tasks = vec![];
if let Some(signer_instance) = self.signer_instance.take() {
@ -149,7 +148,13 @@ impl BaseAgent for Validator {
)
.await
.unwrap();
tasks.push(metrics_updater.spawn());
tasks.push(
tokio::spawn(async move {
metrics_updater.spawn().await.unwrap();
Ok(())
})
.instrument(info_span!("MetricsUpdater")),
);
// announce the validator after spawning the signer task
self.announce().await.expect("Failed to announce validator");
@ -173,12 +178,14 @@ impl BaseAgent for Validator {
}
_ => {
// Future that immediately resolves
return tokio::spawn(ready(Ok(()))).instrument(info_span!("Validator"));
return;
}
}
}
run_all(tasks)
if let Err(err) = try_join_all(tasks).await {
error!(?err, "One of the validator tasks returned an error");
}
}
}
@ -190,8 +197,11 @@ impl Validator {
let cursor = contract_sync
.forward_backward_message_sync_cursor(index_settings)
.await;
tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await })
.instrument(info_span!("MerkleTreeHookSyncer"))
tokio::spawn(async move {
contract_sync.clone().sync("merkle_tree_hook", cursor).await;
Ok(())
})
.instrument(info_span!("MerkleTreeHookSyncer"))
}
async fn run_checkpoint_submitters(&self) -> Vec<Instrumented<JoinHandle<Result<()>>>> {

@ -21,6 +21,7 @@ ed25519-dalek.workspace = true
ethers.workspace = true
eyre.workspace = true
fuels.workspace = true
futures.worksapce = true
futures-util.workspace = true
itertools.workspace = true
maplit.workspace = true

@ -1,11 +1,8 @@
use std::{env, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use eyre::{Report, Result};
use futures_util::future::select_all;
use eyre::Result;
use hyperlane_core::config::*;
use tokio::task::JoinHandle;
use tracing::{debug_span, instrument::Instrumented, Instrument};
use crate::{
create_chain_metrics,
@ -52,7 +49,7 @@ pub trait BaseAgent: Send + Sync + Debug {
/// Start running this agent.
#[allow(clippy::async_yields_async)]
async fn run(self) -> Instrumented<JoinHandle<Result<()>>>;
async fn run(self);
}
/// Call this from `main` to fully initialize and run the agent for its entire
@ -83,26 +80,7 @@ pub async fn agent_main<A: BaseAgent>() -> Result<()> {
let agent = A::from_settings(settings, metrics.clone(), agent_metrics, chain_metrics).await?;
metrics.run_http_server();
agent.run().await.await?
}
/// Utility to run multiple tasks and shutdown if any one task ends.
#[allow(clippy::unit_arg, unused_must_use)]
pub fn run_all(
tasks: Vec<Instrumented<JoinHandle<Result<(), Report>>>>,
) -> Instrumented<JoinHandle<Result<()>>> {
debug_assert!(!tasks.is_empty(), "No tasks submitted");
let span = debug_span!("run_all");
tokio::spawn(async move {
let (res, _, remaining) = select_all(tasks).await;
for task in remaining.into_iter() {
let t = task.into_inner();
t.abort();
t.await;
}
res?
})
.instrument(span)
// This await will never end unless a panic occurs
agent.run().await;
Ok(())
}

@ -46,11 +46,7 @@ where
/// Sync logs and write them to the LogStore
#[tracing::instrument(name = "ContractSync", fields(domain=self.domain().name()), skip(self, cursor))]
pub async fn sync(
&self,
label: &'static str,
mut cursor: Box<dyn ContractSyncCursor<T>>,
) -> eyre::Result<()> {
pub async fn sync(&self, label: &'static str, mut cursor: Box<dyn ContractSyncCursor<T>>) {
let chain_name = self.domain.as_ref();
let indexed_height = self
.metrics

@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use derive_builder::Builder;
use eyre::{Report, Result};
use eyre::Result;
use hyperlane_core::metrics::agent::decimals_by_protocol;
use hyperlane_core::metrics::agent::u256_as_scaled_f64;
use hyperlane_core::metrics::agent::METRICS_SCRAPE_INTERVAL;
@ -221,11 +221,10 @@ impl MetricsUpdater {
}
/// Spawns a tokio task to update the metrics
pub fn spawn(self) -> Instrumented<JoinHandle<Result<(), Report>>> {
pub fn spawn(self) -> Instrumented<JoinHandle<()>> {
tokio::spawn(async move {
self.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)
.await;
Ok(())
})
.instrument(info_span!("MetricsUpdater"))
}

Loading…
Cancel
Save