refactor: update poller loop behavior to reduce latency (#726)

* refactor: update poller loop behavior to reduce latency

- change behavior of poller when queue is full to drop updates
- change queue length to 1
- add a pre-wait check in the update handler's handle function

* chore: comment on why we poll acceptable twice
buddies-main-deployment
James Prestwich 3 years ago committed by GitHub
parent 352e1ac42b
commit 30877c8c86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      rust/agents/updater/src/updater.rs

@ -11,7 +11,7 @@ use prometheus::IntCounter;
use rocksdb::DB; use rocksdb::DB;
use tokio::{ use tokio::{
sync::{ sync::{
mpsc::{self, Receiver, Sender}, mpsc::{self, error::TrySendError, Receiver, Sender},
Mutex, Mutex,
}, },
task::JoinHandle, task::JoinHandle,
@ -19,6 +19,7 @@ use tokio::{
}; };
use tracing::{error, info, instrument::Instrumented, Instrument}; use tracing::{error, info, instrument::Instrumented, Instrument};
use crate::settings::UpdaterSettings as Settings;
use optics_base::{ use optics_base::{
agent::{AgentCore, OpticsAgent}, agent::{AgentCore, OpticsAgent},
db::UsingPersistence, db::UsingPersistence,
@ -29,8 +30,6 @@ use optics_core::{
SignedUpdate, Signers, Update, SignedUpdate, Signers, Update,
}; };
use crate::settings::UpdaterSettings as Settings;
#[derive(Debug)] #[derive(Debug)]
struct UpdateHandler { struct UpdateHandler {
home: Arc<Homes>, home: Arc<Homes>,
@ -112,6 +111,19 @@ impl UpdateHandler {
#[tracing::instrument(err, skip(self), fields(self = %self))] #[tracing::instrument(err, skip(self), fields(self = %self))]
async fn handle_update(&self, update: Update) -> Result<()> { async fn handle_update(&self, update: Update) -> Result<()> {
info!("Have an update, awaiting the tick"); info!("Have an update, awaiting the tick");
// We poll acceptable immediately, to prevent waiting on
// any unacceptable updates that got into the channel
// We poll it AGAIN after sleeping to ensure that the update
// is still acceptable.
// TODO(james): later refactor this to only allow acceptable
// updates into the channel?
if !self.acceptable(&update).await? {
info!("Declined to submit update. No longer current");
return Ok(());
}
// Wait `update_pause` seconds // Wait `update_pause` seconds
sleep(Duration::from_secs(self.update_pause)).await; sleep(Duration::from_secs(self.update_pause)).await;
@ -184,9 +196,10 @@ impl UpdatePoller {
fn spawn(self) -> Instrumented<JoinHandle<Result<()>>> { fn spawn(self) -> Instrumented<JoinHandle<Result<()>>> {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
match self.home.produce_update().await? { if let Some(update) = self.home.produce_update().await? {
Some(update) => self.tx.send(update).await?, if let Err(TrySendError::Closed(_)) = self.tx.try_send(update) {
None => info!("No update available"), bail!("UpdatePoller died");
}
} }
sleep(Duration::from_secs(self.interval_seconds)).await; sleep(Duration::from_secs(self.interval_seconds)).await;
} }
@ -258,7 +271,7 @@ impl OpticsAgent for Updater {
let home = self.home(); let home = self.home();
let address = self.signer.address(); let address = self.signer.address();
let (tx, rx) = mpsc::channel(32); let (tx, rx) = mpsc::channel(1);
let poller = UpdatePoller::new(self.home(), tx, self.interval_seconds); let poller = UpdatePoller::new(self.home(), tx, self.interval_seconds);
let handler = UpdateHandler::new( let handler = UpdateHandler::new(
self.home(), self.home(),

Loading…
Cancel
Save