Checkpointer policy v1 (#217)

* Add script for running local dev environment

* Prettier

* Move Checkpointer to (Abacus)Agent

* Lint

* Fix

* Commit dev files to run checkpointer

* Use @abacus-network/utils in solidity/abacus-core/scripts/abacus.ts

* Compiles, some warnings to think of

* rename checkpoint() -> create_checkpoint()

* cargo fmt

* Move to index based decision

* More info

* Move from (H256, u32) to Checkpoint

* cargo fmt

* Add some scripts for testing

* Actually fix merge conflicts

* Add latency

* Nits

* rm testing utils

* rm 1000_config.json

* nit

* fix package-lock.json

* Better names

* Sleep instead of using last_checkpoint_time

* cargo fmt

* Rm comment

* Add checkpointer to output-agent-env-vars

Co-authored-by: Asa Oines <a@celo.org>
Co-authored-by: nambrot <nambrot@googlemail.com>
pull/281/head
Trevor Porter 3 years ago committed by GitHub
parent 15e970382e
commit 328941fc2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      rust/abacus-base/src/agent.rs
  2. 5
      rust/abacus-base/src/inbox.rs
  3. 24
      rust/abacus-base/src/outbox.rs
  4. 9
      rust/abacus-core/src/traits/outbox.rs
  5. 12
      rust/abacus-test/src/mocks/outbox.rs
  6. 27
      rust/agents/checkpointer/src/checkpointer.rs
  7. 4
      rust/agents/checkpointer/src/settings.rs
  8. 70
      rust/agents/checkpointer/src/submit.rs
  9. 12
      rust/chains/abacus-ethereum/src/outbox.rs
  10. 4
      solidity/core/package.json
  11. 4
      typescript/deploy/config/environments/local/agent.ts
  12. 9
      typescript/deploy/src/agents/index.ts
  13. 8
      typescript/deploy/src/config/agent.ts

@ -47,7 +47,7 @@ pub struct AbacusAgentCore {
pub settings: crate::settings::Settings,
}
/// A trait for an abaus agent
/// A trait for an abacus agent
#[async_trait]
pub trait Agent: Send + Sync + std::fmt::Debug + AsRef<AbacusAgentCore> {
/// The agent's name

@ -1,8 +1,7 @@
use abacus_core::{
accumulator::merkle::Proof, db::AbacusDB, AbacusMessage, ChainCommunicationError,
MessageStatus, TxOutcome,
accumulator::merkle::Proof, db::AbacusDB, AbacusCommon, AbacusMessage, ChainCommunicationError,
Checkpoint, Inbox, MessageStatus, SignedCheckpoint, TxOutcome,
};
use abacus_core::{AbacusCommon, Checkpoint, Inbox, SignedCheckpoint};
use abacus_test::mocks::inbox::MockInboxContract;
use async_trait::async_trait;
use color_eyre::eyre::Result;

@ -99,6 +99,14 @@ impl Outbox for CachingOutbox {
async fn state(&self) -> Result<State, ChainCommunicationError> {
self.outbox.state().await
}
async fn count(&self) -> Result<u32, ChainCommunicationError> {
self.outbox.count().await
}
async fn create_checkpoint(&self) -> Result<TxOutcome, ChainCommunicationError> {
self.outbox.create_checkpoint().await
}
}
#[async_trait]
@ -269,6 +277,22 @@ impl Outbox for OutboxVariants {
OutboxVariants::Other(outbox) => outbox.state().await,
}
}
async fn count(&self) -> Result<u32, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.count().await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.count().await,
OutboxVariants::Other(outbox) => outbox.count().await,
}
}
async fn create_checkpoint(&self) -> Result<TxOutcome, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.create_checkpoint().await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.create_checkpoint().await,
OutboxVariants::Other(outbox) => outbox.create_checkpoint().await,
}
}
}
#[async_trait]

