refactor: adds a bit more logging and error reporting to OpticsAgent

buddies-main-deployment
James Prestwich 4 years ago committed by James Prestwich
parent 5ea330cfbb
commit fc63fed64c
No known key found for this signature in database
GPG Key ID: 7CC174C250AD83AD
  1. 1
      rust/Cargo.lock
  2. 2
      rust/optics-base/Cargo.toml
  3. 35
      rust/optics-base/src/agent.rs
  4. 2
      rust/optics-core/src/traits/replica.rs

1
rust/Cargo.lock generated

@ -1232,6 +1232,7 @@ dependencies = [
"ethers-providers",
"ethers-signers",
"futures-util",
"log",
"optics-core",
"serde 1.0.120",
"serde_json",

@ -13,6 +13,8 @@ config = "0.10"
serde = "1.0.120"
serde_json = { version = "1.0.61", default-features = false }
log = "0.4.13"
ethers = { git = "https://github.com/gakonst/ethers-rs" }
ethers-signers = { git = "https://github.com/gakonst/ethers-rs" }
ethers-middleware = { git = "https://github.com/gakonst/ethers-rs" }

@ -1,6 +1,10 @@
use async_trait::async_trait;
use color_eyre::{eyre::WrapErr, Result};
use color_eyre::{
eyre::{eyre, WrapErr},
Result,
};
use futures_util::future::{join_all, select_all};
use log::error;
use std::sync::Arc;
use crate::settings::Settings;
@ -13,25 +17,36 @@ pub trait OpticsAgent: Send + Sync + std::fmt::Debug {
/// Run the agent with the given home and replica
async fn run(home: Arc<Box<dyn Home>>, replica: Box<dyn Replica>) -> Result<()>;
/// Run the Agent, and tag errors with the slip44 ID of the replica
async fn run_report_error(home: Arc<Box<dyn Home>>, replica: Box<dyn Replica>) -> Result<()> {
let slip44 = replica.destination_slip44();
Self::run(home, replica)
.await
.wrap_err_with(|| format!("Replica with ID {} failed", slip44))
}
#[allow(unreachable_code)]
/// Run several agents
async fn run_many(home: Box<dyn Home>, replicas: Vec<Box<dyn Replica>>) -> Result<()> {
let home = Arc::new(home);
let mut replica_tasks: Vec<_> = replicas
let mut futs: Vec<_> = replicas
.into_iter()
.map(|replica| Self::run(home.clone(), replica))
.map(|replica| Self::run_report_error(home.clone(), replica))
.collect();
loop {
let (_res, _, rem) = select_all(replica_tasks).await;
// TODO: report failure
replica_tasks = rem;
if replica_tasks.is_empty() {
break;
// This gets the first future to resolve.
let (res, _, remaining) = select_all(futs).await;
if res.is_err() {
error!("Replica shut down: {:#}", res.unwrap_err());
}
futs = remaining;
if futs.is_empty() {
return Err(eyre!("All replicas have shut down"));
}
}
Ok(())
unreachable!()
}
/// Run several agents based on the settings

@ -21,7 +21,7 @@ pub trait Replica: Common + Send + Sync + std::fmt::Debug {
/// Fetch the previous root.
async fn previous_root(&self) -> Result<H256, ChainCommunicationError>;
/// Prove inclusion of some leaf in the replica
/// Dispatch a transaction to prove inclusion of some leaf in the replica.
async fn prove(
&self,
leaf: H256,

Loading…
Cancel
Save