Add Validator scaffold (#213)
parent
b588e248fb
commit
4a28875af8
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,14 @@ |
||||
use abacus_core::SignedCheckpoint; |
||||
use async_trait::async_trait; |
||||
use color_eyre::Result; |
||||
|
||||
/// A generic trait to read/write Checkpoints offchain
|
||||
#[async_trait] |
||||
pub trait CheckpointSyncer { |
||||
/// Read the highest index of this Syncer
|
||||
async fn latest_index(&self) -> Result<Option<u32>>; |
||||
/// Attempt to fetch the signed checkpoint at this index
|
||||
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpoint>>; |
||||
/// Write the signed checkpoint to this syncer
|
||||
async fn write_checkpoint(&self, signed_checkpoint: SignedCheckpoint) -> Result<()>; |
||||
} |
@ -0,0 +1,3 @@ |
||||
mod checkpoint_syncer; |
||||
|
||||
pub use checkpoint_syncer::*; |
@ -0,0 +1,76 @@ |
||||
use abacus_core::SignedCheckpoint; |
||||
|
||||
use async_trait::async_trait; |
||||
use color_eyre::Result; |
||||
|
||||
use crate::traits::CheckpointSyncer; |
||||
/// Type for reading/write to LocalStorage
|
||||
pub struct LocalStorage { |
||||
/// base path
|
||||
pub path: String, |
||||
} |
||||
|
||||
impl LocalStorage { |
||||
fn checkpoint_file_path(&self, index: u32) -> String { |
||||
let mut path = self.path.clone(); |
||||
path.push_str(&format!("/{}.json", index)); |
||||
path |
||||
} |
||||
|
||||
fn latest_index_file_path(&self) -> String { |
||||
let mut path = self.path.clone(); |
||||
path.push_str("/index.json"); |
||||
path |
||||
} |
||||
|
||||
async fn write_index(&self, index: u32) -> Result<()> { |
||||
tokio::fs::write(self.latest_index_file_path(), index.to_string()).await?; |
||||
Ok(()) |
||||
} |
||||
} |
||||
|
||||
#[async_trait] |
||||
impl CheckpointSyncer for LocalStorage { |
||||
async fn latest_index(&self) -> Result<Option<u32>> { |
||||
match tokio::fs::read(self.latest_index_file_path()) |
||||
.await |
||||
.and_then(|data| { |
||||
String::from_utf8(data) |
||||
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err)) |
||||
}) { |
||||
Ok(data) => { |
||||
let index = data.parse()?; |
||||
Ok(Some(index)) |
||||
} |
||||
_ => Ok(None), |
||||
} |
||||
} |
||||
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpoint>> { |
||||
match tokio::fs::read(self.checkpoint_file_path(index)).await { |
||||
Ok(data) => { |
||||
let checkpoint = serde_json::from_slice(&data)?; |
||||
Ok(Some(checkpoint)) |
||||
} |
||||
_ => Ok(None), |
||||
} |
||||
} |
||||
async fn write_checkpoint(&self, signed_checkpoint: SignedCheckpoint) -> Result<()> { |
||||
let serialized_checkpoint = serde_json::to_string_pretty(&signed_checkpoint)?; |
||||
tokio::fs::write( |
||||
self.checkpoint_file_path(signed_checkpoint.checkpoint.index), |
||||
&serialized_checkpoint, |
||||
) |
||||
.await?; |
||||
|
||||
match self.latest_index().await? { |
||||
Some(current_latest_index) => { |
||||
if current_latest_index < signed_checkpoint.checkpoint.index { |
||||
self.write_index(signed_checkpoint.checkpoint.index).await? |
||||
} |
||||
} |
||||
None => self.write_index(signed_checkpoint.checkpoint.index).await?, |
||||
} |
||||
|
||||
Ok(()) |
||||
} |
||||
} |
@ -0,0 +1,3 @@ |
||||
mod local_storage; |
||||
|
||||
pub use local_storage::*; |
@ -0,0 +1,31 @@ |
||||
[package] |
||||
name = "validator" |
||||
version = "0.1.0" |
||||
authors = ["Abacus Team"] |
||||
edition = "2021" |
||||
|
||||
[dependencies] |
||||
tokio = { version = "1.0.1", features = ["rt", "macros"] } |
||||
config = "0.10" |
||||
serde = "1.0.120" |
||||
serde_json = { version = "1.0.61", default-features = false } |
||||
log = "0.4.13" |
||||
ethers = { git = "https://github.com/gakonst/ethers-rs", branch = "master" } |
||||
thiserror = { version = "1.0.22", default-features = false } |
||||
async-trait = { version = "0.1.42", default-features = false } |
||||
futures-util = "0.3.12" |
||||
color-eyre = "0.5.0" |
||||
tracing = "0.1.22" |
||||
tracing-futures = "0.2.4" |
||||
tracing-subscriber = "0.2.15" |
||||
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb" } |
||||
|
||||
abacus-core = { path = "../../abacus-core" } |
||||
abacus-base = { path = "../../abacus-base" } |
||||
abacus-ethereum = { path = "../../chains/abacus-ethereum" } |
||||
paste = "1.0.5" |
||||
|
||||
[dev-dependencies] |
||||
tokio-test = "0.4.0" |
||||
abacus-test = { path = "../../abacus-test" } |
||||
prometheus = "0.12" |
@ -0,0 +1,40 @@ |
||||
//! The checkpointer observes the Outbox contract and calls checkpoint.
|
||||
|
||||
#![forbid(unsafe_code)] |
||||
#![warn(missing_docs)] |
||||
#![warn(unused_extern_crates)] |
||||
|
||||
mod settings; |
||||
mod submit; |
||||
mod validator; |
||||
|
||||
use color_eyre::Result; |
||||
|
||||
use abacus_base::Agent; |
||||
|
||||
use crate::{settings::ValidatorSettings as Settings, validator::Validator}; |
||||
|
||||
async fn _main() -> Result<()> { |
||||
color_eyre::install()?; |
||||
let settings = Settings::new()?; |
||||
|
||||
let agent = Validator::from_settings(settings).await?; |
||||
|
||||
agent |
||||
.as_ref() |
||||
.settings |
||||
.tracing |
||||
.start_tracing(agent.metrics().span_duration())?; |
||||
let _ = agent.metrics().run_http_server(); |
||||
|
||||
agent.run().await??; |
||||
Ok(()) |
||||
} |
||||
|
||||
fn main() -> Result<()> { |
||||
tokio::runtime::Builder::new_current_thread() |
||||
.enable_all() |
||||
.build() |
||||
.unwrap() |
||||
.block_on(_main()) |
||||
} |
@ -0,0 +1,12 @@ |
||||
//! Configuration
|
||||
|
||||
use abacus_base::decl_settings; |
||||
|
||||
decl_settings!(Validator { |
||||
/// The validator attestation signer
|
||||
validator: abacus_base::SignerConf, |
||||
/// The reorg_period in blocks
|
||||
reorgperiod: String, |
||||
/// How frequently to check for new checkpoints
|
||||
interval: String, |
||||
}); |
@ -0,0 +1,57 @@ |
||||
use std::sync::Arc; |
||||
|
||||
use abacus_base::{CachingOutbox, CheckpointSyncer, LocalStorage}; |
||||
use abacus_core::{AbacusCommon, Signers}; |
||||
use std::time::Duration; |
||||
|
||||
use color_eyre::Result; |
||||
use tokio::{task::JoinHandle, time::sleep}; |
||||
use tracing::{info, info_span, instrument::Instrumented, Instrument}; |
||||
pub(crate) struct ValidatorSubmitter { |
||||
interval: u64, |
||||
reorg_period: u64, |
||||
signer: Arc<Signers>, |
||||
outbox: Arc<CachingOutbox>, |
||||
} |
||||
|
||||
impl ValidatorSubmitter { |
||||
pub(crate) fn new( |
||||
interval: u64, |
||||
reorg_period: u64, |
||||
outbox: Arc<CachingOutbox>, |
||||
signer: Arc<Signers>, |
||||
) -> Self { |
||||
Self { |
||||
reorg_period, |
||||
interval, |
||||
outbox, |
||||
signer, |
||||
} |
||||
} |
||||
|
||||
pub(crate) fn spawn(self) -> Instrumented<JoinHandle<Result<()>>> { |
||||
let span = info_span!("ValidatorSubmitter"); |
||||
let reorg_period = Some(self.reorg_period); |
||||
tokio::spawn(async move { |
||||
let starting_checkpoint = self.outbox.latest_checkpoint(reorg_period).await?; |
||||
let mut current_index = starting_checkpoint.index; |
||||
loop { |
||||
sleep(Duration::from_secs(self.interval)).await; |
||||
|
||||
// Check the current checkpoint
|
||||
let checkpoint = self.outbox.latest_checkpoint(reorg_period).await?; |
||||
|
||||
if current_index < checkpoint.index { |
||||
let signed_checkpoint = checkpoint.sign_with(self.signer.as_ref()).await?; |
||||
|
||||
info!(signature = ?signed_checkpoint, signer=?self.signer, "Sign latest checkpoint"); |
||||
current_index = checkpoint.index; |
||||
|
||||
let storage = LocalStorage { path: "/tmp/validatorsignatures".to_string() }; |
||||
storage.write_checkpoint(signed_checkpoint).await?; |
||||
} |
||||
} |
||||
}) |
||||
.instrument(span) |
||||
} |
||||
} |
@ -0,0 +1,75 @@ |
||||
use std::sync::Arc; |
||||
|
||||
use async_trait::async_trait; |
||||
use color_eyre::Result; |
||||
use tokio::task::JoinHandle; |
||||
use tracing::instrument::Instrumented; |
||||
|
||||
use crate::{settings::ValidatorSettings as Settings, submit::ValidatorSubmitter}; |
||||
use abacus_base::{AbacusAgentCore, Agent}; |
||||
use abacus_core::Signers; |
||||
|
||||
/// An validator agent
|
||||
#[derive(Debug)] |
||||
pub struct Validator { |
||||
signer: Arc<Signers>, |
||||
reorg_period: u64, |
||||
interval: u64, |
||||
pub(crate) core: AbacusAgentCore, |
||||
} |
||||
|
||||
impl AsRef<AbacusAgentCore> for Validator { |
||||
fn as_ref(&self) -> &AbacusAgentCore { |
||||
&self.core |
||||
} |
||||
} |
||||
|
||||
impl Validator { |
||||
/// Instantiate a new validator
|
||||
pub fn new(signer: Signers, reorg_period: u64, interval: u64, core: AbacusAgentCore) -> Self { |
||||
Self { |
||||
signer: Arc::new(signer), |
||||
reorg_period, |
||||
interval, |
||||
core, |
||||
} |
||||
} |
||||
} |
||||
|
||||
#[async_trait] |
||||
impl Agent for Validator { |
||||
const AGENT_NAME: &'static str = "validator"; |
||||
|
||||
type Settings = Settings; |
||||
|
||||
async fn from_settings(settings: Self::Settings) -> Result<Self> |
||||
where |
||||
Self: Sized, |
||||
{ |
||||
let signer = settings.validator.try_into_signer().await?; |
||||
let reorg_period = settings.reorgperiod.parse().expect("invalid uint"); |
||||
let interval = settings.interval.parse().expect("invalid uint"); |
||||
let core = settings |
||||
.as_ref() |
||||
.try_into_abacus_core(Self::AGENT_NAME) |
||||
.await?; |
||||
Ok(Self::new(signer, reorg_period, interval, core)) |
||||
} |
||||
} |
||||
|
||||
impl Validator { |
||||
pub fn run(&self) -> Instrumented<JoinHandle<Result<()>>> { |
||||
let outbox = self.outbox(); |
||||
let submit = ValidatorSubmitter::new( |
||||
self.interval, |
||||
self.reorg_period, |
||||
outbox, |
||||
self.signer.clone(), |
||||
); |
||||
|
||||
self.run_all(vec![submit.spawn()]) |
||||
} |
||||
} |
||||
|
||||
#[cfg(test)] |
||||
mod test {} |
Loading…
Reference in new issue