feature: adds configurable pause to updater (#167)

* feature: adds pause to updater and fixes tests to account for change

* refactor: joins two chain api calls and spawns task for wait and check

* refactor: poll_and_handle_update returns join handle
buddies-main-deployment
Luke Tchang 4 years ago committed by GitHub
parent e0122705d4
commit a1040a04e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      rust/optics-base/src/home.rs
  2. 3
      rust/optics-core/src/traits/home.rs
  3. 9
      rust/optics-ethereum/src/home.rs
  4. 6
      rust/optics-test/src/mocks/home.rs
  5. 1
      rust/updater/config/default.toml
  6. 1
      rust/updater/src/settings.rs
  7. 146
      rust/updater/src/updater.rs

@ -127,6 +127,14 @@ impl Home for Homes {
} }
} }
async fn queue_contains(&self, root: H256) -> Result<bool, ChainCommunicationError> {
match self {
Homes::Ethereum(home) => home.queue_contains(root).await,
Homes::Mock(mock_home) => mock_home.queue_contains(root).await,
Homes::Other(home) => home.queue_contains(root).await,
}
}
async fn improper_update( async fn improper_update(
&self, &self,
update: &SignedUpdate, update: &SignedUpdate,

@ -117,6 +117,9 @@ pub trait Home: Common + Send + Sync + std::fmt::Debug {
/// Queue a message. /// Queue a message.
async fn enqueue(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError>; async fn enqueue(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError>;
/// Check if queue contains root.
async fn queue_contains(&self, root: H256) -> Result<bool, ChainCommunicationError>;
/// Submit an improper update for slashing /// Submit an improper update for slashing
async fn improper_update( async fn improper_update(
&self, &self,

@ -251,6 +251,15 @@ where
.into()) .into())
} }
async fn queue_contains(&self, root: H256) -> Result<bool, ChainCommunicationError> {
Ok(self
.contract
.queue_contains(root.into())
.call()
.await?
.into())
}
#[tracing::instrument(err)] #[tracing::instrument(err)]
async fn improper_update( async fn improper_update(
&self, &self,

@ -40,6 +40,8 @@ mock! {
pub fn _enqueue(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError> {} pub fn _enqueue(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError> {}
pub fn _queue_contains(&self, root: H256) -> Result<bool, ChainCommunicationError> {}
pub fn _improper_update( pub fn _improper_update(
&self, &self,
update: &SignedUpdate, update: &SignedUpdate,
@ -123,6 +125,10 @@ impl Home for MockHomeContract {
self._enqueue(message) self._enqueue(message)
} }
async fn queue_contains(&self, root: H256) -> Result<bool, ChainCommunicationError> {
self._queue_contains(root)
}
async fn improper_update( async fn improper_update(
&self, &self,
update: &SignedUpdate, update: &SignedUpdate,

@ -1,6 +1,7 @@
updater = {type = "hex-key", key = "380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"} updater = {type = "hex-key", key = "380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"}
polling_interval = 100 polling_interval = 100
update_pause = 100
[home] [home]
name = "ethereum" name = "ethereum"

@ -6,5 +6,6 @@ decl_settings!(
"OPT_UPDATER", "OPT_UPDATER",
updater: EthereumSigner, updater: EthereumSigner,
polling_interval: u64, polling_interval: u64,
update_pause: u64,
} }
); );

@ -1,4 +1,4 @@
use std::sync::Arc; use std::{sync::Arc, time::Duration};
use async_trait::async_trait; use async_trait::async_trait;
use color_eyre::{eyre::ensure, Result}; use color_eyre::{eyre::ensure, Result};
@ -21,6 +21,7 @@ use crate::settings::Settings;
pub struct Updater<S> { pub struct Updater<S> {
signer: Arc<S>, signer: Arc<S>,
interval_seconds: u64, interval_seconds: u64,
update_pause: u64,
core: AgentCore, core: AgentCore,
} }
@ -32,31 +33,54 @@ impl<S> AsRef<AgentCore> for Updater<S> {
impl<S> Updater<S> impl<S> Updater<S>
where where
S: Signer, S: Signer + 'static,
{ {
/// Instantiate a new updater /// Instantiate a new updater
pub fn new(signer: S, interval_seconds: u64, core: AgentCore) -> Self { pub fn new(signer: S, interval_seconds: u64, update_pause: u64, core: AgentCore) -> Self {
Self { Self {
signer: Arc::new(signer), signer: Arc::new(signer),
interval_seconds, interval_seconds,
update_pause,
core, core,
} }
} }
async fn poll_and_handle_update(home: Arc<Homes>, signer: Arc<S>) -> Result<()> { async fn poll_and_handle_update(
home: Arc<Homes>,
signer: Arc<S>,
update_pause: u64,
) -> Result<Option<JoinHandle<()>>> {
// Check if there is an update // Check if there is an update
let update_opt = home.produce_update().await?; let update_opt = home.produce_update().await?;
// If there is, sign it and submit it // If update exists, spawn task to wait, recheck, and submit update
if let Some(update) = update_opt { if let Some(update) = update_opt {
let signed = update.sign_with(signer.as_ref()).await.unwrap(); return Ok(Some(tokio::spawn(async move {
home.update(&signed).await?; interval(Duration::from_secs(update_pause)).tick().await;
let res = tokio::join!(
home.queue_contains(update.new_root),
home.current_root()
);
if let (Ok(in_queue), Ok(current_root)) = res {
if in_queue && current_root == update.previous_root {
let signed = update.sign_with(signer.as_ref()).await.unwrap();
if let Err(ref e) = home.update(&signed).await {
tracing::error!("Error submitting update to home: {:?}", e)
}
}
}
})));
} }
Ok(())
Ok(None)
} }
fn interval(&self) -> Interval { fn interval(&self) -> Interval {
interval(std::time::Duration::from_secs(self.interval_seconds)) interval(Duration::from_secs(self.interval_seconds))
} }
} }
@ -74,6 +98,7 @@ impl OpticsAgent for Updater<LocalWallet> {
Ok(Self::new( Ok(Self::new(
settings.updater.try_into_wallet()?, settings.updater.try_into_wallet()?,
settings.polling_interval, settings.polling_interval,
settings.update_pause,
settings.as_ref().try_into_core().await?, settings.as_ref().try_into_core().await?,
)) ))
} }
@ -83,6 +108,7 @@ impl OpticsAgent for Updater<LocalWallet> {
let home = self.home(); let home = self.home();
let address = self.signer.address(); let address = self.signer.address();
let mut interval = self.interval(); let mut interval = self.interval();
let update_pause = self.update_pause;
let signer = self.signer.clone(); let signer = self.signer.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -96,7 +122,8 @@ impl OpticsAgent for Updater<LocalWallet> {
// Set up the polling loop. // Set up the polling loop.
loop { loop {
let res = Self::poll_and_handle_update(home.clone(), signer.clone()).await; let res =
Self::poll_and_handle_update(home.clone(), signer.clone(), update_pause).await;
if let Err(ref e) = res { if let Err(ref e) = res {
tracing::error!("Error polling and handling update: {:?}", e) tracing::error!("Error polling and handling update: {:?}", e)
@ -120,6 +147,36 @@ mod test {
use optics_core::{traits::TxOutcome, SignedUpdate, Update}; use optics_core::{traits::TxOutcome, SignedUpdate, Update};
use optics_test::mocks::MockHomeContract; use optics_test::mocks::MockHomeContract;
#[tokio::test]
async fn ignores_empty_update() {
let signer: LocalWallet =
"1111111111111111111111111111111111111111111111111111111111111111"
.parse()
.unwrap();
let mut mock_home = MockHomeContract::new();
// home.produce_update returns Ok(None)
mock_home
.expect__produce_update()
.return_once(move || Ok(None));
// Expect home.update to NOT be called
mock_home.expect__update().times(0).returning(|_| {
Ok(TxOutcome {
txid: H256::default(),
executed: true,
})
});
let mut home: Arc<Homes> = Arc::new(mock_home.into());
Updater::poll_and_handle_update(home.clone(), Arc::new(signer), 1)
.await
.expect("Should have returned Ok(())");
let mock_home = Arc::get_mut(&mut home).unwrap();
mock_home.checkpoint();
}
#[tokio::test] #[tokio::test]
async fn polls_and_submits_update() { async fn polls_and_submits_update() {
let signer: LocalWallet = let signer: LocalWallet =
@ -139,11 +196,25 @@ mod test {
let mut mock_home = MockHomeContract::new(); let mut mock_home = MockHomeContract::new();
// home.produce_update returns created update value // home.produce_update called once and returns created update value
mock_home mock_home
.expect__produce_update() .expect__produce_update()
.times(1)
.return_once(move || Ok(Some(update))); .return_once(move || Ok(Some(update)));
// home.queue_contains called once and returns Ok(true)
mock_home
.expect__queue_contains()
.withf(move |r: &H256| *r == new_root)
.times(1)
.return_once(move |_| Ok(true));
// home.current_root called once and returns Ok(previous_root)
mock_home
.expect__current_root()
.times(1)
.return_once(move || Ok(previous_root));
// Expect home.update to be called once // Expect home.update to be called once
mock_home mock_home
.expect__update() .expect__update()
@ -157,32 +228,56 @@ mod test {
}); });
let mut home: Arc<Homes> = Arc::new(mock_home.into()); let mut home: Arc<Homes> = Arc::new(mock_home.into());
Updater::poll_and_handle_update(home.clone(), Arc::new(signer)) let handle = Updater::poll_and_handle_update(home.clone(), Arc::new(signer), 1)
.await .await
.expect("Should have returned Ok(())"); .expect("poll_and_handle_update returned error")
.expect("poll_and_handle_update should have returned Some(JoinHandle)");
handle.await.expect("poll_and_handle_update join handle errored on await");
let mock_home = Arc::get_mut(&mut home).unwrap(); let mock_home = Arc::get_mut(&mut home).unwrap();
if let Homes::Mock(home) = mock_home { mock_home.checkpoint();
home.checkpoint();
} else {
panic!("Home should be mock variant!");
}
} }
#[tokio::test] #[tokio::test]
async fn ignores_empty_update() { async fn does_not_submit_update_after_bad_reorg() {
let signer: LocalWallet = let signer: LocalWallet =
"1111111111111111111111111111111111111111111111111111111111111111" "1111111111111111111111111111111111111111111111111111111111111111"
.parse() .parse()
.unwrap(); .unwrap();
let previous_root = H256::from([1; 32]);
let new_root = H256::from([2; 32]);
let update = Update {
origin_domain: 0,
previous_root,
new_root,
};
let mut mock_home = MockHomeContract::new(); let mut mock_home = MockHomeContract::new();
// home.produce_update returns Ok(None)
// home.produce_update called once and returns created update value
mock_home mock_home
.expect__produce_update() .expect__produce_update()
.return_once(move || Ok(None)); .times(1)
.return_once(move || Ok(Some(update)));
// Expect home.update to NOT be called // home.queue_contains called once but returns false (reorg removed new
// root from history)
mock_home
.expect__queue_contains()
.withf(move |r: &H256| *r == new_root)
.times(1)
.return_once(move |_| Ok(false));
// home.current_root called once and returns Ok(previous_root)
mock_home
.expect__current_root()
.times(1)
.return_once(move || Ok(previous_root));
// Expect home.update NOT to be called
mock_home.expect__update().times(0).returning(|_| { mock_home.expect__update().times(0).returning(|_| {
Ok(TxOutcome { Ok(TxOutcome {
txid: H256::default(), txid: H256::default(),
@ -191,9 +286,12 @@ mod test {
}); });
let mut home: Arc<Homes> = Arc::new(mock_home.into()); let mut home: Arc<Homes> = Arc::new(mock_home.into());
Updater::poll_and_handle_update(home.clone(), Arc::new(signer)) let handle = Updater::poll_and_handle_update(home.clone(), Arc::new(signer), 1)
.await .await
.expect("Should have returned Ok(())"); .expect("poll_and_handle_update returned error")
.expect("poll_and_handle_update should have returned Some(JoinHandle)");
handle.await.expect("poll_and_handle_update join handle errored on await");
let mock_home = Arc::get_mut(&mut home).unwrap(); let mock_home = Arc::get_mut(&mut home).unwrap();
mock_home.checkpoint(); mock_home.checkpoint();

Loading…
Cancel
Save