diff --git a/rust/agents/updater/src/updater.rs b/rust/agents/updater/src/updater.rs index 0b83de203..b98d84234 100644 --- a/rust/agents/updater/src/updater.rs +++ b/rust/agents/updater/src/updater.rs @@ -11,7 +11,7 @@ use prometheus::IntCounter; use rocksdb::DB; use tokio::{ sync::{ - mpsc::{self, Receiver, Sender}, + mpsc::{self, error::TrySendError, Receiver, Sender}, Mutex, }, task::JoinHandle, @@ -19,6 +19,7 @@ use tokio::{ }; use tracing::{error, info, instrument::Instrumented, Instrument}; +use crate::settings::UpdaterSettings as Settings; use optics_base::{ agent::{AgentCore, OpticsAgent}, db::UsingPersistence, @@ -29,8 +30,6 @@ use optics_core::{ SignedUpdate, Signers, Update, }; -use crate::settings::UpdaterSettings as Settings; - #[derive(Debug)] struct UpdateHandler { home: Arc, @@ -112,6 +111,19 @@ impl UpdateHandler { #[tracing::instrument(err, skip(self), fields(self = %self))] async fn handle_update(&self, update: Update) -> Result<()> { info!("Have an update, awaiting the tick"); + + // We poll acceptable immediately, to prevent waiting on + // any unacceptable updates that got into the channel + // We poll it AGAIN after sleeping to ensure that the update + // is still acceptable. + + // TODO(james): later refactor this to only allow acceptable + // updates into the channel? + if !self.acceptable(&update).await? { + info!("Declined to submit update. No longer current"); + return Ok(()); + } + // Wait `update_pause` seconds sleep(Duration::from_secs(self.update_pause)).await; @@ -184,9 +196,10 @@ impl UpdatePoller { fn spawn(self) -> Instrumented>> { tokio::spawn(async move { loop { - match self.home.produce_update().await? { - Some(update) => self.tx.send(update).await?, - None => info!("No update available"), + if let Some(update) = self.home.produce_update().await? { + if let Err(TrySendError::Closed(_)) = self.tx.try_send(update) { + bail!("UpdatePoller died"); + } } sleep(Duration::from_secs(self.interval_seconds)).await; } @@ -258,7 +271,7 @@ impl OpticsAgent for Updater { let home = self.home(); let address = self.signer.address(); - let (tx, rx) = mpsc::channel(32); + let (tx, rx) = mpsc::channel(1); let poller = UpdatePoller::new(self.home(), tx, self.interval_seconds); let handler = UpdateHandler::new( self.home(),