@ -1,59 +1,48 @@
use std ::collections ::VecDeque ;
use abacus_base ::{ CoreMetrics , InboxContracts } ;
use abacus_core ::AbacusCommon ;
use abacus_core ::{ db ::AbacusDB , Signers } ;
use abacus_core ::{ AbacusCommon , InboxValidatorManager } ;
use ethers ::signers ::Signer ;
use ethers ::types ::{ Address , U256 } ;
use ethers ::types ::Address ;
use eyre ::{ bail , Result } ;
use gelato ::chains ::Chain ;
use gelato ::fwd_req_call ::{ ForwardRequestArgs , PaymentType , NATIVE_FEE_TOKEN_ADDRESS } ;
use prometheus ::{ Histogram , IntCounter , IntGauge } ;
use tokio ::sync ::mpsc ;
use tokio ::time ::{ sleep , Duration } ;
use tokio ::sync ::mpsc ::{ self , UnboundedReceiver , UnboundedSender } ;
use tokio ::time ::{ sleep , Duration , Instant } ;
use tokio ::{ sync ::mpsc ::error ::TryRecvError , task ::JoinHandle } ;
use tracing ::{ info_span , instrument ::Instrumented , Instrument } ;
use self ::fwd_req_op ::{ ForwardRequestOp , ForwardRequestOptions } ;
use crate ::msg ::gelato_submitter ::fwd_req_op ::{
ForwardRequestOp , ForwardRequestOpArgs , ForwardRequestOptions ,
} ;
use super ::SubmitMessageArgs ;
mod fwd_req_op ;
/// The max fee to use for Gelato ForwardRequests.
/// Gelato isn't charging fees on testnet. For now, use this hardcoded value
/// of 1e18, or 1.0 ether.
/// TODO: revisit when testing on mainnet and actually considering interchain
/// gas payments.
const DEFAULT_MAX_FEE : u64 = 1000000000000000000 ;
/// The default gas limit to use for Gelato ForwardRequests.
/// TODO: instead estimate gas for messages.
const DEFAULT_GAS_LIMIT : u64 = 3000000 ;
#[ derive(Debug) ]
pub ( crate ) struct GelatoSubmitter {
/// Source of messages to submit.
pub message_receiver : mpsc ::UnboundedReceiver < SubmitMessageArgs > ,
message_receiver : mpsc ::UnboundedReceiver < SubmitMessageArgs > ,
/// Inbox / InboxValidatorManager on the destination chain.
pub inbox_contracts : InboxContracts ,
inbox_contracts : InboxContracts ,
/// The outbox chain in the format expected by the Gelato crate.
pub outbox_gelato_chain : Chain ,
outbox_gelato_chain : Chain ,
/// The inbox chain in the format expected by the Gelato crate.
pub inbox_gelato_chain : Chain ,
inbox_gelato_chain : Chain ,
/// The signer of the Gelato sponsor, used for EIP-712 meta-transaction signatures.
pub gelato_sponsor_signer : Signers ,
gelato_sponsor_signer : Signers ,
/// The address of the Gelato sponsor.
pub gelato_sponsor_address : Address ,
/// Messages we are aware of that we want to eventually submit, but haven't yet, for
/// whatever reason.
pub wait_queue : VecDeque < SubmitMessageArgs > ,
gelato_sponsor_address : Address ,
/// Interface to agent rocks DB for e.g. writing delivery status upon completion.
pub _abacus_ db : AbacusDB ,
db : AbacusDB ,
/// Shared reqwest HTTP client to use for any ops to Gelato endpoints.
pub http_client : reqwest ::Client ,
http_client : reqwest ::Client ,
/// Prometheus metrics.
pub _metrics : GelatoSubmitterMetrics ,
metrics : GelatoSubmitterMetrics ,
/// Channel used by ForwardRequestOps to send that their message has been successfully processed.
message_processed_sender : UnboundedSender < SubmitMessageArgs > ,
/// Channel to receive from ForwardRequestOps that a message has been successfully processed.
message_processed_receiver : UnboundedReceiver < SubmitMessageArgs > ,
}
impl GelatoSubmitter {
@ -66,18 +55,21 @@ impl GelatoSubmitter {
http_client : reqwest ::Client ,
metrics : GelatoSubmitterMetrics ,
) -> Self {
let ( message_processed_sender , message_processed_receiver ) =
mpsc ::unbounded_channel ::< SubmitMessageArgs > ( ) ;
Self {
message_receiver ,
outbox_gelato_chain : abacus_domain_to_gelato_chain ( outbox_domain ) . unwrap ( ) ,
inbox_gelato_chain : abacus_domain_to_gelato_chain ( inbox_contracts . inbox . local_domain ( ) )
. unwrap ( ) ,
inbox_contracts ,
_abacus_ db : abacus_db ,
db : abacus_db ,
gelato_sponsor_address : gelato_sponsor_signer . address ( ) ,
gelato_sponsor_signer ,
http_client ,
_metrics : metrics ,
wait_queue : VecDeque ::new ( ) ,
metrics ,
message_processed_sender ,
message_processed_receiver ,
}
}
@ -111,77 +103,72 @@ impl GelatoSubmitter {
}
}
// Insert received messages into the front of the wait queue, ensuring
// the asc ordering by message leaf index is preserved.
for msg in received_messages . into_iter ( ) . rev ( ) {
self . wait_queue . push_front ( msg ) ;
// Spawn a ForwardRequestOp for each received message.
for msg in received_messages . into_iter ( ) {
tracing ::info ! ( msg = ? msg , "Spawning forward request op for message" ) ;
let mut op = ForwardRequestOp ::new ( ForwardRequestOpArgs {
opts : ForwardRequestOptions ::default ( ) ,
http : self . http_client . clone ( ) ,
message : msg ,
inbox_contracts : self . inbox_contracts . clone ( ) ,
sponsor_signer : self . gelato_sponsor_signer . clone ( ) ,
sponsor_address : self . gelato_sponsor_address ,
sponsor_chain : self . outbox_gelato_chain ,
destination_chain : self . inbox_gelato_chain ,
message_processed_sender : self . message_processed_sender . clone ( ) ,
} ) ;
self . metrics . active_forward_request_ops_gauge . add ( 1 ) ;
tokio ::spawn ( async move { op . run ( ) . await } ) ;
}
// TODO: correctly process the wait queue.
// Messages should be popped from the wait queue. For messages
// with successful gas estimation, a ForwardRequestOp should
// be created. Messages whose gas estimation reverts should be
// pushed to the back of the queue.
// Pick the next message to try processing.
let msg = match self . wait_queue . pop_front ( ) {
Some ( m ) = > m ,
None = > return Ok ( ( ) ) ,
} ;
let op = ForwardRequestOp {
args : self . create_forward_request_args ( msg ) ? ,
opts : ForwardRequestOptions ::default ( ) ,
signer : self . gelato_sponsor_signer . clone ( ) ,
http : self . http_client . clone ( ) ,
} ;
tokio ::spawn ( async move {
op . run ( )
. await
. expect ( "failed unimplemented forward request submit op" ) ;
} ) ;
// Pull any messages that have been successfully processed by ForwardRequestOps
loop {
match self . message_processed_receiver . try_recv ( ) {
Ok ( msg ) = > {
self . record_message_process_success ( & msg ) ? ;
}
Err ( TryRecvError ::Empty ) = > {
break ;
}
Err ( _ ) = > {
bail ! ( "Disconnected receive channel or fatal err" ) ;
}
}
}
Ok ( ( ) )
}
fn create_forward_request_args ( & self , msg : SubmitMessageArgs ) -> Result < ForwardRequestArgs > {
let calldata = self . inbox_contracts . validator_manager . process_calldata (
& msg . checkpoint ,
& msg . committed_message . message ,
& msg . proof ,
) ? ;
Ok ( ForwardRequestArgs {
chain_id : self . inbox_gelato_chain ,
target : self
. inbox_contracts
. validator_manager
. contract_address ( )
. into ( ) ,
data : calldata ,
fee_token : NATIVE_FEE_TOKEN_ADDRESS ,
payment_type : PaymentType ::AsyncGasTank ,
max_fee : DEFAULT_MAX_FEE . into ( ) ,
gas : DEFAULT_GAS_LIMIT . into ( ) ,
sponsor_chain_id : self . outbox_gelato_chain ,
nonce : U256 ::zero ( ) ,
enforce_sponsor_nonce : false ,
enforce_sponsor_nonce_ordering : false ,
sponsor : self . gelato_sponsor_address ,
} )
/// Record in AbacusDB and various metrics that this process has observed the successful
/// processing of a message. An Ok(()) value returned by this function is the 'commit' point
/// in a message's lifetime for final processing -- after this function has been seen to
/// return 'Ok(())', then without a wiped AbacusDB, we will never re-attempt processing for
/// this message again, even after the relayer restarts.
fn record_message_process_success ( & mut self , msg : & SubmitMessageArgs ) -> Result < ( ) > {
tracing ::info ! ( msg = ? msg , "Recording message as successfully processed" ) ;
self . db . mark_leaf_as_processed ( msg . leaf_index ) ? ;
self . metrics . active_forward_request_ops_gauge . sub ( 1 ) ;
self . metrics
. queue_duration_hist
. observe ( ( Instant ::now ( ) - msg . enqueue_time ) . as_secs_f64 ( ) ) ;
self . metrics . highest_submitted_leaf_index =
std ::cmp ::max ( self . metrics . highest_submitted_leaf_index , msg . leaf_index ) ;
self . metrics
. processed_gauge
. set ( self . metrics . highest_submitted_leaf_index as i64 ) ;
self . metrics . messages_processed_count . inc ( ) ;
Ok ( ( ) )
}
}
// TODO(tkporter): Drop allow dead code directive once we handle
// updating each of these metrics.
#[ allow(dead_code) ]
#[ derive(Debug) ]
pub ( crate ) struct GelatoSubmitterMetrics {
wait_queue_length_gauge : IntGauge ,
queue_duration_hist : Histogram ,
processed_gauge : IntGauge ,
messages_processed_count : IntCounter ,
active_forward_request_ops_gauge : IntGauge ,
/// Private state used to update actual metrics each tick.
highest_submitted_leaf_index : u32 ,
}
@ -189,11 +176,6 @@ pub(crate) struct GelatoSubmitterMetrics {
impl GelatoSubmitterMetrics {
pub fn new ( metrics : & CoreMetrics , outbox_chain : & str , inbox_chain : & str ) -> Self {
Self {
wait_queue_length_gauge : metrics . submitter_queue_length ( ) . with_label_values ( & [
outbox_chain ,
inbox_chain ,
"wait_queue" ,
] ) ,
queue_duration_hist : metrics
. submitter_queue_duration_histogram ( )
. with_label_values ( & [ outbox_chain , inbox_chain ] ) ,
@ -205,6 +187,9 @@ impl GelatoSubmitterMetrics {
outbox_chain ,
inbox_chain ,
] ) ,
active_forward_request_ops_gauge : metrics
. submitter_queue_length ( )
. with_label_values ( & [ outbox_chain , inbox_chain , "active_forward_request_ops" ] ) ,
highest_submitted_leaf_index : 0 ,
}
}