@ -9,12 +9,13 @@ use derive_new::new;
use eyre ::Result ;
use eyre ::Result ;
use hyperlane_base ::{ db ::HyperlaneRocksDB , CoreMetrics } ;
use hyperlane_base ::{ db ::HyperlaneRocksDB , CoreMetrics } ;
use hyperlane_core ::{
use hyperlane_core ::{
gas_used_by_operation , make_op_try , BatchItem , ChainCommunicationError , ChainResult ,
gas_used_by_operation , BatchItem , ChainCommunicationError , ChainResult , ConfirmReason ,
HyperlaneChain , HyperlaneDomain , HyperlaneMessage , Mailbox , MessageSubmissionData ,
HyperlaneChain , HyperlaneDomain , HyperlaneMessage , Mailbox , MessageSubmissionData ,
PendingOperation , PendingOperationResult , TryBatchAs , TxOutcome , H256 , U256 ,
PendingOperation , PendingOperationResult , PendingOperationStatus , ReprepareReason , TryBatchAs ,
TxOutcome , H256 , U256 ,
} ;
} ;
use prometheus ::{ IntCounter , IntGauge } ;
use prometheus ::{ IntCounter , IntGauge } ;
use tracing ::{ debug , error , info , instrument , trace , warn } ;
use tracing ::{ debug , error , info , info_span , in strument , trace , warn , Instrument } ;
use super ::{
use super ::{
gas_payment ::GasPaymentEnforcer ,
gas_payment ::GasPaymentEnforcer ,
@ -53,6 +54,7 @@ pub struct MessageContext {
pub struct PendingMessage {
pub struct PendingMessage {
pub message : HyperlaneMessage ,
pub message : HyperlaneMessage ,
ctx : Arc < MessageContext > ,
ctx : Arc < MessageContext > ,
status : PendingOperationStatus ,
app_context : Option < String > ,
app_context : Option < String > ,
#[ new(default) ]
#[ new(default) ]
submitted : bool ,
submitted : bool ,
@ -120,6 +122,21 @@ impl PendingOperation for PendingMessage {
self . message . id ( )
self . message . id ( )
}
}
/// Returns a clone of the status currently held in memory for this message.
fn status(&self) -> PendingOperationStatus {
    let current = &self.status;
    current.clone()
}
/// Updates the status of this message and persists it to the origin DB.
///
/// Persistence is best-effort: if the DB write fails, a warning is logged
/// and the in-memory status is still updated.
fn set_status(&mut self, status: PendingOperationStatus) {
    let message_id = self.message.id();
    // Persist the *incoming* status. Storing `self.status` here would write
    // the previous value, leaving the DB one transition behind.
    if let Err(e) = self
        .ctx
        .origin_db
        .store_status_by_message_id(&message_id, &status)
    {
        warn!(message_id = ?message_id, err = %e, status = %status, "Persisting `status` failed for message");
    }
    self.status = status;
}
fn priority ( & self ) -> u32 {
fn priority ( & self ) -> u32 {
self . message . nonce
self . message . nonce
}
}
@ -132,14 +149,22 @@ impl PendingOperation for PendingMessage {
self . ctx . destination_mailbox . domain ( )
self . ctx . destination_mailbox . domain ( )
}
}
fn retrieve_status_from_db ( & self ) -> Option < PendingOperationStatus > {
match self . ctx . origin_db . retrieve_status_by_message_id ( & self . id ( ) ) {
Ok ( status ) = > status ,
Err ( e ) = > {
warn ! ( error = ? e , "Failed to retrieve status for message" ) ;
None
}
}
}
fn app_context ( & self ) -> Option < String > {
fn app_context ( & self ) -> Option < String > {
self . app_context . clone ( )
self . app_context . clone ( )
}
}
#[ instrument(skip(self), ret, fields(id=?self.id()), level = " debug " ) ]
#[ instrument(skip(self), ret, fields(id=?self.id()), level = " debug " ) ]
async fn prepare ( & mut self ) -> PendingOperationResult {
async fn prepare ( & mut self ) -> PendingOperationResult {
make_op_try ! ( | | self . on_reprepare ( ) ) ;
if ! self . is_ready ( ) {
if ! self . is_ready ( ) {
trace ! ( "Message is not ready to be submitted yet" ) ;
trace ! ( "Message is not ready to be submitted yet" ) ;
return PendingOperationResult ::NotReady ;
return PendingOperationResult ::NotReady ;
@ -148,27 +173,36 @@ impl PendingOperation for PendingMessage {
// If the message has already been processed, e.g. due to another relayer having
// If the message has already been processed, e.g. due to another relayer having
// already processed, then mark it as already-processed, and move on to
// already processed, then mark it as already-processed, and move on to
// the next tick.
// the next tick.
let is_already_delivered = op_try ! (
let is_already_delivered = match self
self . ctx
. ctx
. destination_mailbox
. destination_mailbox
. delivered ( self . message . id ( ) )
. delivered ( self . message . id ( ) )
. await ,
. await
"checking message delivery status"
{
) ;
Ok ( is_delivered ) = > is_delivered ,
Err ( err ) = > {
return self . on_reprepare ( Some ( err ) , ReprepareReason ::ErrorCheckingDeliveryStatus ) ;
}
} ;
if is_already_delivered {
if is_already_delivered {
debug ! ( "Message has already been delivered, marking as submitted." ) ;
debug ! ( "Message has already been delivered, marking as submitted." ) ;
self . submitted = true ;
self . submitted = true ;
self . set_next_attempt_after ( CONFIRM_DELAY ) ;
self . set_next_attempt_after ( CONFIRM_DELAY ) ;
return PendingOperationResult ::Confirm ;
return PendingOperationResult ::Confirm ( ConfirmReason ::AlreadySubmitted ) ;
}
}
let provider = self . ctx . destination_mailbox . provider ( ) ;
let provider = self . ctx . destination_mailbox . provider ( ) ;
// We cannot deliver to an address that is not a contract so check and drop if it isn't.
// We cannot deliver to an address that is not a contract so check and drop if it isn't.
let is_contract = op_try ! (
let is_contract = match provider . is_contract ( & self . message . recipient ) . await {
provider . is_contract ( & self . message . recipient ) . await ,
Ok ( is_contract ) = > is_contract ,
"checking if message recipient is a contract"
Err ( err ) = > {
) ;
return self . on_reprepare (
Some ( err ) ,
ReprepareReason ::ErrorCheckingIfRecipientIsContract ,
) ;
}
} ;
if ! is_contract {
if ! is_contract {
info ! (
info ! (
recipient = ? self . message . recipient ,
recipient = ? self . message . recipient ,
@ -177,56 +211,76 @@ impl PendingOperation for PendingMessage {
return PendingOperationResult ::Drop ;
return PendingOperationResult ::Drop ;
}
}
let ism_address = op_try ! (
let ism_address = match self
self . ctx
. ctx
. destination_mailbox
. destination_mailbox
. recipient_ism ( self . message . recipient )
. recipient_ism ( self . message . recipient )
. await ,
. await
"fetching ISM address. Potentially malformed recipient ISM address."
{
) ;
Ok ( ism_address ) = > ism_address ,
Err ( err ) = > {
return self . on_reprepare ( Some ( err ) , ReprepareReason ::ErrorFetchingIsmAddress ) ;
}
} ;
let message_metadata_builder = op_try ! (
let message_metadata_builder = match MessageMetadataBuilder ::new (
MessageMetadataBuilder ::new (
ism_address ,
ism_address ,
& self . message ,
& self . message ,
self . ctx . metadata_builder . clone ( ) ,
self . ctx . metadata_builder . clone ( )
)
)
. await
. await ,
{
"getting the message metadata builder"
Ok ( message_metadata_builder ) = > message_metadata_builder ,
) ;
Err ( err ) = > {
return self . on_reprepare ( Some ( err ) , ReprepareReason ::ErrorGettingMetadataBuilder ) ;
}
} ;
let metadata = match message_metadata_builder
. build ( ism_address , & self . message )
. await
{
Ok ( metadata ) = > metadata ,
Err ( err ) = > {
return self . on_reprepare ( Some ( err ) , ReprepareReason ::ErrorBuildingMetadata ) ;
}
} ;
let Some ( metadata ) = op_try ! (
let Some ( metadata ) = metadata else {
message_metadata_builder
return self . on_reprepare ::< String > ( None , ReprepareReason ::CouldNotFetchMetadata ) ;
. build ( ism_address , & self . message )
. await ,
"building metadata"
) else {
info ! ( "Could not fetch metadata" ) ;
return self . on_reprepare ( ) ;
} ;
} ;
// Estimate transaction costs for the process call. If there are issues, it's
// Estimate transaction costs for the process call. If there are issues, it's
// likely that gas estimation has failed because the message is
// likely that gas estimation has failed because the message is
// reverting. This is defined behavior, so we just log the error and
// reverting. This is defined behavior, so we just log the error and
// move onto the next tick.
// move onto the next tick.
let tx_cost_estimate = op_try ! (
let tx_cost_estimate = match self
self . ctx
. ctx
. destination_mailbox
. destination_mailbox
. process_estimate_costs ( & self . message , & metadata )
. process_estimate_costs ( & self . message , & metadata )
. await ,
. await
"estimating costs for process call"
{
) ;
Ok ( metadata ) = > metadata ,
Err ( err ) = > {
return self . on_reprepare ( Some ( err ) , ReprepareReason ::ErrorEstimatingGas ) ;
}
} ;
// If the gas payment requirement hasn't been met, move to the next tick.
// If the gas payment requirement hasn't been met, move to the next tick.
let Some ( gas_limit ) = op_try ! (
let gas_limit = match self
self . ctx
. ctx
. origin_gas_payment_enforcer
. origin_gas_payment_enforcer
. message_meets_gas_payment_requirement ( & self . message , & tx_cost_estimate )
. message_meets_gas_payment_requirement ( & self . message , & tx_cost_estimate )
. await ,
. await
"checking if message meets gas payment requirement"
{
) else {
Ok ( gas_limit ) = > gas_limit ,
warn ! ( ? tx_cost_estimate , "Gas payment requirement not met yet" ) ;
Err ( err ) = > {
return self . on_reprepare ( ) ;
return self . on_reprepare ( Some ( err ) , ReprepareReason ::ErrorCheckingGasRequirement ) ;
}
} ;
let Some ( gas_limit ) = gas_limit else {
return self . on_reprepare ::< String > ( None , ReprepareReason ::GasPaymentRequirementNotMet ) ;
} ;
} ;
// Go ahead and attempt processing of message to destination chain.
// Go ahead and attempt processing of message to destination chain.
@ -238,8 +292,8 @@ impl PendingOperation for PendingMessage {
if let Some ( max_limit ) = self . ctx . transaction_gas_limit {
if let Some ( max_limit ) = self . ctx . transaction_gas_limit {
if gas_limit > max_limit {
if gas_limit > max_limit {
info ! ( "Message delivery estimated gas exceeds max gas limit" ) ;
// TODO: consider dropping instead of repreparing in this case
return self . on_reprepare ( ) ;
return self . on_reprepare ::< String > ( None , ReprepareReason ::ExceedsMaxGasLimit ) ;
}
}
}
}
@ -288,41 +342,41 @@ impl PendingOperation for PendingMessage {
}
}
async fn confirm ( & mut self ) -> PendingOperationResult {
async fn confirm ( & mut self ) -> PendingOperationResult {
make_op_try ! ( | | {
// Provider error; just try again later
// Note: this means that we are using `NotReady` for a retryable error case
self . inc_attempts ( ) ;
PendingOperationResult ::NotReady
} ) ;
if ! self . is_ready ( ) {
if ! self . is_ready ( ) {
return PendingOperationResult ::NotReady ;
return PendingOperationResult ::NotReady ;
}
}
let is_delivered = op_try ! (
let is_delivered = match self
self . ctx
. ctx
. destination_mailbox
. destination_mailbox
. delivered ( self . message . id ( ) )
. delivered ( self . message . id ( ) )
. await ,
. await
"Confirming message delivery"
{
) ;
Ok ( is_delivered ) = > is_delivered ,
Err ( err ) = > {
return self . on_reconfirm ( Some ( err ) , "Error confirming message delivery" ) ;
}
} ;
if is_delivered {
if is_delivered {
op_try ! (
if let Err ( err ) = self . record_message_process_success ( ) {
critical : self . record_message_process_success ( ) ,
return self
"recording message process success"
. on_reconfirm ( Some ( err ) , "Error when recording message process success" ) ;
) ;
}
info ! (
info ! (
submission = ? self . submission_outcome ,
submission = ? self . submission_outcome ,
"Message successfully processed"
"Message successfully processed"
) ;
) ;
PendingOperationResult ::Success
PendingOperationResult ::Success
} else {
} else {
warn ! (
let span = info_span ! (
"Error: Transaction attempting to process message either reverted or was reorged" ,
tx_outcome = ? self . submission_outcome ,
tx_outcome = ? self . submission_outcome ,
message_id = ? self . message . id ( ) ,
message_id = ? self . message . id ( )
"Transaction attempting to process message either reverted or was reorged"
) ;
) ;
self . on_reprepare ( )
self . on_reprepare ::< String > ( None , ReprepareReason ::RevertedOrReorged )
. instrument ( span )
. into_inner ( )
}
}
}
}
@ -395,7 +449,13 @@ impl PendingMessage {
ctx : Arc < MessageContext > ,
ctx : Arc < MessageContext > ,
app_context : Option < String > ,
app_context : Option < String > ,
) -> Self {
) -> Self {
let mut pm = Self ::new ( message , ctx , app_context ) ;
let mut pm = Self ::new (
message ,
ctx ,
// Since we don't persist the message status for now, assume it's the first attempt
PendingOperationStatus ::FirstPrepareAttempt ,
app_context ,
) ;
match pm
match pm
. ctx
. ctx
. origin_db
. origin_db
@ -414,10 +474,29 @@ impl PendingMessage {
pm
pm
}
}
fn on_reprepare ( & mut self ) -> PendingOperationResult {
fn on_reprepare < E : Debug > (
& mut self ,
err : Option < E > ,
reason : ReprepareReason ,
) -> PendingOperationResult {
self . inc_attempts ( ) ;
self . inc_attempts ( ) ;
self . submitted = false ;
self . submitted = false ;
PendingOperationResult ::Reprepare
if let Some ( e ) = err {
warn ! ( error = ? e , "Repreparing message: {}" , reason . clone ( ) ) ;
} else {
warn ! ( "Repreparing message: {}" , reason . clone ( ) ) ;
}
PendingOperationResult ::Reprepare ( reason )
}
fn on_reconfirm < E : Debug > ( & mut self , err : Option < E > , reason : & str ) -> PendingOperationResult {
self . inc_attempts ( ) ;
if let Some ( e ) = err {
warn ! ( error = ? e , id = ? self . id ( ) , "Reconfirming message: {}" , reason . clone ( ) ) ;
} else {
warn ! ( id = ? self . id ( ) , "Reconfirming message: {}" , reason . clone ( ) ) ;
}
PendingOperationResult ::NotReady
}
}
fn is_ready ( & self ) -> bool {
fn is_ready ( & self ) -> bool {
@ -443,7 +522,6 @@ impl PendingMessage {
}
}
fn reset_attempts ( & mut self ) {
fn reset_attempts ( & mut self ) {
self . set_retries ( 0 ) ;
self . next_attempt_after = None ;
self . next_attempt_after = None ;
self . last_attempted_at = Instant ::now ( ) ;
self . last_attempted_at = Instant ::now ( ) ;
}
}
@ -484,8 +562,17 @@ impl PendingMessage {
i if ( 24 .. 36 ) . contains ( & i ) = > 60 * 30 ,
i if ( 24 .. 36 ) . contains ( & i ) = > 60 * 30 ,
// wait 60min for the next 12 attempts
// wait 60min for the next 12 attempts
i if ( 36 .. 48 ) . contains ( & i ) = > 60 * 60 ,
i if ( 36 .. 48 ) . contains ( & i ) = > 60 * 60 ,
// wait 3h for the next 12 attempts,
// linearly increase the backoff time after 48 attempts,
_ = > 60 * 60 * 3 ,
// adding 1h for each additional attempt
_ = > {
let hour : u64 = 60 * 60 ;
// To be extra safe, `max` to make sure it's at least 1 hour.
let target = hour . max ( ( num_retries - 47 ) as u64 * hour ) ;
// Schedule it at some random point in the next hour to
// avoid scheduling messages with the same # of retries
// at the exact same time.
target + ( rand ::random ::< u64 > ( ) % hour )
}
} ) )
} ) )
}
}
}
}