bugs: lots of rust agent bugs

buddies-main-deployment
James Prestwich 3 years ago
parent 7e8a816a2f
commit bb2768384a
No known key found for this signature in database
GPG Key ID: 7CC174C250AD83AD
  1. 1
      .gitignore
  2. 4
      rust/config/1623444701428/alfajores_config.json
  3. 9
      rust/config/1623444701428/kathy-partial.json
  4. 4
      rust/config/1623444701428/kovan_config.json
  5. 3
      rust/config/1623444701428/processor-partial.json
  6. 3
      rust/config/1623444701428/relayer-partial.json
  7. 8
      rust/config/1623444701428/updater-partial.json
  8. 39
      rust/config/1623454033875/alfajores_config.json
  9. 22
      rust/config/1623454033875/alfajores_contracts.json
  10. 9
      rust/config/1623454033875/kathy-partial.json
  11. 39
      rust/config/1623454033875/kovan_config.json
  12. 22
      rust/config/1623454033875/kovan_contracts.json
  13. 3
      rust/config/1623454033875/processor-partial.json
  14. 3
      rust/config/1623454033875/relayer-partial.json
  15. 8
      rust/config/1623454033875/updater-partial.json
  16. 8
      rust/optics-base/src/replica.rs
  17. 4
      rust/optics-base/src/settings/mod.rs
  18. 3
      rust/optics-core/src/traits/replica.rs
  19. 1
      rust/optics-ethereum/src/home.rs
  20. 10
      rust/optics-ethereum/src/replica.rs
  21. 6
      rust/optics-test/src/mocks/replica.rs
  22. 368
      rust/relayer/src/relayer.rs
  23. 14
      rust/updater/src/updater.rs
  24. 9
      typescript/src/chain.ts

1
.gitignore vendored

@ -3,3 +3,4 @@ test_deploy.env
config.json
typescript/tmp.ts
rust/tmp_db
tmp.env

@ -35,5 +35,5 @@
"level": "debug",
"style": "pretty"
},
"dbPath": "db_path"
}
"db": "db_path"
}

@ -0,0 +1,9 @@
{
"messageInterval": 15,
"chatGenConfig": {
"destination": 2000,
"message": "static message",
"recipient": "recipient",
"type": "static"
}
}

@ -35,5 +35,5 @@
"level": "debug",
"style": "pretty"
},
"dbPath": "db_path"
}
"db": "db_path"
}

@ -0,0 +1,3 @@
{
"pollingInterval": 30
}

@ -0,0 +1,8 @@
{
"pollingInterval": 30,
"updatePause": 15,
"updater": {
"key": "",
"type": "hexKey"
}
}

@ -0,0 +1,39 @@
{
"signers": {
"alfajores": {
"key": "",
"type": "hexKey"
},
"kovan": {
"key": "",
"type": "hexKey"
}
},
"replicas": {
"kovan": {
"address": "0x7A9CA7500C8D6f66FACD373f64F65fC8E9e11742",
"domain": 3000,
"name": "kovan",
"rpcStyle": "ethereum",
"connection": {
"type": "http",
"url": "https://kovan.infura.io/v3/5c456d7844fa40a683e934df60534c60"
}
}
},
"home": {
"address": "0x58f25C7E2662BCC2969C25A2d118d15772943AD1",
"domain": 1000,
"name": "alfajores",
"rpcStyle": "ethereum",
"connection": {
"type": "http",
"url": "https://alfajores-forno.celo-testnet.org"
}
},
"tracing": {
"level": "debug",
"style": "pretty"
},
"db": "db_path"
}

