|
|
|
@ -32,6 +32,8 @@ use optics_core::{ |
|
|
|
|
|
|
|
|
|
use crate::{prover::Prover, prover_sync::ProverSync, settings::ProcessorSettings as Settings}; |
|
|
|
|
|
|
|
|
|
static LAST_INSPECTED: &str = "lastInspected"; |
|
|
|
|
|
|
|
|
|
/// The replica processor is responsible for polling messages and waiting until they validate
|
|
|
|
|
/// before proving/processing them.
|
|
|
|
|
#[derive(Debug)] |
|
|
|
@ -77,7 +79,6 @@ impl Replica { |
|
|
|
|
tokio::spawn( |
|
|
|
|
async move { |
|
|
|
|
use optics_core::traits::Replica; |
|
|
|
|
info!("Starting processor for {}", self.replica.name()); |
|
|
|
|
|
|
|
|
|
let domain = self.replica.local_domain(); |
|
|
|
|
|
|
|
|
@ -89,10 +90,18 @@ impl Replica { |
|
|
|
|
// - If not, wait and poll again
|
|
|
|
|
// 4. Check if the proof is valid under the replica
|
|
|
|
|
// 5. Submit the proof to the replica
|
|
|
|
|
let mut next_to_inspect: u32 = match Self::db_get(&self.db, "lastInspected")? { |
|
|
|
|
let mut next_to_inspect: u32 = match Self::db_get(&self.db, LAST_INSPECTED)? { |
|
|
|
|
Some(n) => n + 1, |
|
|
|
|
None => 0, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
info!( |
|
|
|
|
"Starting processor for {} {} at leaf index {}", |
|
|
|
|
domain, |
|
|
|
|
self.replica.name(), |
|
|
|
|
next_to_inspect |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
loop { |
|
|
|
|
use optics_core::traits::Replica; |
|
|
|
|
let seq_span = tracing::trace_span!( |
|
|
|
@ -109,7 +118,7 @@ impl Replica { |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok(true) => { |
|
|
|
|
Self::db_put(&self.db, "lastInspected", next_to_inspect)?; |
|
|
|
|
Self::db_put(&self.db, LAST_INSPECTED, next_to_inspect)?; |
|
|
|
|
next_to_inspect += 1; |
|
|
|
|
} |
|
|
|
|
Ok(false) => { |
|
|
|
|