|
|
|
@ -1,12 +1,11 @@ |
|
|
|
|
use std::sync::Arc; |
|
|
|
|
use std::time::Duration; |
|
|
|
|
use std::time::{Duration, Instant}; |
|
|
|
|
|
|
|
|
|
use eyre::Result; |
|
|
|
|
use prometheus::IntGauge; |
|
|
|
|
use tokio::time::MissedTickBehavior; |
|
|
|
|
use tokio::{task::JoinHandle, time::sleep}; |
|
|
|
|
use tracing::warn; |
|
|
|
|
use tracing::{info, info_span, instrument::Instrumented, Instrument}; |
|
|
|
|
use tracing::{debug, info, info_span, instrument::Instrumented, warn, Instrument}; |
|
|
|
|
|
|
|
|
|
use abacus_base::{CachingOutbox, CheckpointSyncer, CheckpointSyncers, CoreMetrics}; |
|
|
|
|
use abacus_core::{Outbox, Signers}; |
|
|
|
@ -82,7 +81,7 @@ impl ValidatorSubmitter { |
|
|
|
|
// https://github.com/abacus-network/abacus-monorepo/issues/575 for
|
|
|
|
|
// more details.
|
|
|
|
|
while self.outbox.count().await? == 0 { |
|
|
|
|
info!("waiting for non-zero outbox size"); |
|
|
|
|
info!("Waiting for non-zero outbox size"); |
|
|
|
|
sleep(Duration::from_secs(self.interval)).await; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -96,6 +95,23 @@ impl ValidatorSubmitter { |
|
|
|
|
.latest_checkpoint_processed |
|
|
|
|
.set(current_index as i64); |
|
|
|
|
|
|
|
|
|
// How often to log checkpoint info - once every minute
|
|
|
|
|
let checkpoint_info_log_period = Duration::from_secs(60); |
|
|
|
|
// The instant in which we last logged checkpoint info, if at all
|
|
|
|
|
let mut latest_checkpoint_info_log: Option<Instant> = None; |
|
|
|
|
// Returns whether checkpoint info should be logged based off the
|
|
|
|
|
// checkpoint_info_log_period having elapsed since the last log.
|
|
|
|
|
// Sets latest_checkpoint_info_log to the current instant if true.
|
|
|
|
|
let mut should_log_checkpoint_info = || { |
|
|
|
|
if let Some(instant) = latest_checkpoint_info_log { |
|
|
|
|
if instant.elapsed() < checkpoint_info_log_period { |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
latest_checkpoint_info_log = Some(Instant::now()); |
|
|
|
|
true |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
info!(current_index = current_index, "Starting Validator"); |
|
|
|
|
loop { |
|
|
|
|
// Check the latest checkpoint
|
|
|
|
@ -105,10 +121,26 @@ impl ValidatorSubmitter { |
|
|
|
|
.latest_checkpoint_observed |
|
|
|
|
.set(latest_checkpoint.index as i64); |
|
|
|
|
|
|
|
|
|
// Occasional info to make it clear to a validator operator whether things are working
|
|
|
|
|
// correctly without using the debug log level.
|
|
|
|
|
if should_log_checkpoint_info() { |
|
|
|
|
info!( |
|
|
|
|
latest_signed_checkpoint_index=?current_index, |
|
|
|
|
latest_known_checkpoint_index=?latest_checkpoint.index, |
|
|
|
|
"Latest checkpoint infos" |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
debug!( |
|
|
|
|
latest_signed_checkpoint_index=?current_index, |
|
|
|
|
latest_known_checkpoint_index=?latest_checkpoint.index, |
|
|
|
|
"Polled latest checkpoint" |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
if current_index < latest_checkpoint.index { |
|
|
|
|
let signed_checkpoint = latest_checkpoint.sign_with(self.signer.as_ref()).await?; |
|
|
|
|
|
|
|
|
|
info!(signature = ?signed_checkpoint, signer=?self.signer, "Sign latest checkpoint"); |
|
|
|
|
info!(signed_checkpoint = ?signed_checkpoint, signer=?self.signer, "Signed new latest checkpoint"); |
|
|
|
|
current_index = latest_checkpoint.index; |
|
|
|
|
|
|
|
|
|
self.checkpoint_syncer |
|
|
|
|