@ -1,21 +1,23 @@
use async_trait ::async_trait ;
use color_eyre ::{
eyre ::{ eyre , WrapErr } ,
eyre ::{ bail , eyre , WrapErr } ,
Result ,
} ;
use futures_util ::future ::select_all ;
use std ::{ collections ::HashMap , sync ::Arc } ;
use rocksdb ::DB ;
use std ::{ collections ::HashMap , sync ::Arc , time ::Duration } ;
use tokio ::{
sync ::{ oneshot ::channel , RwLock } ,
task ::JoinHandle ,
time ::sleep ,
} ;
use tracing ::{ info , instrument , Instrument } ;
use tracing ::{ error , info , instrument , Instrument } ;
use optics_base ::{
agent ::{ AgentCore , OpticsAgent } ,
cancel_task , decl_agent ,
home ::Homes ,
persistence ::UsingPersistence ,
replica ::Replicas ,
reset_loop_if ,
} ;
@ -31,33 +33,40 @@ use crate::{
#[ derive(Debug) ]
pub ( crate ) struct ReplicaProcessor {
interval_seconds : u64 ,
interval : u64 ,
replica : Arc < Replicas > ,
home : Arc < Homes > ,
prover : Arc < RwLock < Prover > > ,
db : Arc < DB > ,
}
impl UsingPersistence < usize , Proof > for ReplicaProcessor {
const KEY_PREFIX : & ' static [ u8 ] = "proof_" . as_bytes ( ) ;
fn key_to_bytes ( key : usize ) -> Vec < u8 > {
key . to_be_bytes ( ) . into ( )
}
}
impl ReplicaProcessor {
pub ( crate ) fn new (
interval_seconds : u64 ,
interval : u64 ,
replica : Arc < Replicas > ,
home : Arc < Homes > ,
prover : Arc < RwLock < Prover > > ,
db : Arc < DB > ,
) -> Self {
Self {
interval_seconds ,
interval ,
replica ,
home ,
prover ,
db ,
}
}
#[ instrument ]
pub ( crate ) fn spawn ( self ) -> JoinHandle < Result < ( ) > > {
tokio ::spawn ( async move {
info ! ( "Starting processor" ) ;
info ! ( "Starting processor for {} " , self . replica . name ( ) ) ;
let domain = self . replica . local_domain ( ) ;
let interval = self . interval_seconds ;
// The basic structure of this loop is as follows:
// 1. Get the last processed index
@ -77,28 +86,29 @@ impl ReplicaProcessor {
let message = self . home . message_by_sequence ( domain , sequence ) . await ? ;
reset_loop_if ! (
message . is_none ( ) ,
interval ,
self . interval ,
"Home does not contain message at {}:{}" ,
domain ,
sequence
sequence ,
) ;
let message = message . unwrap ( ) ;
// Lock is dropped immediately
let proof_res = self . prover . read ( ) . await . prove ( message . leaf_index as usize ) ;
let proof_opt = Self ::db_get ( & self . db , message . leaf_index as usize ) ? ;
reset_loop_if ! (
proof_res . is_err ( ) ,
interval ,
"Prover does not contain leaf at index {}" ,
message . leaf_index
proof_opt . is_none ( ) ,
self . interval ,
"Proof not yet available for message at {}:{}" ,
domain ,
sequence ,
) ;
let proof = proof_res . unwrap ( ) ;
let proof = proof_opt . unwrap ( ) ;
if proof . leaf ! = message . to_leaf ( ) {
let err = format! ( "Leaf in prover does not match retrieved message. Index: {}. Calculated: {}. Prover: {}." , message . leaf_index , message . to_leaf ( ) , proof . leaf ) ;
tracing ::error ! ( "{}" , err ) ;
color_eyre ::eyre ::bail ! ( err ) ;
error ! ( "{}" , err ) ;
bail ! ( err ) ;
}
// Dispatch for processing
@ -108,7 +118,7 @@ impl ReplicaProcessor {
) ;
self . process ( message , proof ) . await ? ;
sleep ( std ::time ::Duration ::from_secs ( interval ) ) . await ;
sleep ( Duration ::from_secs ( self . interval ) ) . await ;
}
} . in_current_span ( ) )
}
@ -137,7 +147,7 @@ impl ReplicaProcessor {
decl_agent ! (
/// A processor agent
Processor {
interval_seconds : u64 ,
interval : u64 ,
prover : Arc < RwLock < Prover > > ,
replica_tasks : RwLock < HashMap < String , JoinHandle < Result < ( ) > > > > ,
}
@ -145,9 +155,9 @@ decl_agent!(
impl Processor {
/// Instantiate a new processor
pub fn new ( interval_seconds : u64 , core : AgentCore ) -> Self {
pub fn new ( interval : u64 , core : AgentCore ) -> Self {
Self {
interval_seconds ,
interval ,
prover : Arc ::new ( RwLock ::new ( Prover ::from_disk ( & core . db ) ) ) ,
core ,
replica_tasks : Default ::default ( ) ,
@ -172,15 +182,15 @@ impl OpticsAgent for Processor {
fn run ( & self , name : & str ) -> JoinHandle < Result < ( ) > > {
let home = self . home ( ) ;
let prover = self . prover . clone ( ) ;
let interval_seconds = self . interval_seconds ;
let interval = self . interval ;
let replica_opt = self . replica_by_name ( name ) ;
let name = name . to_owned ( ) ;
let db = self . db ( ) ;
tokio ::spawn ( async move {
let replica = replica_opt . ok_or_else ( | | eyre ! ( "No replica named {}" , name ) ) ? ;
ReplicaProcessor ::new ( interval_seconds , replica , home , prover )
ReplicaProcessor ::new ( interval , replica , home , db )
. spawn ( )
. await ?
} )
@ -189,12 +199,12 @@ impl OpticsAgent for Processor {
#[ tracing::instrument(err) ]
async fn run_many ( & self , replicas : & [ & str ] ) -> Result < ( ) > {
let ( _tx , rx ) = channel ( ) ;
let interval_seconds = self . interval_seconds ;
let interval = self . interval ;
info ! ( "Starting ProverSync task" ) ;
let sync = ProverSync ::new ( self . prover . clone ( ) , self . home ( ) , self . db ( ) , rx ) ;
let sync_task = tokio ::spawn ( async move {
sync . poll_updates ( interval_seconds )
sync . spawn ( interval )
. await
. wrap_err ( "ProverSync task has shut down" )
} ) ;