@ -0,0 +1,22 @@
{
"upgradeBeaconController": "0x0F94Fd6B560D5A11c7f3085996A57c753B9826E0",
"xappConnectionManager": "0xA6AA79e842b5cF73cEf7047bD4A0B1094d24d04B",
"updaterManager": "0x97FE1dF46Dc3494B607ed5CBAB1a7781a2eC23ec",
"governance": {
"implementation": "0x60Cb2E6815785862ED86ccC2A898ea2A6C8D9080",
"proxy": "0xcE6fB0d0BEA7e7F93E8a3F439dFf378054b09bc6",
"beacon": "0x3E05FC1c07cF3471Edc80e879332229Ea11411eC"
},
"home": {
"implementation": "0xDb83f8C0c2823dba563BC75A7f36708A9B054247",
"proxy": "0x58f25C7E2662BCC2969C25A2d118d15772943AD1",
"beacon": "0x83D34279F7355E7a2c11E0ba29690086cf41bE6e"
},
"replicas": {
"3000": {
"implementation": "0x0C740974008D2e5b9fA373A2C7f1A22D8eD5f3f8",
"proxy": "0x3e4349c4F188DC753C4bbeB0De87683355dA091f",
"beacon": "0xa39f1CEd5C91B88ba4b7e9134fdbbd044D2cCb4F"
}
}
}

@ -0,0 +1,9 @@
{
"messageInterval": 15,
"chatGenConfig": {
"destination": 2000,
"message": "static message",
"recipient": "recipient",
"type": "static"
}
}

@ -0,0 +1,39 @@
{
"signers": {
"kovan": {
"key": "",
"type": "hexKey"
},
"alfajores": {
"key": "",
"type": "hexKey"
}
},
"replicas": {
"alfajores": {
"address": "0x3e4349c4F188DC753C4bbeB0De87683355dA091f",
"domain": 1000,
"name": "alfajores",
"rpcStyle": "ethereum",
"connection": {
"type": "http",
"url": "https://alfajores-forno.celo-testnet.org"
}
}
},
"home": {
"address": "0x5c60096b13266B73c2B4302052b036A387a63B8b",
"domain": 3000,
"name": "kovan",
"rpcStyle": "ethereum",
"connection": {
"type": "http",
"url": "https://kovan.infura.io/v3/5c456d7844fa40a683e934df60534c60"
}
},
"tracing": {
"level": "debug",
"style": "pretty"
},
"db": "db_path"
}

@ -0,0 +1,22 @@
{
"upgradeBeaconController": "0x5FE7511952b1ae450CcE53eb39F850aF0B526add",
"xappConnectionManager": "0x0F3f2D1C8A8B7A5222C81D99E12BF0B513C237Fc",
"updaterManager": "0xF1BdC4E3625d6A2acFc6E91014e0f17D206fB9BC",
"governance": {
"implementation": "0x734814cc1eBA10FF1641b35311d3Ae8d76a228d6",
"proxy": "0xF46c05d2cE3c07552FEF96D775141dB7E32CcddC",
"beacon": "0x632C27F5ca0Bb1fD4BF3D240b453a8170019e23a"
},
"home": {
"implementation": "0x40dE77C52e490d1198Fefd2D6BA815088564dEEb",
"proxy": "0x5c60096b13266B73c2B4302052b036A387a63B8b",
"beacon": "0xd055fD70A3bD94df89917AE3b828f39F4ff4c337"
},
"replicas": {
"1000": {
"implementation": "0x78E7CDEE08eAD8f4eFDd7Eb589785811a07DA644",
"proxy": "0x7A9CA7500C8D6f66FACD373f64F65fC8E9e11742",
"beacon": "0x6E6010E6bd43a9d2F7AE3b7eA9f61760e58758f3"
}
}
}

@ -0,0 +1,3 @@
{
"pollingInterval": 30
}

@ -0,0 +1,8 @@
{
"pollingInterval": 30,
"updatePause": 15,
"updater": {
"key": "",
"type": "hexKey"
}
}

@ -127,6 +127,14 @@ impl Replica for Replicas {
Replicas::Other(replica) => replica.process(message).await,
}
}
async fn queue_end(&self) -> Result<Option<H256>, ChainCommunicationError> {
match self {
Replicas::Ethereum(replica) => replica.queue_end().await,
Replicas::Mock(mock_replica) => mock_replica.queue_end().await,
Replicas::Other(replica) => replica.queue_end().await,
}
}
}
#[async_trait]