@ -18,8 +18,17 @@ pub trait Outbox: AbacusCommon + Send + Sync + std::fmt::Debug {
/// Fetch the nonce
async fn nonces(&self, destination: u32) -> Result<u32, ChainCommunicationError>;
/// Gets the current leaf count of the merkle tree
async fn count(&self) -> Result<u32, ChainCommunicationError>;
/// Dispatch a message.
async fn dispatch(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError>;
/// Creates a new checkpoint.
/// This isn't called `checkpoint` to avoid a conflict with the MockOutboxContract,
/// which has a conflicting `checkpoint` function automatically created by the mockall
/// library's automocking attribute macro.
async fn create_checkpoint(&self) -> Result<TxOutcome, ChainCommunicationError>;
}
/// Interface for retrieving event data emitted specifically by the outbox

@ -35,6 +35,10 @@ mock! {
pub fn _dispatch(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError> {}
pub fn _count(&self) -> Result<u32, ChainCommunicationError> {}
pub fn _create_checkpoint(&self) -> Result<TxOutcome, ChainCommunicationError> {}
// Common
pub fn _name(&self) -> &str {}
@ -69,6 +73,14 @@ impl Outbox for MockOutboxContract {
async fn state(&self) -> Result<State, ChainCommunicationError> {
self._state()
}
async fn count(&self) -> Result<u32, ChainCommunicationError> {
self._count()
}
async fn create_checkpoint(&self) -> Result<TxOutcome, ChainCommunicationError> {
self._create_checkpoint()
}
}
#[async_trait]

@ -5,12 +5,14 @@ use tracing::instrument::Instrumented;
use crate::{settings::CheckpointerSettings as Settings, submit::CheckpointSubmitter};
use abacus_base::{AbacusAgentCore, Agent};
use abacus_core::{db::AbacusDB, AbacusCommon};
/// An checkpointer agent
/// A checkpointer agent
#[derive(Debug)]
pub struct Checkpointer {
interval_seconds: u64,
/// The polling interval (in seconds)
polling_interval: u64,
/// The minimum period between created checkpoints (in seconds)
creation_latency: u64,
pub(crate) core: AbacusAgentCore,
}
@ -22,9 +24,10 @@ impl AsRef<AbacusAgentCore> for Checkpointer {
impl Checkpointer {
/// Instantiate a new checkpointer
pub fn new(interval_seconds: u64, core: AbacusAgentCore) -> Self {
pub fn new(polling_interval: u64, creation_latency: u64, core: AbacusAgentCore) -> Self {
Self {
interval_seconds,
polling_interval,
creation_latency,
core,
}
}
@ -40,21 +43,27 @@ impl Agent for Checkpointer {
where
Self: Sized,
{
let interval_seconds = settings.interval.parse().expect("invalid uint");
let polling_interval = settings
.pollinginterval
.parse()
.expect("invalid pollinginterval uint");
let creation_latency = settings
.creationlatency
.parse()
.expect("invalid creationlatency uint");
let core = settings
.as_ref()
.try_into_abacus_core(Self::AGENT_NAME)
.await?;
Ok(Self::new(interval_seconds, core))
Ok(Self::new(polling_interval, creation_latency, core))
}
}
impl Checkpointer {
pub fn run(&self) -> Instrumented<JoinHandle<Result<()>>> {
let outbox = self.outbox();
let db = AbacusDB::new(self.outbox().name(), self.db());
let submit = CheckpointSubmitter::new(outbox, db, self.interval_seconds);
let submit = CheckpointSubmitter::new(outbox, self.polling_interval, self.creation_latency);
self.run_all(vec![submit.spawn()])
}

@ -4,5 +4,7 @@ use abacus_base::decl_settings;
decl_settings!(Checkpointer {
/// The polling interval (in seconds)
interval: String,
pollinginterval: String,
/// The minimum period between submitted checkpoints (in seconds)
creationlatency: String,
});

@ -1,25 +1,30 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use abacus_base::CachingOutbox;
use abacus_core::{db::AbacusDB, AbacusCommon, CommittedMessage};
use std::time::Duration;
use abacus_core::{AbacusCommon, Checkpoint, Outbox};
use color_eyre::Result;
use tokio::{task::JoinHandle, time::sleep};
use tracing::{info, info_span, instrument::Instrumented, Instrument};
use tracing::{debug, info, info_span, instrument::Instrumented, Instrument};
pub(crate) struct CheckpointSubmitter {
outbox: Arc<CachingOutbox>,
db: AbacusDB,
interval_seconds: u64,
/// The polling interval
polling_interval: Duration,
/// The minimum period between submitted checkpoints
creation_latency: Duration,
}
impl CheckpointSubmitter {
pub(crate) fn new(outbox: Arc<CachingOutbox>, db: AbacusDB, interval_seconds: u64) -> Self {
pub(crate) fn new(
outbox: Arc<CachingOutbox>,
polling_interval: u64,
creation_latency: u64,
) -> Self {
Self {
outbox,
db,
interval_seconds,
polling_interval: Duration::from_secs(polling_interval),
creation_latency: Duration::from_secs(creation_latency),
}
}
@ -27,31 +32,30 @@ impl CheckpointSubmitter {
let span = info_span!("CheckpointSubmitter");
tokio::spawn(async move {
// This is just some dummy code
loop {
sleep(Duration::from_secs(self.interval_seconds)).await;
// Check the current checkpoint
let root = self.outbox.checkpointed_root().await?;
info!(root=?root, "Checked root");
// Get the latest message
if let Some(leaf) = self.db.retrieve_latest_leaf_index()? {
if let Some(message) = self.db.message_by_leaf_index(leaf)? {
let parsed_message = CommittedMessage::try_from(message)?;
info!(parsed_message=?parsed_message, "Latest leaf");
if let Some(update) = self
.db
.update_by_previous_root(parsed_message.committed_root)?
{
// Check if we want to submit a checkpoint tx
if parsed_message.committed_root == update.update.previous_root {
info!("Submit checkpoint");
}
}
}
sleep(self.polling_interval).await;
// Check the latest checkpointed index
let Checkpoint {
index: latest_checkpoint_index,
..
} = self.outbox.latest_checkpoint(None).await?;
// Get the current count of the tree
let count = self.outbox.count().await?;
info!(
latest_checkpoint_index=?latest_checkpoint_index,
count=?count,
"Got latest checkpoint and count"
);
// If there are any new messages, the count will be greater than
// the latest checkpoint index.
if count > latest_checkpoint_index {
debug!("Creating checkpoint");
self.outbox.create_checkpoint().await?;
// Sleep to ensure that another checkpoint isn't made until
// creation_latency has passed
sleep(self.creation_latency).await;
}
}
})

@ -273,4 +273,16 @@ where
_ => unreachable!(),
}
}
#[tracing::instrument(err, skip(self))]
async fn count(&self) -> Result<u32, ChainCommunicationError> {
Ok(self.contract.count().call().await?.as_u32())
}
#[tracing::instrument(err, skip(self))]
async fn create_checkpoint(&self) -> Result<TxOutcome, ChainCommunicationError> {
let tx = self.contract.checkpoint();
Ok(report_tx!(tx).into())
}
}

@ -29,8 +29,8 @@
"prettier": "prettier --write ./contracts ./libs ./test",
"build": "hardhat compile && hardhat typechain && npm run prettier && tsc && npm run copy-types",
"copy-types": "cp types/*.d.ts dist/",
"test": "hardhat test",
"coverage": "hardhat coverage"
"coverage": "hardhat coverage",
"test": "hardhat test"
},
"license": "MIT OR Apache-2.0",
"dependencies": {

@ -14,4 +14,8 @@ export const agentConfig: AgentConfig = {
relayer: {
interval: 5,
},
checkpointer: {
pollingInterval: 5,
creationLatency: 10,
},
};

@ -220,6 +220,15 @@ export async function getAgentEnvVars(
}
}
if (role.startsWith('checkpointer')) {
envVars.push(
`OPT_CHECKPOINTER_POLLINGINTERVAL=${agentConfig.checkpointer?.pollingInterval}`,
);
envVars.push(
`OPT_CHECKPOINTER_CREATIONLATENCY=${agentConfig.checkpointer?.creationLatency}`,
);
}
return envVars;
}

@ -28,6 +28,13 @@ interface ValidatorConfig {
pause?: number;
}
interface CheckpointerConfig {
// Polling interval (in seconds)
pollingInterval: number;
// Minimum time between created checkpoints (in seconds)
creationLatency: number;
}
export interface DockerConfig {
repo: string;
tag: string;
@ -43,6 +50,7 @@ export interface AgentConfig {
processor?: ProcessorConfig;
validator?: ValidatorConfig;
relayer?: RelayerConfig;
checkpointer?: CheckpointerConfig;
}
export type RustSigner = {

Loading…
Cancel
Save