|
|
|
@ -231,7 +231,7 @@ impl Replica { |
|
|
|
|
self.replica.process(message.as_ref()).await?; |
|
|
|
|
} |
|
|
|
|
MessageStatus::Processed => { |
|
|
|
|
warn!(target: "possible_race_condition", "Message {idx} already processed", idx = message.leaf_index); |
|
|
|
|
warn!(target: "possible_race_condition", "Message {domain}:{idx} already processed", domain = message.message.destination, idx = message.leaf_index); |
|
|
|
|
} // Indicates race condition?
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -326,16 +326,14 @@ impl OpticsAgent for Processor { |
|
|
|
|
#[tracing::instrument(err)] |
|
|
|
|
async fn run_many(&self, replicas: &[&str]) -> Result<()> { |
|
|
|
|
let (_tx, rx) = channel(); |
|
|
|
|
let interval = self.interval; |
|
|
|
|
|
|
|
|
|
info!("Starting ProverSync task"); |
|
|
|
|
let sync = ProverSync::new(self.prover.clone(), self.home(), self.db(), rx); |
|
|
|
|
let sync_task = tokio::spawn(async move { |
|
|
|
|
sync.spawn(interval) |
|
|
|
|
.await |
|
|
|
|
.wrap_err("ProverSync task has shut down") |
|
|
|
|
}) |
|
|
|
|
.in_current_span(); |
|
|
|
|
let sync_task = |
|
|
|
|
tokio::spawn( |
|
|
|
|
async move { sync.spawn().await.wrap_err("ProverSync task has shut down") }, |
|
|
|
|
) |
|
|
|
|
.in_current_span(); |
|
|
|
|
|
|
|
|
|
// for each specified replica, spawn a joinable task
|
|
|
|
|
let mut handles: Vec<_> = replicas.iter().map(|name| self.run(name)).collect(); |
|
|
|
|