@ -76,7 +76,7 @@ impl SignerConf {
#[serde(rename_all = "camelCase")]
pub struct Settings {
/// The path to use for the DB file
pub db_path: String,
pub db: String,
/// The home configuration
pub home: ChainSetup,
/// The replica configurations
@ -120,7 +120,7 @@ impl Settings {
pub async fn try_into_core(&self) -> Result<AgentCore, Report> {
let home = Arc::new(self.try_home().await?);
let replicas = self.try_replicas().await?;
let db = Arc::new(db::from_path(&self.db_path)?);
let db = Arc::new(db::from_path(&self.db)?);
Ok(AgentCore { home, replicas, db })
}

@ -47,4 +47,7 @@ pub trait Replica: Common + Send + Sync + std::fmt::Debug {
Ok(self.process(message).await?)
}
/// Fetch the root at the end of the confirmation queue
async fn queue_end(&self) -> Result<Option<H256>, ChainCommunicationError>;
}

@ -89,6 +89,7 @@ where
) -> Result<Option<SignedUpdate>, ChainCommunicationError> {
self.contract
.update_filter()
.from_block(0)
.topic2(old_root)
.query()
.await?

@ -256,4 +256,14 @@ where
.prove_and_process(message.to_vec(), sol_proof, proof.index.into());
Ok(report_tx!(tx).into())
}
#[tracing::instrument(err)]
async fn queue_end(&self) -> Result<Option<H256>, ChainCommunicationError> {
let end: H256 = self.contract.queue_end().call().await?.into();
if end.is_zero() {
Ok(None)
} else {
Ok(Some(end))
}
}
}

@ -41,6 +41,8 @@ mock! {
// Common
pub fn _name(&self) -> &str {}
pub fn _queue_end(&self) -> Result<Option<H256>, ChainCommunicationError> {}
pub fn _status(&self, txid: H256) -> Result<Option<TxOutcome>, ChainCommunicationError> {}
pub fn _updater(&self) -> Result<H256, ChainCommunicationError> {}
@ -119,6 +121,10 @@ impl Replica for MockReplicaContract {
) -> Result<TxOutcome, ChainCommunicationError> {
self._prove_and_process(message, proof)
}
async fn queue_end(&self) -> Result<Option<H256>, ChainCommunicationError> {
self._queue_end()
}
}
#[async_trait]

