Monitor validator checkpoints in CheckpointSyncer (#993)

* Monitor validator checkpoints in CheckpointSyncer

* PR review
pull/1023/head
Nam Chu Hoai 2 years ago committed by GitHub
parent 09ef472844
commit 03ebda6acd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      rust/abacus-base/src/metrics/core.rs
  2. 33
      rust/abacus-base/src/types/checkpoint_syncer.rs
  3. 12
      rust/abacus-base/src/types/local_storage.rs
  4. 18
      rust/abacus-base/src/types/s3_storage.rs
  5. 15
      rust/agents/relayer/src/relayer.rs
  6. 3
      rust/agents/validator/src/validator.rs

@ -42,6 +42,7 @@ pub struct CoreMetrics {
span_durations: HistogramVec,
span_events: IntCounterVec,
last_known_message_leaf_index: IntGaugeVec,
validator_checkpoint_index: IntGaugeVec,
submitter_queue_length: IntGaugeVec,
submitter_queue_duration_histogram: HistogramVec,
@ -106,6 +107,16 @@ impl CoreMetrics {
registry
)?;
let validator_checkpoint_index = register_int_gauge_vec_with_registry!(
opts!(
namespaced!("validator_checkpoint_index"),
"Observed signed checkpoint indices per validator",
const_labels_ref
),
&["origin", "validator"],
registry
)?;
let submitter_queue_length = register_int_gauge_vec_with_registry!(
opts!(
namespaced!("submitter_queue_length"),
@ -170,6 +181,7 @@ impl CoreMetrics {
span_durations,
span_events,
last_known_message_leaf_index,
validator_checkpoint_index,
submitter_queue_length,
submitter_queue_duration_histogram,
@ -290,6 +302,14 @@ impl CoreMetrics {
self.last_known_message_leaf_index.clone()
}
/// Gauge for reporting the most recent validator checkpoint index
/// Labels:
/// - `origin`: Origin chain
/// - `validator`: Address of the validator
pub fn validator_checkpoint_index(&self) -> IntGaugeVec {
self.validator_checkpoint_index.clone()
}
/// Gauge for reporting the current outbox state. This is either 0 (for
/// UnInitialized), 1 (for Active), or 2 (for Failed). These are from the
/// outbox contract States enum.

@ -1,5 +1,6 @@
use core::str::FromStr;
use ethers::types::Address;
use prometheus::{IntGauge, IntGaugeVec};
use std::collections::HashMap;
use tracing::instrument;
@ -30,14 +31,21 @@ pub enum CheckpointSyncerConf {
impl CheckpointSyncerConf {
/// Turn conf info a Checkpoint Syncer
pub fn try_into_checkpoint_syncer(&self) -> Result<CheckpointSyncers, Report> {
pub fn try_into_checkpoint_syncer(
&self,
latest_index_gauge: Option<IntGauge>,
) -> Result<CheckpointSyncers, Report> {
match self {
CheckpointSyncerConf::LocalStorage { path } => {
Ok(CheckpointSyncers::Local(LocalStorage::new(path)))
}
CheckpointSyncerConf::S3 { bucket, region } => Ok(CheckpointSyncers::S3(
S3Storage::new(bucket, region.parse().expect("invalid s3 region")),
CheckpointSyncerConf::LocalStorage { path } => Ok(CheckpointSyncers::Local(
LocalStorage::new(path, latest_index_gauge),
)),
CheckpointSyncerConf::S3 { bucket, region } => {
Ok(CheckpointSyncers::S3(S3Storage::new(
bucket,
region.parse().expect("invalid s3 region"),
latest_index_gauge,
)))
}
}
}
}
@ -54,10 +62,19 @@ pub struct MultisigCheckpointSyncerConf {
impl MultisigCheckpointSyncerConf {
/// Get a MultisigCheckpointSyncer from the config
pub fn try_into_multisig_checkpoint_syncer(&self) -> Result<MultisigCheckpointSyncer, Report> {
pub fn try_into_multisig_checkpoint_syncer(
&self,
origin: &str,
validator_checkpoint_index: IntGaugeVec,
) -> Result<MultisigCheckpointSyncer, Report> {
let mut checkpoint_syncers = HashMap::new();
for (key, value) in self.checkpointsyncers.iter() {
checkpoint_syncers.insert(Address::from_str(key)?, value.try_into_checkpoint_syncer()?);
let gauge =
validator_checkpoint_index.with_label_values(&[origin, &key.to_lowercase()]);
checkpoint_syncers.insert(
Address::from_str(key)?,
value.try_into_checkpoint_syncer(Some(gauge))?,
);
}
Ok(MultisigCheckpointSyncer::new(
self.threshold,

@ -2,21 +2,24 @@ use abacus_core::SignedCheckpoint;
use async_trait::async_trait;
use eyre::Result;
use prometheus::IntGauge;
use crate::traits::CheckpointSyncer;
#[derive(Debug, Clone, serde::Deserialize)]
#[derive(Debug, Clone)]
/// Type for reading/write to LocalStorage
pub struct LocalStorage {
/// base path
pub path: String,
path: String,
latest_index: Option<IntGauge>,
}
impl LocalStorage {
/// Constructor
pub fn new(path: &str) -> Self {
pub fn new(path: &str, latest_index: Option<IntGauge>) -> Self {
LocalStorage {
path: path.to_owned(),
latest_index,
}
}
fn checkpoint_file_path(&self, index: u32) -> String {
@ -44,6 +47,9 @@ impl CheckpointSyncer for LocalStorage {
}) {
Ok(data) => {
let index = data.parse()?;
if let Some(gauge) = &self.latest_index {
gauge.set(index as i64);
}
Ok(Some(index))
}
_ => Ok(None),

@ -4,6 +4,7 @@ use abacus_core::SignedCheckpoint;
use async_trait::async_trait;
use eyre::{bail, Result};
use futures_util::TryStreamExt;
use prometheus::IntGauge;
use rusoto_core::{credential::EnvironmentProvider, HttpClient, Region, RusotoError};
use rusoto_s3::{GetObjectError, GetObjectRequest, PutObjectRequest, S3Client, S3};
@ -18,6 +19,7 @@ pub struct S3Storage {
region: Region,
/// client
client: S3Client,
latest_index: Option<IntGauge>,
}
impl fmt::Debug for S3Storage {
@ -31,7 +33,7 @@ impl fmt::Debug for S3Storage {
impl S3Storage {
/// constructor
pub fn new(bucket: &str, region: Region) -> Self {
pub fn new(bucket: &str, region: Region, latest_index: Option<IntGauge>) -> Self {
let client = S3Client::new_with(
HttpClient::new().unwrap(),
EnvironmentProvider::default(),
@ -42,6 +44,7 @@ impl S3Storage {
bucket: bucket.to_owned(),
region,
client,
latest_index,
}
}
@ -84,11 +87,20 @@ impl S3Storage {
#[async_trait]
impl CheckpointSyncer for S3Storage {
async fn latest_index(&self) -> Result<Option<u32>> {
self.read_from_bucket(S3Storage::index_key())
let ret = self
.read_from_bucket(S3Storage::index_key())
.await?
.map(|data| serde_json::from_slice(&data))
.transpose()
.map_err(Into::into)
.map_err(Into::into);
if let Ok(Some(latest_index)) = ret {
if let Some(gauge) = &self.latest_index {
gauge.set(latest_index as i64);
}
}
ret
}
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpoint>> {
self.read_from_bucket(S3Storage::checkpoint_key(index))

@ -46,9 +46,17 @@ impl BaseAgent for Relayer {
where
Self: Sized,
{
let core = settings
.as_ref()
.try_into_abacus_core(Self::AGENT_NAME, true)
.await?;
let multisig_checkpoint_syncer: MultisigCheckpointSyncer = settings
.multisigcheckpointsyncer
.try_into_multisig_checkpoint_syncer()?;
.try_into_multisig_checkpoint_syncer(
core.outbox.outbox().chain_name(),
core.metrics.validator_checkpoint_index(),
)?;
let whitelist = parse_matching_list(&settings.whitelist);
let blacklist = parse_matching_list(&settings.blacklist);
@ -60,10 +68,7 @@ impl BaseAgent for Relayer {
.parse()
.unwrap_or(5),
multisig_checkpoint_syncer,
core: settings
.as_ref()
.try_into_abacus_core(Self::AGENT_NAME, true)
.await?,
core,
whitelist,
blacklist,
})

@ -59,11 +59,12 @@ impl BaseAgent for Validator {
let signer = settings.validator.try_into_signer().await?;
let reorg_period = settings.reorgperiod.parse().expect("invalid uint");
let interval = settings.interval.parse().expect("invalid uint");
let checkpoint_syncer = settings.checkpointsyncer.try_into_checkpoint_syncer()?;
let core = settings
.as_ref()
.try_into_abacus_core(Self::AGENT_NAME, false)
.await?;
let checkpoint_syncer = settings.checkpointsyncer.try_into_checkpoint_syncer(None)?;
Ok(Self::new(
signer,
reorg_period,

Loading…
Cancel
Save