From a1040a04e2a5627b605ed3c2f2d65acc4677dfff Mon Sep 17 00:00:00 2001 From: Luke Tchang Date: Wed, 24 Mar 2021 16:15:58 -0700 Subject: [PATCH] feature: adds configurable pause to updater (#167) * feature: adds pause to updater and fixes tests to account for change * refactor: joins two chain api calls and spawns task for wait and check * refactor: poll_and_handle_update returns join handle --- rust/optics-base/src/home.rs | 8 ++ rust/optics-core/src/traits/home.rs | 3 + rust/optics-ethereum/src/home.rs | 9 ++ rust/optics-test/src/mocks/home.rs | 6 ++ rust/updater/config/default.toml | 1 + rust/updater/src/settings.rs | 1 + rust/updater/src/updater.rs | 146 +++++++++++++++++++++++----- 7 files changed, 150 insertions(+), 24 deletions(-) diff --git a/rust/optics-base/src/home.rs b/rust/optics-base/src/home.rs index 63169b024..b115be757 100644 --- a/rust/optics-base/src/home.rs +++ b/rust/optics-base/src/home.rs @@ -127,6 +127,14 @@ impl Home for Homes { } } + async fn queue_contains(&self, root: H256) -> Result { + match self { + Homes::Ethereum(home) => home.queue_contains(root).await, + Homes::Mock(mock_home) => mock_home.queue_contains(root).await, + Homes::Other(home) => home.queue_contains(root).await, + } + } + async fn improper_update( &self, update: &SignedUpdate, diff --git a/rust/optics-core/src/traits/home.rs b/rust/optics-core/src/traits/home.rs index 4977ce331..9fec9c224 100644 --- a/rust/optics-core/src/traits/home.rs +++ b/rust/optics-core/src/traits/home.rs @@ -117,6 +117,9 @@ pub trait Home: Common + Send + Sync + std::fmt::Debug { /// Queue a message. async fn enqueue(&self, message: &Message) -> Result; + /// Check if queue contains root. + async fn queue_contains(&self, root: H256) -> Result; + /// Submit an improper update for slashing async fn improper_update( &self, diff --git a/rust/optics-ethereum/src/home.rs b/rust/optics-ethereum/src/home.rs index 38e528fc9..3b98018fb 100644 --- a/rust/optics-ethereum/src/home.rs +++ b/rust/optics-ethereum/src/home.rs @@ -251,6 +251,15 @@ where .into()) } + async fn queue_contains(&self, root: H256) -> Result { + Ok(self + .contract + .queue_contains(root.into()) + .call() + .await? + .into()) + } + #[tracing::instrument(err)] async fn improper_update( &self, diff --git a/rust/optics-test/src/mocks/home.rs b/rust/optics-test/src/mocks/home.rs index fd863c4d7..1b2f7a3ff 100644 --- a/rust/optics-test/src/mocks/home.rs +++ b/rust/optics-test/src/mocks/home.rs @@ -40,6 +40,8 @@ mock! { pub fn _enqueue(&self, message: &Message) -> Result {} + pub fn _queue_contains(&self, root: H256) -> Result {} + pub fn _improper_update( &self, update: &SignedUpdate, @@ -123,6 +125,10 @@ impl Home for MockHomeContract { self._enqueue(message) } + async fn queue_contains(&self, root: H256) -> Result { + self._queue_contains(root) + } + async fn improper_update( &self, update: &SignedUpdate, diff --git a/rust/updater/config/default.toml b/rust/updater/config/default.toml index 64b529007..65b3d3e8c 100644 --- a/rust/updater/config/default.toml +++ b/rust/updater/config/default.toml @@ -1,6 +1,7 @@ updater = {type = "hex-key", key = "380eb0f3d505f087e438eca80bc4df9a7faa24f868e69fc0440261a0fc0567dc"} polling_interval = 100 +update_pause = 100 [home] name = "ethereum" diff --git a/rust/updater/src/settings.rs b/rust/updater/src/settings.rs index 9d2b3d9a3..1bac11cf4 100644 --- a/rust/updater/src/settings.rs +++ b/rust/updater/src/settings.rs @@ -6,5 +6,6 @@ decl_settings!( "OPT_UPDATER", updater: EthereumSigner, polling_interval: u64, + update_pause: u64, } ); diff --git a/rust/updater/src/updater.rs b/rust/updater/src/updater.rs index d506fd180..ea0a3ad8e 100644 --- a/rust/updater/src/updater.rs +++ b/rust/updater/src/updater.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use color_eyre::{eyre::ensure, Result}; @@ -21,6 +21,7 @@ use crate::settings::Settings; pub struct Updater { signer: Arc, interval_seconds: u64, + update_pause: u64, core: AgentCore, } @@ -32,31 +33,54 @@ impl AsRef for Updater { impl Updater where - S: Signer, + S: Signer + 'static, { /// Instantiate a new updater - pub fn new(signer: S, interval_seconds: u64, core: AgentCore) -> Self { + pub fn new(signer: S, interval_seconds: u64, update_pause: u64, core: AgentCore) -> Self { Self { signer: Arc::new(signer), interval_seconds, + update_pause, core, } } - async fn poll_and_handle_update(home: Arc, signer: Arc) -> Result<()> { + async fn poll_and_handle_update( + home: Arc, + signer: Arc, + update_pause: u64, + ) -> Result>> { // Check if there is an update let update_opt = home.produce_update().await?; - // If there is, sign it and submit it + // If update exists, spawn task to wait, recheck, and submit update if let Some(update) = update_opt { - let signed = update.sign_with(signer.as_ref()).await.unwrap(); - home.update(&signed).await?; + return Ok(Some(tokio::spawn(async move { + interval(Duration::from_secs(update_pause)).tick().await; + + let res = tokio::join!( + home.queue_contains(update.new_root), + home.current_root() + ); + + if let (Ok(in_queue), Ok(current_root)) = res { + if in_queue && current_root == update.previous_root { + let signed = update.sign_with(signer.as_ref()).await.unwrap(); + + if let Err(ref e) = home.update(&signed).await { + tracing::error!("Error submitting update to home: {:?}", e) + } + } + } + + }))); } - Ok(()) + + Ok(None) } fn interval(&self) -> Interval { - interval(std::time::Duration::from_secs(self.interval_seconds)) + interval(Duration::from_secs(self.interval_seconds)) } } @@ -74,6 +98,7 @@ impl OpticsAgent for Updater { Ok(Self::new( settings.updater.try_into_wallet()?, settings.polling_interval, + settings.update_pause, settings.as_ref().try_into_core().await?, )) } @@ -83,6 +108,7 @@ impl OpticsAgent for Updater { let home = self.home(); let address = self.signer.address(); let mut interval = self.interval(); + let update_pause = self.update_pause; let signer = self.signer.clone(); tokio::spawn(async move { @@ -96,7 +122,8 @@ impl OpticsAgent for Updater { // Set up the polling loop. loop { - let res = Self::poll_and_handle_update(home.clone(), signer.clone()).await; + let res = + Self::poll_and_handle_update(home.clone(), signer.clone(), update_pause).await; if let Err(ref e) = res { tracing::error!("Error polling and handling update: {:?}", e) @@ -120,6 +147,36 @@ mod test { use optics_core::{traits::TxOutcome, SignedUpdate, Update}; use optics_test::mocks::MockHomeContract; + #[tokio::test] + async fn ignores_empty_update() { + let signer: LocalWallet = + "1111111111111111111111111111111111111111111111111111111111111111" + .parse() + .unwrap(); + + let mut mock_home = MockHomeContract::new(); + // home.produce_update returns Ok(None) + mock_home + .expect__produce_update() + .return_once(move || Ok(None)); + + // Expect home.update to NOT be called + mock_home.expect__update().times(0).returning(|_| { + Ok(TxOutcome { + txid: H256::default(), + executed: true, + }) + }); + + let mut home: Arc = Arc::new(mock_home.into()); + Updater::poll_and_handle_update(home.clone(), Arc::new(signer), 1) + .await + .expect("Should have returned Ok(())"); + + let mock_home = Arc::get_mut(&mut home).unwrap(); + mock_home.checkpoint(); + } + #[tokio::test] async fn polls_and_submits_update() { let signer: LocalWallet = @@ -139,11 +196,25 @@ mod test { let mut mock_home = MockHomeContract::new(); - // home.produce_update returns created update value + // home.produce_update called once and returns created update value mock_home .expect__produce_update() + .times(1) .return_once(move || Ok(Some(update))); + // home.queue_contains called once and returns Ok(true) + mock_home + .expect__queue_contains() + .withf(move |r: &H256| *r == new_root) + .times(1) + .return_once(move |_| Ok(true)); + + // home.current_root called once and returns Ok(previous_root) + mock_home + .expect__current_root() + .times(1) + .return_once(move || Ok(previous_root)); + // Expect home.update to be called once mock_home .expect__update() @@ -157,32 +228,56 @@ mod test { }); let mut home: Arc = Arc::new(mock_home.into()); - Updater::poll_and_handle_update(home.clone(), Arc::new(signer)) + let handle = Updater::poll_and_handle_update(home.clone(), Arc::new(signer), 1) .await - .expect("Should have returned Ok(())"); + .expect("poll_and_handle_update returned error") + .expect("poll_and_handle_update should have returned Some(JoinHandle)"); + + handle.await.expect("poll_and_handle_update join handle errored on await"); let mock_home = Arc::get_mut(&mut home).unwrap(); - if let Homes::Mock(home) = mock_home { - home.checkpoint(); - } else { - panic!("Home should be mock variant!"); - } + mock_home.checkpoint(); } #[tokio::test] - async fn ignores_empty_update() { + async fn does_not_submit_update_after_bad_reorg() { let signer: LocalWallet = "1111111111111111111111111111111111111111111111111111111111111111" .parse() .unwrap(); + let previous_root = H256::from([1; 32]); + let new_root = H256::from([2; 32]); + + let update = Update { + origin_domain: 0, + previous_root, + new_root, + }; + let mut mock_home = MockHomeContract::new(); - // home.produce_update returns Ok(None) + + // home.produce_update called once and returns created update value mock_home .expect__produce_update() - .return_once(move || Ok(None)); + .times(1) + .return_once(move || Ok(Some(update))); - // Expect home.update to NOT be called + // home.queue_contains called once but returns false (reorg removed new + // root from history) + mock_home + .expect__queue_contains() + .withf(move |r: &H256| *r == new_root) + .times(1) + .return_once(move |_| Ok(false)); + + // home.current_root called once and returns Ok(previous_root) + mock_home + .expect__current_root() + .times(1) + .return_once(move || Ok(previous_root)); + + // Expect home.update NOT to be called mock_home.expect__update().times(0).returning(|_| { Ok(TxOutcome { txid: H256::default(), @@ -191,9 +286,12 @@ mod test { }); let mut home: Arc = Arc::new(mock_home.into()); - Updater::poll_and_handle_update(home.clone(), Arc::new(signer)) + let handle = Updater::poll_and_handle_update(home.clone(), Arc::new(signer), 1) .await - .expect("Should have returned Ok(())"); + .expect("poll_and_handle_update returned error") + .expect("poll_and_handle_update should have returned Some(JoinHandle)"); + + handle.await.expect("poll_and_handle_update join handle errored on await"); let mock_home = Arc::get_mut(&mut home).unwrap(); mock_home.checkpoint();