@ -1,7 +1,10 @@
use async_trait::async_trait;
use color_eyre::{eyre::eyre, Result};
use color_eyre::{eyre::bail, Result};
use futures_util::future::select_all;
use std::sync::Arc;
use tokio::{
join,
sync::Mutex,
task::JoinHandle,
time::{interval, Interval},
};
@ -16,67 +19,140 @@ use optics_core::traits::{Common, Replica};
use crate::settings::Settings;
/// A relayer agent
#[derive(Debug)]
pub struct Relayer {
interval_seconds: u64,
core: AgentCore,
}
impl AsRef<AgentCore> for Relayer {
fn as_ref(&self) -> &AgentCore {
&self.core
}
struct UpdatePoller {
home: Arc<Homes>,
replica: Arc<Replicas>,
interval: Interval,
semaphore: Mutex<()>,
}
#[allow(clippy::unit_arg)]
impl Relayer {
/// Instantiate a new relayer
pub fn new(interval_seconds: u64, core: AgentCore) -> Self {
impl UpdatePoller {
fn new(home: Arc<Homes>, replica: Arc<Replicas>, interval: Interval) -> Self {
Self {
interval_seconds,
core,
home,
replica,
interval,
semaphore: Mutex::new(()),
}
}
#[tracing::instrument(err)]
async fn poll_and_relay_update(home: Arc<Homes>, replica: Arc<Replicas>) -> Result<()> {
// Get replica's current root
let old_root = replica.current_root().await?;
async fn poll_and_relay_update(&self) -> Result<()> {
// Get replica's current root.
// If the replica has a queue of pending updates, we use the last queue
// root instead
let (old_root_res, queue_end_res) =
join!(self.replica.current_root(), self.replica.queue_end());
let old_root = {
if let Some(end) = queue_end_res? {
end
} else {
old_root_res?
}
};
// Check for first signed update building off of the replica's current root
let signed_update_opt = home.signed_update_by_old_root(old_root).await?;
let signed_update_opt = self.home.signed_update_by_old_root(old_root).await?;
// If signed update exists, update replica's current root
if let Some(signed_update) = signed_update_opt {
info!(
"Have a signed update, dispatching to replica {}",
replica.name()
"Update for replica {}. Root {:?} to {:?}",
self.replica.name(),
&signed_update.update.previous_root,
&signed_update.update.new_root,
);
replica.update(&signed_update).await?;
let lock = self.semaphore.try_lock();
if lock.is_err() {
return Ok(()); // tx in flight. just do nothing
}
self.replica.update(&signed_update).await?;
// lock dropped here
} else {
info!("No update.");
info!(
"No update. Current root for replica {} is {:?}",
self.replica.name(),
old_root
);
}
Ok(())
}
fn spawn(mut self) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
loop {
self.poll_and_relay_update().await?;
self.interval.tick().await;
}
})
}
}
#[derive(Debug)]
struct ConfirmPoller {
replica: Arc<Replicas>,
interval: Interval,
}
impl ConfirmPoller {
fn new(replica: Arc<Replicas>, interval: Interval) -> Self {
Self { replica, interval }
}
#[tracing::instrument(err)]
async fn poll_confirm(replica: Arc<Replicas>) -> Result<()> {
async fn poll_confirm(&self) -> Result<()> {
// Check for pending update that can be confirmed
let can_confirm = replica.can_confirm().await?;
let can_confirm = self.replica.can_confirm().await?;
// If valid pending update exists, confirm it
if can_confirm {
info!("Can confirm. Confirming on replica {}", replica.name());
replica.confirm().await?;
info!("Can confirm. Confirming on replica {}", self.replica.name());
// don't care if it succeeds
let _ = self.replica.confirm().await;
} else {
info!("Can't confirm");
info!("Can't confirm on replica {}", self.replica.name());
}
Ok(())
}
fn spawn(mut self) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
loop {
self.poll_confirm().await?;
self.interval.tick().await;
}
})
}
}
/// A relayer agent
#[derive(Debug)]
pub struct Relayer {
interval_seconds: u64,
core: AgentCore,
}
impl AsRef<AgentCore> for Relayer {
fn as_ref(&self) -> &AgentCore {
&self.core
}
}
#[allow(clippy::unit_arg)]
impl Relayer {
/// Instantiate a new relayer
pub fn new(interval_seconds: u64, core: AgentCore) -> Self {
Self {
interval_seconds,
core,
}
}
fn interval(&self) -> Interval {
interval(std::time::Duration::from_secs(self.interval_seconds))
}
@ -101,130 +177,130 @@ impl OpticsAgent for Relayer {
fn run(&self, name: &str) -> JoinHandle<Result<()>> {
let replica_opt = self.replica_by_name(name);
let home = self.home();
let mut interval = self.interval();
let i1 = self.interval();
let i2 = self.interval();
let name = name.to_owned();
tokio::spawn(async move {
let replica = replica_opt.ok_or_else(|| eyre!("No replica named {}", name))?;
loop {
let (updated, confirmed) = tokio::join!(
Self::poll_and_relay_update(home.clone(), replica.clone()),
Self::poll_confirm(replica.clone())
);
if let Err(ref e) = updated {
tracing::error!("Error polling updates: {:?}", e)
}
if let Err(ref e) = confirmed {
tracing::error!("Error polling confirms: {:?}", e)
}
updated?;
confirmed?;
interval.tick().await;
if replica_opt.is_none() {
bail!("No replica named {}", name);
}
})
}
}
let replica = replica_opt.unwrap();
#[cfg(test)]
mod test {
use ethers::{core::types::H256, prelude::LocalWallet};
use std::sync::Arc;
use super::*;
use optics_core::{traits::TxOutcome, SignedUpdate, Update};
use optics_test::mocks::{MockHomeContract, MockReplicaContract};
#[tokio::test]
async fn polls_and_relays_updates() {
let signer: LocalWallet =
"1111111111111111111111111111111111111111111111111111111111111111"
.parse()
.unwrap();
let first_root = H256::from([1; 32]);
let second_root = H256::from([2; 32]);
let signed_update = Update {
home_domain: 1,
previous_root: first_root,
new_root: second_root,
}
.sign_with(&signer)
.await
.expect("!sign");
let mut mock_home = MockHomeContract::new();
let mut mock_replica = MockReplicaContract::new();
{
let signed_update = signed_update.clone();
// home.signed_update_by_old_root(first_root) called once and
// returns mock value signed_update
mock_home
.expect__signed_update_by_old_root()
.withf(move |r: &H256| *r == first_root)
.times(1)
.return_once(move |_| Ok(Some(signed_update)));
}
{
let signed_update = signed_update.clone();
// replica.current_root called once and returns mock value
// first_root
mock_replica
.expect__current_root()
.times(1)
.returning(move || Ok(first_root));
// replica.update(signed_update) called once and returns
// mock default value
mock_replica
.expect__update()
.withf(move |s: &SignedUpdate| *s == signed_update)
.times(1)
.returning(|_| {
Ok(TxOutcome {
txid: H256::default(),
executed: true,
})
});
}
let update_poller = UpdatePoller::new(home, replica.clone(), i1);
let update_task = update_poller.spawn();
let mut home: Arc<Homes> = Arc::new(mock_home.into());
let mut replica: Arc<Replicas> = Arc::new(mock_replica.into());
Relayer::poll_and_relay_update(home.clone(), replica.clone())
.await
.expect("Should have returned Ok(())");
let confirm_poller = ConfirmPoller::new(replica, i2);
let confirm_task = confirm_poller.spawn();
let mock_home = Arc::get_mut(&mut home).unwrap();
mock_home.checkpoint();
let (res, _, _) = select_all(vec![confirm_task, update_task]).await;
let mock_replica = Arc::get_mut(&mut replica).unwrap();
mock_replica.checkpoint();
}
let res = res?;
#[tokio::test]
async fn confirms_updates() {
let mut mock_replica = MockReplicaContract::new();
// replica.can_confirm called once and returns mock true
mock_replica
.expect__can_confirm()
.times(1)
.returning(|| Ok(true));
// replica.confirm called once and returns mock default
mock_replica.expect__confirm().times(1).returning(|| {
Ok(TxOutcome {
txid: H256::default(),
executed: true,
})
});
let mut replica: Arc<Replicas> = Arc::new(mock_replica.into());
Relayer::poll_confirm(replica.clone())
.await
.expect("Should have returned Ok(())");
let mock_replica = Arc::get_mut(&mut replica).unwrap();
mock_replica.checkpoint();
tracing::error!("Relayer error. {:?}", res);
res
})
}
}
#[cfg(test)]
mod test {
// use ethers::{core::types::H256, prelude::LocalWallet};
// use std::sync::Arc;
// use super::*;
// use optics_core::{traits::TxOutcome, SignedUpdate, Update};
// use optics_test::mocks::{MockHomeContract, MockReplicaContract};
// #[tokio::test]
// async fn polls_and_relays_updates() {
// let signer: LocalWallet =
// "1111111111111111111111111111111111111111111111111111111111111111"
// .parse()
// .unwrap();
// let first_root = H256::from([1; 32]);
// let second_root = H256::from([2; 32]);
// let signed_update = Update {
// home_domain: 1,
// previous_root: first_root,
// new_root: second_root,
// }
// .sign_with(&signer)
// .await
// .expect("!sign");
// let mut mock_home = MockHomeContract::new();
// let mut mock_replica = MockReplicaContract::new();
// {
// let signed_update = signed_update.clone();
// // home.signed_update_by_old_root(first_root) called once and
// // returns mock value signed_update
// mock_home
// .expect__signed_update_by_old_root()
// .withf(move |r: &H256| *r == first_root)
// .times(1)
// .return_once(move |_| Ok(Some(signed_update)));
// }
// {
// let signed_update = signed_update.clone();
// // replica.current_root called once and returns mock value
// // first_root
// mock_replica
// .expect__current_root()
// .times(1)
// .returning(move || Ok(first_root));
// // replica.update(signed_update) called once and returns
// // mock default value
// mock_replica
// .expect__update()
// .withf(move |s: &SignedUpdate| *s == signed_update)
// .times(1)
// .returning(|_| {
// Ok(TxOutcome {
// txid: H256::default(),
// executed: true,
// })
// });
// }
// let mut home: Arc<Homes> = Arc::new(mock_home.into());
// let mut replica: Arc<Replicas> = Arc::new(mock_replica.into());
// Relayer::poll_and_relay_update(home.clone(), replica.clone())
// .await
// .expect("Should have returned Ok(())");
// let mock_home = Arc::get_mut(&mut home).unwrap();
// mock_home.checkpoint();
// let mock_replica = Arc::get_mut(&mut replica).unwrap();
// mock_replica.checkpoint();
// }
// #[tokio::test]
// async fn confirms_updates() {
// let mut mock_replica = MockReplicaContract::new();
// // replica.can_confirm called once and returns mock true
// mock_replica
// .expect__can_confirm()
// .times(1)
// .returning(|| Ok(true));
// // replica.confirm called once and returns mock default
// mock_replica.expect__confirm().times(1).returning(|| {
// Ok(TxOutcome {
// txid: H256::default(),
// executed: true,
// })
// });
// let mut replica: Arc<Replicas> = Arc::new(mock_replica.into());
// Relayer::poll_confirm(replica.clone())
// .await
// .expect("Should have returned Ok(())");
// let mock_replica = Arc::get_mut(&mut replica).unwrap();
// mock_replica.checkpoint();
// }
}

