@ -5,16 +5,20 @@ use std::time::{Duration, Instant};
use eyre ::Result ;
use prometheus ::IntGauge ;
use tokio ::{ task ::JoinHandle , time ::sleep } ;
use tracing ::{ debug , info , info_span , instrument ::Instrumented , Instrument } ;
use tracing ::{ debug , error , info , info_span , instrument ::Instrumented , warn , Instrument } ;
use hyperlane_base ::{ CheckpointSyncer , CoreMetrics } ;
use hyperlane_core ::{ Announcement , HyperlaneDomain , HyperlaneSigner , HyperlaneSignerExt , Mailbox } ;
use hyperlane_core ::{
Announcement , HyperlaneDomain , HyperlaneSigner , HyperlaneSignerExt , Mailbox , ValidatorAnnounce ,
H256 , U256 ,
} ;
pub ( crate ) struct ValidatorSubmitter {
interval : Duration ,
reorg_period : Option < NonZeroU64 > ,
signer : Arc < dyn HyperlaneSigner > ,
mailbox : Arc < dyn Mailbox > ,
validator_announce : Arc < dyn ValidatorAnnounce > ,
checkpoint_syncer : Arc < dyn CheckpointSyncer > ,
metrics : ValidatorSubmitterMetrics ,
}
@ -24,6 +28,7 @@ impl ValidatorSubmitter {
interval : Duration ,
reorg_period : u64 ,
mailbox : Arc < dyn Mailbox > ,
validator_announce : Arc < dyn ValidatorAnnounce > ,
signer : Arc < dyn HyperlaneSigner > ,
checkpoint_syncer : Arc < dyn CheckpointSyncer > ,
metrics : ValidatorSubmitterMetrics ,
@ -32,6 +37,7 @@ impl ValidatorSubmitter {
reorg_period : NonZeroU64 ::new ( reorg_period ) ,
interval ,
mailbox ,
validator_announce ,
signer ,
checkpoint_syncer ,
metrics ,
@ -43,7 +49,7 @@ impl ValidatorSubmitter {
tokio ::spawn ( async move { self . main_task ( ) . await } ) . instrument ( span )
}
async fn m ai n_task( self ) -> Result < ( ) > {
async fn announce _task ( & self ) -> Result < ( ) > {
// Sign and post the validator announcement
let announcement = Announcement {
validator : self . signer . eth_address ( ) ,
@ -51,11 +57,60 @@ impl ValidatorSubmitter {
mailbox_domain : self . mailbox . domain ( ) . id ( ) ,
storage_location : self . checkpoint_syncer . announcement_location ( ) ,
} ;
let signed_announcement = self . signer . sign ( announcement ) . await ? ;
let signed_announcement = self . signer . sign ( announcement . clone ( ) ) . await ? ;
self . checkpoint_syncer
. write_announcement ( & signed_announcement )
. await ? ;
// Ensure that the validator has announced themselves before we enter
// the main validator submit loop. This is to avoid a situation in
// which the validator is signing checkpoints but has not announced
// their locations, which makes them functionally unusable.
let validators : [ H256 ; 1 ] = [ self . signer . eth_address ( ) . into ( ) ] ;
loop {
info ! ( "Checking for validator announcement" ) ;
if let Some ( locations ) = self
. validator_announce
. get_announced_storage_locations ( & validators )
. await ?
. first ( )
{
if locations . contains ( & self . checkpoint_syncer . announcement_location ( ) ) {
info ! ( "Validator has announced signature storage location" ) ;
break ;
}
info ! ( "Validator has not announced signature storage location" ) ;
let balance_delta = self
. validator_announce
. announce_tokens_needed ( signed_announcement . clone ( ) )
. await ? ;
if balance_delta > U256 ::zero ( ) {
warn ! (
tokens_needed = % balance_delta ,
validator_address = ? announcement . validator ,
"Please send tokens to the validator address to announce" ,
) ;
} else {
let outcome = self
. validator_announce
. announce ( signed_announcement . clone ( ) , None )
. await ? ;
if ! outcome . executed {
error ! (
hash = ? outcome . txid ,
"Transaction attempting to announce validator reverted"
) ;
}
}
}
sleep ( self . interval ) . await ;
}
Ok ( ( ) )
}
async fn main_task ( self ) -> Result < ( ) > {
self . announce_task ( ) . await ? ;
// Ensure that the mailbox has > 0 messages before we enter the main
// validator submit loop. This is to avoid an underflow / reverted
// call when we invoke the `mailbox.latest_checkpoint()` method,