@ -19,7 +19,10 @@ use optics_base::{
replica ::Replicas ,
reset_loop_if ,
} ;
use optics_core ::traits ::{ Home , Replica } ;
use optics_core ::{
accumulator ::merkle ::Proof ,
traits ::{ CommittedMessage , Common , Home , MessageStatus , Replica } ,
} ;
use crate ::{
prover ::{ Prover , ProverSync } ,
@ -62,14 +65,18 @@ impl ReplicaProcessor {
// - If not, wait and poll again
// 4. Submit the proof to the replica
loop {
let next_to_process = self . replica . next_to_process ( ) . await ? ;
let sequence = next_to_process . as_u32 ( ) ;
let sequence = self . replica . next_to_process ( ) . await ? ;
info ! (
"Next to process for replica {} is {}" ,
self . replica . name ( ) ,
sequence
) ;
let message = self . home . message_by_sequence ( domain , sequence ) . await ? ;
reset_loop_if ! (
message . is_none ( ) ,
interval ,
"Remot e does not contain message at {}:{}" ,
"Hom e does not contain message at {}:{}" ,
domain ,
sequence
) ;
@ -86,24 +93,42 @@ impl ReplicaProcessor {
) ;
let proof = proof_res . unwrap ( ) ;
if proof . leaf ! = message . message . to_leaf ( ) {
let err = format! ( "Leaf in prover does not match retrieved message. Index: {}. Retrieved: {}. Local : {}." , message . leaf_index , message . message . to_leaf ( ) , proof . leaf ) ;
if proof . leaf ! = message . to_leaf ( ) {
let err = format! ( "Leaf in prover does not match retrieved message. Index: {}. Calculated: {}. Prover : {}." , message . leaf_index , message . to_leaf ( ) , proof . leaf ) ;
tracing ::error ! ( "{}" , err ) ;
color_eyre ::eyre ::bail ! ( err ) ;
}
// Dispatch for processing
info ! (
"Dispatching a message for processing {}:{}" ,
domain , sequence
) ;
self . replica
. prove_and_process ( message . as_ref ( ) , & proof )
. await ? ;
self . process ( message , proof ) . await ? ;
sleep ( std ::time ::Duration ::from_secs ( interval ) ) . await ;
}
} )
}
/// Dispatch a message for processing. If the message is already proven, process only.
async fn process ( & self , message : CommittedMessage , proof : Proof ) -> Result < ( ) > {
let status = self . replica . message_status ( message . to_leaf ( ) ) . await ? ;
match status {
MessageStatus ::None = > {
self . replica
. prove_and_process ( message . as_ref ( ) , & proof )
. await ? ;
}
MessageStatus ::Pending = > {
self . replica . process ( message . as_ref ( ) ) . await ? ;
}
MessageStatus ::Processed = > { } // Indicates race condition?
}
Ok ( ( ) )
}
}
decl_agent ! (