@ -96,14 +96,22 @@ impl Updater {
// can check and enter the below `if` block at a time,
// protecting from races between threads.
// acquire guard
let _guard = mutex.lock().await;
// acquire guard. If the guard can't be acquired, that
// means a tx is in flight and we should try again later.
let _guard = mutex.try_lock();
if _guard.is_err() {
return;
}
if let Ok(None) = Self::db_get(&db, old_root) {
info!("signing update");
let signed = update.sign_with(signer.as_ref()).await.unwrap();
// If successfully submitted update, record in db
info!("dispatching signed update to contract");
info!(
"Dispatching signed update to contract. Current root is {:?}, new root is {:?}",
&signed.update.previous_root, &signed.update.new_root
);
match home.update(&signed).await {
Ok(_) => {
info!("Storing signed update in db");

@ -145,7 +145,7 @@ type RustConfig = {
level: string;
style: string;
};
dbPath: string;
db: string;
};
export function buildConfig(local: Deploy, remotes: Deploy[]): RustConfig {
@ -170,15 +170,10 @@ export function buildConfig(local: Deploy, remotes: Deploy[]): RustConfig {
level: 'debug',
style: 'pretty',
},
dbPath: 'db_path',
db: 'db_path',
};
console.log(`fff have ${remotes.length} remotes`);
for (var remote of remotes) {
console.log(`remote ${remote.chain.domain}`);
console.log(`local ${local.chain.domain}`);
const replica = {
address: remote.contracts.replicas[local.chain.domain].proxy.address,
domain: remote.chain.domain,

Loading…
Cancel
Save