@ -3,18 +3,18 @@ use std::sync::Arc;
use std ::time ::{ Duration , Instant } ;
use std ::vec ;
use hyperlane_base ::db ::HyperlaneDb ;
use hyperlane_core ::rpc_clients ::call_and_retry_indefinitely ;
use hyperlane_core ::{ ChainResult , MerkleTreeHook } ;
use prometheus ::IntGauge ;
use tokio ::time ::sleep ;
use tracing ::{ debug , error , info } ;
use hyperlane_base ::db ::HyperlaneDb ;
use hyperlane_base ::{ CheckpointSyncer , CoreMetrics } ;
use hyperlane_core ::rpc_clients ::call_and_retry_indefinitely ;
use hyperlane_core ::{
accumulator ::incremental ::IncrementalMerkle , Checkpoint , CheckpointWithMessageId ,
HyperlaneChain , HyperlaneContract , HyperlaneDomain , HyperlaneSignerExt ,
} ;
use hyperlane_core ::{ ChainResult , MerkleTreeHook , ReorgEvent } ;
use hyperlane_ethereum ::SingletonSignerHandle ;
#[ derive(Clone) ]
@ -206,12 +206,32 @@ impl ValidatorSubmitter {
// If the tree's checkpoint doesn't match the correctness checkpoint, something went wrong
// and we bail loudly.
if checkpoint ! = * correctness_checkpoint {
let reorg_event = ReorgEvent ::new (
tree . root ( ) ,
correctness_checkpoint . root ,
checkpoint . index ,
chrono ::Utc ::now ( ) . timestamp ( ) as u64 ,
self . reorg_period . map ( | x | x . get ( ) ) . unwrap_or ( 0 ) ,
) ;
error ! (
? checkpoint ,
? correctness_checkpoint ,
? reorg_event ,
"Incorrect tree root, something went wrong"
) ;
panic! ( "Incorrect tree root, something went wrong" ) ;
let mut panic_message = "Incorrect tree root, something went wrong." . to_owned ( ) ;
if let Err ( e ) = self
. checkpoint_syncer
. write_reorg_status ( & reorg_event )
. await
{
panic_message . push_str ( & format! (
" Reorg troubleshooting details couldn't be written to checkpoint storage: {}" ,
e
) ) ;
}
panic! ( "{panic_message}" ) ;
}
if ! checkpoint_queue . is_empty ( ) {
@ -313,3 +333,321 @@ impl ValidatorSubmitterMetrics {
}
}
}
#[ cfg(test) ]
mod test {
use super ::* ;
use async_trait ::async_trait ;
use eyre ::Result ;
use hyperlane_base ::{
db ::{ DbResult , HyperlaneDb , InterchainGasExpenditureData , InterchainGasPaymentData } ,
AgentMetadata ,
} ;
use hyperlane_core ::{
test_utils ::dummy_domain , GasPaymentKey , HyperlaneChain , HyperlaneContract ,
HyperlaneDomain , HyperlaneMessage , HyperlaneProvider , InterchainGasPayment ,
InterchainGasPaymentMeta , MerkleTreeHook , MerkleTreeInsertion , PendingOperationStatus ,
ReorgEvent , SignedAnnouncement , SignedCheckpointWithMessageId , H160 , H256 ,
} ;
use prometheus ::Registry ;
use std ::{ fmt ::Debug , sync ::Arc , time ::Duration } ;
use tokio ::sync ::mpsc ;
mockall ::mock ! {
pub Db {
fn provider ( & self ) -> Box < dyn HyperlaneProvider > ;
}
impl Debug for Db {
fn fmt < ' a > ( & self , f : & mut std ::fmt ::Formatter < ' a > ) -> std ::fmt ::Result ;
}
impl HyperlaneDb for Db {
fn retrieve_highest_seen_message_nonce ( & self ) -> DbResult < Option < u32 > > ;
fn retrieve_message_by_nonce ( & self , nonce : u32 ) -> DbResult < Option < HyperlaneMessage > > ;
fn retrieve_processed_by_nonce ( & self , nonce : & u32 ) -> DbResult < Option < bool > > ;
fn domain ( & self ) -> & HyperlaneDomain ;
fn store_message_id_by_nonce ( & self , nonce : & u32 , id : & H256 ) -> DbResult < ( ) > ;
fn retrieve_message_id_by_nonce ( & self , nonce : & u32 ) -> DbResult < Option < H256 > > ;
fn store_message_by_id ( & self , id : & H256 , message : & HyperlaneMessage ) -> DbResult < ( ) > ;
fn retrieve_message_by_id ( & self , id : & H256 ) -> DbResult < Option < HyperlaneMessage > > ;
fn store_dispatched_block_number_by_nonce (
& self ,
nonce : & u32 ,
block_number : & u64 ,
) -> DbResult < ( ) > ;
fn retrieve_dispatched_block_number_by_nonce ( & self , nonce : & u32 ) -> DbResult < Option < u64 > > ;
fn store_processed_by_nonce ( & self , nonce : & u32 , processed : & bool ) -> DbResult < ( ) > ;
fn store_processed_by_gas_payment_meta (
& self ,
meta : & InterchainGasPaymentMeta ,
processed : & bool ,
) -> DbResult < ( ) > ;
fn retrieve_processed_by_gas_payment_meta (
& self ,
meta : & InterchainGasPaymentMeta ,
) -> DbResult < Option < bool > > ;
fn store_interchain_gas_expenditure_data_by_message_id (
& self ,
message_id : & H256 ,
data : & InterchainGasExpenditureData ,
) -> DbResult < ( ) > ;
fn retrieve_interchain_gas_expenditure_data_by_message_id (
& self ,
message_id : & H256 ,
) -> DbResult < Option < InterchainGasExpenditureData > > ;
fn store_status_by_message_id (
& self ,
message_id : & H256 ,
status : & PendingOperationStatus ,
) -> DbResult < ( ) > ;
fn retrieve_status_by_message_id (
& self ,
message_id : & H256 ,
) -> DbResult < Option < PendingOperationStatus > > ;
fn store_interchain_gas_payment_data_by_gas_payment_key (
& self ,
key : & GasPaymentKey ,
data : & InterchainGasPaymentData ,
) -> DbResult < ( ) > ;
fn retrieve_interchain_gas_payment_data_by_gas_payment_key (
& self ,
key : & GasPaymentKey ,
) -> DbResult < Option < InterchainGasPaymentData > > ;
fn store_gas_payment_by_sequence (
& self ,
sequence : & u32 ,
payment : & InterchainGasPayment ,
) -> DbResult < ( ) > ;
fn retrieve_gas_payment_by_sequence (
& self ,
sequence : & u32 ,
) -> DbResult < Option < InterchainGasPayment > > ;
fn store_gas_payment_block_by_sequence (
& self ,
sequence : & u32 ,
block_number : & u64 ,
) -> DbResult < ( ) > ;
fn retrieve_gas_payment_block_by_sequence ( & self , sequence : & u32 ) -> DbResult < Option < u64 > > ;
fn store_pending_message_retry_count_by_message_id (
& self ,
message_id : & H256 ,
count : & u32 ,
) -> DbResult < ( ) > ;
fn retrieve_pending_message_retry_count_by_message_id (
& self ,
message_id : & H256 ,
) -> DbResult < Option < u32 > > ;
fn store_merkle_tree_insertion_by_leaf_index (
& self ,
leaf_index : & u32 ,
insertion : & MerkleTreeInsertion ,
) -> DbResult < ( ) > ;
fn retrieve_merkle_tree_insertion_by_leaf_index (
& self ,
leaf_index : & u32 ,
) -> DbResult < Option < MerkleTreeInsertion > > ;
fn store_merkle_leaf_index_by_message_id (
& self ,
message_id : & H256 ,
leaf_index : & u32 ,
) -> DbResult < ( ) > ;
fn retrieve_merkle_leaf_index_by_message_id ( & self , message_id : & H256 ) -> DbResult < Option < u32 > > ;
fn store_merkle_tree_insertion_block_number_by_leaf_index (
& self ,
leaf_index : & u32 ,
block_number : & u64 ,
) -> DbResult < ( ) > ;
fn retrieve_merkle_tree_insertion_block_number_by_leaf_index (
& self ,
leaf_index : & u32 ,
) -> DbResult < Option < u64 > > ;
fn store_highest_seen_message_nonce_number ( & self , nonce : & u32 ) -> DbResult < ( ) > ;
fn retrieve_highest_seen_message_nonce_number ( & self ) -> DbResult < Option < u32 > > ;
}
}
mockall ::mock ! {
pub MerkleTreeHook { }
impl Debug for MerkleTreeHook {
fn fmt < ' a > ( & self , f : & mut std ::fmt ::Formatter < ' a > ) -> std ::fmt ::Result ;
}
impl HyperlaneChain for MerkleTreeHook {
fn domain ( & self ) -> & HyperlaneDomain ;
fn provider ( & self ) -> Box < dyn HyperlaneProvider > ;
}
impl HyperlaneContract for MerkleTreeHook {
fn address ( & self ) -> H256 ;
}
#[ async_trait ]
impl MerkleTreeHook for MerkleTreeHook {
async fn tree ( & self , lag : Option < NonZeroU64 > ) -> ChainResult < IncrementalMerkle > ;
async fn count ( & self , lag : Option < NonZeroU64 > ) -> ChainResult < u32 > ;
async fn latest_checkpoint ( & self , lag : Option < NonZeroU64 > ) -> ChainResult < Checkpoint > ;
}
}
mockall ::mock ! {
pub CheckpointSyncer { }
impl Debug for CheckpointSyncer {
fn fmt < ' a > ( & self , f : & mut std ::fmt ::Formatter < ' a > ) -> std ::fmt ::Result ;
}
#[ async_trait ]
impl CheckpointSyncer for CheckpointSyncer {
async fn latest_index ( & self ) -> Result < Option < u32 > > ;
async fn write_latest_index ( & self , index : u32 ) -> Result < ( ) > ;
async fn update_latest_index ( & self , index : u32 ) -> Result < ( ) > ;
async fn fetch_checkpoint ( & self , index : u32 ) -> Result < Option < SignedCheckpointWithMessageId > > ;
async fn write_checkpoint (
& self ,
signed_checkpoint : & SignedCheckpointWithMessageId ,
) -> Result < ( ) > ;
async fn write_metadata ( & self , metadata : & AgentMetadata ) -> Result < ( ) > ;
async fn write_announcement ( & self , signed_announcement : & SignedAnnouncement ) -> Result < ( ) > ;
fn announcement_location ( & self ) -> String ;
async fn write_reorg_status ( & self , reorg_event : & ReorgEvent ) -> Result < ( ) > ;
async fn reorg_status ( & self ) -> Result < Option < ReorgEvent > > ;
}
}
fn dummy_metrics ( ) -> ValidatorSubmitterMetrics {
let origin_domain = dummy_domain ( 0 , "dummy_origin_domain" ) ;
let core_metrics = CoreMetrics ::new ( "dummy_relayer" , 37582 , Registry ::new ( ) ) . unwrap ( ) ;
ValidatorSubmitterMetrics ::new ( & core_metrics , & origin_domain )
}
fn dummy_singleton_handle ( ) -> SingletonSignerHandle {
SingletonSignerHandle ::new ( H160 ::from_low_u64_be ( 0 ) , mpsc ::unbounded_channel ( ) . 0 )
}
fn reorg_event_is_correct (
reorg_event : & ReorgEvent ,
expected_local_merkle_tree : & IncrementalMerkle ,
mock_onchain_merkle_tree : & IncrementalMerkle ,
unix_timestamp : u64 ,
expected_reorg_period : u64 ,
) {
assert_eq! (
reorg_event . canonical_merkle_root ,
mock_onchain_merkle_tree . root ( )
) ;
assert_eq! (
reorg_event . local_merkle_root ,
expected_local_merkle_tree . root ( )
) ;
assert_eq! (
reorg_event . checkpoint_index ,
expected_local_merkle_tree . index ( )
) ;
// timestamp diff should be less than 1 second
let timestamp_diff = reorg_event . unix_timestamp as i64 - unix_timestamp as i64 ;
assert! ( timestamp_diff . abs ( ) < 1 ) ;
assert_eq! ( reorg_event . reorg_period , expected_reorg_period ) ;
}
#[ tokio::test ]
#[ should_panic(expected = " Incorrect tree root, something went wrong. " ) ]
async fn reorg_is_detected_and_persisted_to_checkpoint_storage ( ) {
let unix_timestamp = chrono ::Utc ::now ( ) . timestamp ( ) as u64 ;
let expected_reorg_period = 12 ;
let pre_reorg_merke_insertions = vec! [
MerkleTreeInsertion ::new ( 0 , H256 ::random ( ) ) ,
MerkleTreeInsertion ::new ( 1 , H256 ::random ( ) ) ,
MerkleTreeInsertion ::new ( 2 , H256 ::random ( ) ) ,
] ;
let mut expected_local_merkle_tree = IncrementalMerkle ::default ( ) ;
for insertion in pre_reorg_merke_insertions . iter ( ) {
expected_local_merkle_tree . ingest ( insertion . message_id ( ) ) ;
}
// the last leaf is different post-reorg
let post_reorg_merkle_insertions = vec! [
pre_reorg_merke_insertions [ 0 ] . clone ( ) ,
pre_reorg_merke_insertions [ 1 ] . clone ( ) ,
MerkleTreeInsertion ::new ( 2 , H256 ::random ( ) ) ,
] ;
let mut mock_onchain_merkle_tree = IncrementalMerkle ::default ( ) ;
for insertion in post_reorg_merkle_insertions . iter ( ) {
mock_onchain_merkle_tree . ingest ( insertion . message_id ( ) ) ;
}
// assert the reorg resulted in different merkle tree roots
assert_ne! (
mock_onchain_merkle_tree . root ( ) ,
expected_local_merkle_tree . root ( )
) ;
// the db returns the pre-reorg merkle tree insertions
let mut db = MockDb ::new ( ) ;
db . expect_retrieve_merkle_tree_insertion_by_leaf_index ( )
. returning ( move | sequence | {
Ok ( Some ( pre_reorg_merke_insertions [ * sequence as usize ] . clone ( ) ) )
} ) ;
// boilerplate mocks
let mut mock_merkle_tree_hook = MockMerkleTreeHook ::new ( ) ;
mock_merkle_tree_hook
. expect_address ( )
. returning ( | | H256 ::from_low_u64_be ( 0 ) ) ;
let dummy_domain = dummy_domain ( 0 , "dummy_domain" ) ;
mock_merkle_tree_hook
. expect_domain ( )
. return_const ( dummy_domain . clone ( ) ) ;
// expect the checkpoint syncer to post the reorg event to the checkpoint storage
// and not submit any checkpoints (this is checked implicitly, by not setting any `expect`s)
let mut mock_checkpoint_syncer = MockCheckpointSyncer ::new ( ) ;
let mock_onchain_merkle_tree_clone = mock_onchain_merkle_tree . clone ( ) ;
mock_checkpoint_syncer
. expect_write_reorg_status ( )
. once ( )
. returning ( move | reorg_event | {
// unit test correctness criteria
reorg_event_is_correct (
reorg_event ,
& expected_local_merkle_tree ,
& mock_onchain_merkle_tree_clone ,
unix_timestamp ,
expected_reorg_period ,
) ;
Ok ( ( ) )
} ) ;
// instantiate the validator submitter
let validator_submitter = ValidatorSubmitter ::new (
Duration ::from_secs ( 1 ) ,
expected_reorg_period ,
Arc ::new ( mock_merkle_tree_hook ) ,
dummy_singleton_handle ( ) ,
Arc ::new ( mock_checkpoint_syncer ) ,
Arc ::new ( db ) ,
dummy_metrics ( ) ,
) ;
// mock the correctness checkpoint response
let mock_onchain_checkpoint = Checkpoint {
root : mock_onchain_merkle_tree . root ( ) ,
index : mock_onchain_merkle_tree . index ( ) ,
merkle_tree_hook_address : H256 ::from_low_u64_be ( 0 ) ,
mailbox_domain : dummy_domain . id ( ) ,
} ;
// Start the submitter with an empty merkle tree, so it gets rebuilt from the db.
// A panic is expected here, as the merkle root inconsistency is a critical error that may indicate fraud.
validator_submitter
. submit_checkpoints_until_correctness_checkpoint (
& mut IncrementalMerkle ::default ( ) ,
& mock_onchain_checkpoint ,
)
. await ;
}
}