refactor: move DB to agentcore (#228)

- add db_path to settings for all agents
- refactor updater poll_and_handle_update to use a mutex guard
- add db() method to optics agent trait
buddies-main-deployment
James Prestwich 4 years ago committed by GitHub
parent be4704e85a
commit 1174800624
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      rust/optics-base/src/agent.rs
  2. 17
      rust/optics-base/src/db/mod.rs
  3. 8
      rust/optics-base/src/settings/mod.rs
  4. 14
      rust/processor/src/processor.rs
  5. 35
      rust/updater/src/updater.rs
  6. 9
      rust/watcher/src/watcher.rs

@ -1,11 +1,11 @@
use crate::{cancel_task, home::Homes, replica::Replicas, settings::Settings};
use async_trait::async_trait;
use color_eyre::{eyre::WrapErr, Result};
use futures_util::future::select_all;
use rocksdb::DB;
use std::{collections::HashMap, sync::Arc};
use tokio::task::JoinHandle;
use crate::{cancel_task, home::Homes, replica::Replicas, settings::Settings};
/// Properties shared across all agents
#[derive(Debug)]
pub struct AgentCore {
@ -13,6 +13,8 @@ pub struct AgentCore {
pub home: Arc<Homes>,
/// A map of boxed Replicas
pub replicas: HashMap<String, Arc<Replicas>>,
/// A persistent KV Store (currently implemented as rocksdb)
pub db: Arc<DB>,
}
/// A trait for an application that runs on a replica and a reference to a
@ -27,6 +29,11 @@ pub trait OpticsAgent: Send + Sync + std::fmt::Debug + AsRef<AgentCore> {
where
Self: Sized;
/// Return a handle to the DB
fn db(&self) -> Arc<DB> {
self.as_ref().db.clone()
}
/// Return a reference to a home contract
fn home(&self) -> Arc<Homes> {
self.as_ref().home.clone()

@ -1,12 +1,21 @@
use color_eyre::eyre::{Result, WrapErr};
use rocksdb::{Options, DB};
use std::path::Path;
/// Shared functionality surrounding use of rocksdb
pub mod persistence;
pub use persistence::UsingPersistence;
use rocksdb::{Options, DB};
pub use persistence::UsingPersistence;
/// Opens db at `db_path` and creates if missing
pub fn from_path(db_path: String) -> DB {
pub fn from_path(db_path: &str) -> Result<DB> {
let path = Path::new(db_path).canonicalize()?;
let mut opts = Options::default();
opts.create_if_missing(true);
DB::open(&opts, db_path).expect("Failed to open db path")
Ok(DB::open(&opts, &path).wrap_err(format!(
"Failed to open db path {}, canonicalized as {:?}",
db_path, path
))?)
}

@ -3,7 +3,7 @@ use config::{Config, ConfigError, Environment, File};
use serde::Deserialize;
use std::{collections::HashMap, env, sync::Arc};
use crate::{home::Homes, replica::Replicas};
use crate::{db, home::Homes, replica::Replicas};
/// Ethereum configuration
pub mod ethereum;
@ -86,6 +86,8 @@ impl ChainSetup {
/// ```
#[derive(Debug, Deserialize)]
pub struct Settings {
/// The path to use for the DB file
pub db_path: String,
/// The home configuration
pub home: ChainSetup,
/// The replica configurations
@ -113,7 +115,9 @@ impl Settings {
pub async fn try_into_core(&self) -> Result<AgentCore, Report> {
let home = Arc::new(self.try_home().await?);
let replicas = self.try_replicas().await?;
Ok(AgentCore { home, replicas })
let db = Arc::new(db::from_path(&self.db_path)?);
Ok(AgentCore { home, replicas, db })
}
/// Read settings from the config file

@ -4,7 +4,6 @@ use color_eyre::{
Result,
};
use futures_util::future::select_all;
use rocksdb::DB;
use std::{collections::HashMap, sync::Arc};
use tokio::{
sync::{oneshot::channel, RwLock},
@ -14,7 +13,7 @@ use tokio::{
use optics_base::{
agent::{AgentCore, OpticsAgent},
cancel_task, db, decl_agent,
cancel_task, decl_agent,
home::Homes,
replica::Replicas,
reset_loop_if,
@ -112,21 +111,17 @@ decl_agent!(
interval_seconds: u64,
prover: Arc<RwLock<Prover>>,
replica_tasks: RwLock<HashMap<String, JoinHandle<Result<()>>>>,
db: Arc<DB>,
}
);
impl Processor {
/// Instantiate a new processor
pub fn new(interval_seconds: u64, db_path: String, core: AgentCore) -> Self {
let db = db::from_path(db_path);
pub fn new(interval_seconds: u64, core: AgentCore) -> Self {
Self {
interval_seconds,
prover: Arc::new(RwLock::new(Prover::from_disk(&db))),
prover: Arc::new(RwLock::new(Prover::from_disk(&core.db))),
core,
replica_tasks: Default::default(),
db: Arc::new(db),
}
}
}
@ -142,7 +137,6 @@ impl OpticsAgent for Processor {
{
Ok(Self::new(
settings.polling_interval,
settings.db_path.clone(),
settings.as_ref().try_into_core().await?,
))
}
@ -168,7 +162,7 @@ impl OpticsAgent for Processor {
let (_tx, rx) = channel();
let interval_seconds = self.interval_seconds;
let sync = ProverSync::new(self.prover.clone(), self.home(), self.db.clone(), rx);
let sync = ProverSync::new(self.prover.clone(), self.home(), self.db(), rx);
let sync_task = tokio::spawn(async move {
sync.poll_updates(interval_seconds)
.await

@ -5,14 +5,13 @@ use color_eyre::{eyre::ensure, Result};
use ethers::{core::types::H256, prelude::LocalWallet, signers::Signer, types::Address};
use rocksdb::DB;
use tokio::{
sync::RwLock,
sync::Mutex,
task::JoinHandle,
time::{interval, Interval},
};
use optics_base::{
agent::{AgentCore, OpticsAgent},
db,
db::UsingPersistence,
home::Homes,
};
@ -71,7 +70,8 @@ where
async fn poll_and_handle_update(
home: Arc<Homes>,
signer: Arc<S>,
db: Arc<RwLock<DB>>,
db: Arc<DB>,
mutex: Arc<Mutex<()>>,
update_pause: u64,
) -> Result<Option<JoinHandle<()>>> {
// Check if there is an update
@ -99,24 +99,26 @@ where
if in_queue && current_root == old_root {
// If update still valid and doesn't conflict with local
// history of signed updates, sign and submit update. Note
// that because we write-acquire RwLock, only one thread
// that because we acquire a guard, only one thread
// can check and enter the below `if` block at a time,
// protecting from races between threads.
let db_write = db.write().await;
if let Ok(None) = Self::db_get(&db_write, old_root) {
// acquire guard
let _guard = mutex.lock().await;
if let Ok(None) = Self::db_get(&db, old_root) {
let signed = update.sign_with(signer.as_ref()).await.unwrap();
// If successfully submitted update, record in db
match home.update(&signed).await {
Ok(_) => {
Self::db_put(&db_write, old_root, signed).expect("!db_put");
Self::db_put(&db, old_root, signed).expect("!db_put");
}
Err(ref e) => {
tracing::error!("Error submitting update to home: {:?}", e)
}
}
}
}
} // guard dropped here
})));
}
@ -155,7 +157,9 @@ impl OpticsAgent for Updater<LocalWallet> {
let mut interval = self.interval();
let update_pause = self.update_pause;
let signer = self.signer.clone();
let db = Arc::new(RwLock::new(db::from_path(self.db_path.clone())));
let db = self.db();
let mutex = Arc::new(Mutex::new(()));
tokio::spawn(async move {
let expected: Address = home.updater().await?.into();
@ -172,6 +176,7 @@ impl OpticsAgent for Updater<LocalWallet> {
home.clone(),
signer.clone(),
db.clone(),
mutex.clone(),
update_pause,
)
.await;
@ -189,9 +194,6 @@ impl OpticsAgent for Updater<LocalWallet> {
#[cfg(test)]
mod test {
use std::sync::Arc;
use tokio::sync::RwLock;
use ethers::core::types::H256;
use optics_base::home::Homes;
@ -226,7 +228,8 @@ mod test {
Updater::poll_and_handle_update(
home.clone(),
Arc::new(signer),
Arc::new(RwLock::new(db)),
Arc::new(db),
Arc::new(Mutex::new(())),
1,
)
.await
@ -293,7 +296,8 @@ mod test {
let handle = Updater::poll_and_handle_update(
home.clone(),
Arc::new(signer),
Arc::new(RwLock::new(db)),
Arc::new(db),
Arc::new(Mutex::new(())),
1,
)
.await
@ -361,7 +365,8 @@ mod test {
let handle = Updater::poll_and_handle_update(
home.clone(),
Arc::new(signer),
Arc::new(RwLock::new(db)),
Arc::new(db),
Arc::new(Mutex::new(())),
1,
)
.await

@ -17,7 +17,7 @@ use tokio::{
use optics_base::{
agent::{AgentCore, OpticsAgent},
cancel_task, db, decl_agent,
cancel_task, decl_agent,
home::Homes,
persistence::UsingPersistence,
};
@ -243,7 +243,6 @@ decl_agent!(
/// A watcher agent
Watcher {
interval_seconds: u64,
db: Arc<DB>,
sync_tasks: RwLock<HashMap<String, JoinHandle<Result<()>>>>,
watch_tasks: RwLock<HashMap<String, JoinHandle<Result<()>>>>,
}
@ -252,10 +251,9 @@ decl_agent!(
#[allow(clippy::unit_arg)]
impl Watcher {
/// Instantiate a new watcher.
pub fn new(interval_seconds: u64, db_path: String, core: AgentCore) -> Self {
pub fn new(interval_seconds: u64, core: AgentCore) -> Self {
Self {
interval_seconds,
db: Arc::new(db::from_path(db_path)),
core,
sync_tasks: Default::default(),
watch_tasks: Default::default(),
@ -304,7 +302,6 @@ impl OpticsAgent for Watcher {
{
Ok(Self::new(
settings.polling_interval,
settings.db_path.clone(),
settings.as_ref().try_into_core().await?,
))
}
@ -319,7 +316,7 @@ impl OpticsAgent for Watcher {
#[tracing::instrument(err)]
async fn run_many(&self, replicas: &[&str]) -> Result<()> {
let (tx, rx) = mpsc::channel(200);
let handler = UpdateHandler::new(rx, self.db.clone(), self.home()).spawn();
let handler = UpdateHandler::new(rx, self.db(), self.home()).spawn();
for name in replicas.iter() {
let replica = self

Loading…
Cancel
Save