Block validator submitter on nonzero mailbox count (#2341)

### Description

Ensures all checkpoint submitters wait for nonzero mailbox count

### Drive-by Changes

Notably always spawns backfill submitter after seeing nonzero mailbox
count

### Related issues

Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/2340

### Backward compatibility

Yes

### Testing

Updated E2E Tests
pull/2371/head
Yorke Rhodes 1 year ago committed by GitHub
parent efad4e1326
commit eb17a9af15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      rust/agents/validator/src/submit.rs
  2. 30
      rust/agents/validator/src/validator.rs
  3. 27
      rust/utils/run-locally/src/main.rs

@ -133,18 +133,6 @@ impl ValidatorSubmitter {
} }
pub(crate) async fn legacy_checkpoint_submitter(self) -> Result<()> { pub(crate) async fn legacy_checkpoint_submitter(self) -> Result<()> {
// Ensure that the mailbox has > 0 messages before we enter the main
// validator submit loop. This is to avoid an underflow / reverted
// call when we invoke the `mailbox.latest_checkpoint()` method,
// which returns the **index** of the last element in the tree
// rather than just the size. See
// https://github.com/hyperlane-network/hyperlane-monorepo/issues/575 for
// more details.
while self.mailbox.count(self.reorg_period).await? == 0 {
info!("Waiting for first message to mailbox");
sleep(self.interval).await;
}
// current_index will be None if the validator cannot find // current_index will be None if the validator cannot find
// a previously signed checkpoint // a previously signed checkpoint
let mut current_index = self.checkpoint_syncer.latest_index().await?; let mut current_index = self.checkpoint_syncer.latest_index().await?;

@ -105,6 +105,21 @@ impl BaseAgent for Validator {
async fn run(&self) -> Instrumented<JoinHandle<Result<()>>> { async fn run(&self) -> Instrumented<JoinHandle<Result<()>>> {
self.announce().await.expect("Failed to announce validator"); self.announce().await.expect("Failed to announce validator");
let reorg_period = NonZeroU64::new(self.reorg_period);
// Ensure that the mailbox has count > 0 before we begin indexing
// messages or submitting checkpoints.
while self
.mailbox
.count(reorg_period)
.await
.expect("Failed to get count of mailbox")
== 0
{
info!("Waiting for first message to mailbox");
sleep(self.interval).await;
}
let mut tasks = vec![]; let mut tasks = vec![];
tasks.push(self.run_message_sync().await); tasks.push(self.run_message_sync().await);
@ -146,19 +161,19 @@ impl Validator {
); );
let empty_tree = IncrementalMerkle::default(); let empty_tree = IncrementalMerkle::default();
let lag = NonZeroU64::new(self.reorg_period); let reorg_period = NonZeroU64::new(self.reorg_period);
let tip_tree = self let tip_tree = self
.mailbox .mailbox
.tree(lag) .tree(reorg_period)
.await .await
.expect("failed to get mailbox tree"); .expect("failed to get mailbox tree");
let mut tasks = vec![]; assert!(tip_tree.count() > 0, "mailbox tree is empty");
let backfill_target = submitter.checkpoint(&tip_tree);
let legacy_submitter = submitter.clone(); let legacy_submitter = submitter.clone();
if tip_tree.count() > 0 {
let backfill_target = submitter.checkpoint(&tip_tree);
let backfill_submitter = submitter.clone(); let backfill_submitter = submitter.clone();
let mut tasks = vec![];
tasks.push( tasks.push(
tokio::spawn(async move { tokio::spawn(async move {
backfill_submitter backfill_submitter
@ -167,7 +182,6 @@ impl Validator {
}) })
.instrument(info_span!("BackfillCheckpointSubmitter")), .instrument(info_span!("BackfillCheckpointSubmitter")),
); );
}
tasks.push( tasks.push(
tokio::spawn(async move { submitter.checkpoint_submitter(tip_tree, None).await }) tokio::spawn(async move { submitter.checkpoint_submitter(tip_tree, None).await })
@ -222,6 +236,7 @@ impl Validator {
validator_address=?announcement.validator, validator_address=?announcement.validator,
"Please send tokens to the validator address to announce", "Please send tokens to the validator address to announce",
); );
sleep(self.interval).await;
} else { } else {
let outcome = self let outcome = self
.validator_announce .validator_announce
@ -235,7 +250,6 @@ impl Validator {
} }
} }
} }
sleep(self.interval).await;
} }
Ok(()) Ok(())
} }

@ -373,7 +373,29 @@ fn main() -> ExitCode {
state.watchers.push(scraper_stderr); state.watchers.push(scraper_stderr);
state.scraper = Some(scraper); state.scraper = Some(scraper);
// Send half the kathy messages before starting the agents let mut validator_iter = validator_envs.iter();
// spawn 1st validator before any messages have been sent to test empty mailbox
let validator1_env = validator_iter.next().unwrap();
let (validator, validator_stdout, validator_stderr) = run_agent(
"validator",
&common_env
.clone()
.into_iter()
.chain(validator1_env.clone())
.collect(),
&[],
"VAL1",
log_all,
&log_dir,
);
state.watchers.push(validator_stdout);
state.watchers.push(validator_stderr);
state.validators.push(validator);
sleep(Duration::from_secs(5));
// Send half the kathy messages before starting the rest of the agents
let mut kathy = Command::new("yarn"); let mut kathy = Command::new("yarn");
kathy kathy
.arg("kathy") .arg("kathy")
@ -392,7 +414,8 @@ fn main() -> ExitCode {
state.watchers.push(kathy_stderr); state.watchers.push(kathy_stderr);
kathy.wait().unwrap(); kathy.wait().unwrap();
for (i, validator_env) in validator_envs.iter().enumerate() { // spawn the rest of the validators
for (i, validator_env) in validator_iter.enumerate() {
let (validator, validator_stdout, validator_stderr) = run_agent( let (validator, validator_stdout, validator_stderr) = run_agent(
"validator", "validator",
&common_env &common_env

Loading…
Cancel
Save