@ -25,7 +25,8 @@ use tokio::{
} ,
task ::JoinHandle ,
} ;
use tracing ::{ info , info_span , instrument ::Instrumented , warn , Instrument } ;
use tokio_metrics ::TaskMonitor ;
use tracing ::{ error , info , info_span , instrument ::Instrumented , warn , Instrument } ;
use crate ::{
merkle_tree ::builder ::MerkleTreeBuilder ,
@ -79,6 +80,8 @@ pub struct Relayer {
// or move them in `core_metrics`, like the validator metrics
agent_metrics : AgentMetrics ,
chain_metrics : ChainMetrics ,
/// Tokio console server
pub tokio_console_server : Option < console_subscriber ::Server > ,
}
impl Debug for Relayer {
@ -109,6 +112,7 @@ impl BaseAgent for Relayer {
core_metrics : Arc < CoreMetrics > ,
agent_metrics : AgentMetrics ,
chain_metrics : ChainMetrics ,
tokio_console_server : console_subscriber ::Server ,
) -> Result < Self >
where
Self : Sized ,
@ -280,13 +284,26 @@ impl BaseAgent for Relayer {
core_metrics ,
agent_metrics ,
chain_metrics ,
tokio_console_server : Some ( tokio_console_server ) ,
} )
}
#[ allow(clippy::async_yields_async) ]
async fn run ( self ) {
async fn run ( mut self ) {
let mut tasks = vec! [ ] ;
let task_monitor = tokio_metrics ::TaskMonitor ::new ( ) ;
if let Some ( tokio_console_server ) = self . tokio_console_server . take ( ) {
let console_server =
tokio ::spawn ( TaskMonitor ::instrument ( & task_monitor . clone ( ) , async move {
info ! ( "Starting tokio console server" ) ;
if let Err ( e ) = tokio_console_server . serve ( ) . await {
error ! ( error = ? e , "Tokio console server failed to start" ) ;
}
} ) ) ;
tasks . push ( console_server . instrument ( info_span ! ( "Tokio console server" ) ) ) ;
}
// run server
let mpmc_channel = MpmcChannel ::< MessageRetryRequest > ::new ( ENDPOINT_MESSAGES_QUEUE_SIZE ) ;
let custom_routes = relayer_server ::routes ( mpmc_channel . sender ( ) ) ;
@ -318,6 +335,7 @@ impl BaseAgent for Relayer {
. operation_batch_config ( )
. map ( | c | c . max_batch_size )
. unwrap_or ( 1 ) ,
task_monitor . clone ( ) ,
) ,
) ;
@ -334,15 +352,25 @@ impl BaseAgent for Relayer {
}
for origin in & self . origin_chains {
tasks . push ( self . run_message_sync ( origin ) . await ) ;
tasks . push ( self . run_interchain_gas_payment_sync ( origin ) . await ) ;
tasks . push ( self . run_merkle_tree_hook_syncs ( origin ) . await ) ;
tasks . push ( self . run_message_sync ( origin , task_monitor . clone ( ) ) . await ) ;
tasks . push (
self . run_interchain_gas_payment_sync ( origin , task_monitor . clone ( ) )
. await ,
) ;
tasks . push (
self . run_merkle_tree_hook_syncs ( origin , task_monitor . clone ( ) )
. await ,
) ;
}
// each message process attempts to send messages from a chain
for origin in & self . origin_chains {
tasks . push ( self . run_message_processor ( origin , send_channels . clone ( ) ) ) ;
tasks . push ( self . run_merkle_tree_processor ( origin ) ) ;
tasks . push ( self . run_message_processor (
origin ,
send_channels . clone ( ) ,
task_monitor . clone ( ) ,
) ) ;
tasks . push ( self . run_merkle_tree_processor ( origin , task_monitor . clone ( ) ) ) ;
}
if let Err ( err ) = try_join_all ( tasks ) . await {
@ -355,22 +383,27 @@ impl BaseAgent for Relayer {
}
impl Relayer {
async fn run_message_sync ( & self , origin : & HyperlaneDomain ) -> Instrumented < JoinHandle < ( ) > > {
async fn run_message_sync (
& self ,
origin : & HyperlaneDomain ,
task_monitor : TaskMonitor ,
) -> Instrumented < JoinHandle < ( ) > > {
let index_settings = self . as_ref ( ) . settings . chains [ origin . name ( ) ] . index_settings ( ) ;
let contract_sync = self . message_syncs . get ( origin ) . unwrap ( ) . clone ( ) ;
let cursor = contract_sync . cursor ( index_settings ) . await ;
tokio ::spawn ( async move {
tokio ::spawn ( TaskMonitor ::instrument ( & task_monitor , async move {
contract_sync
. clone ( )
. sync ( "dispatched_messages" , cursor )
. await
} )
} ) )
. instrument ( info_span ! ( "MessageSync" ) )
}
async fn run_interchain_gas_payment_sync (
& self ,
origin : & HyperlaneDomain ,
task_monitor : TaskMonitor ,
) -> Instrumented < JoinHandle < ( ) > > {
let index_settings = self . as_ref ( ) . settings . chains [ origin . name ( ) ] . index_settings ( ) ;
let contract_sync = self
@ -379,25 +412,31 @@ impl Relayer {
. unwrap ( )
. clone ( ) ;
let cursor = contract_sync . cursor ( index_settings ) . await ;
tokio ::spawn ( async move { contract_sync . clone ( ) . sync ( "gas_payments" , cursor ) . await } )
. instrument ( info_span ! ( "IgpSync" ) )
tokio ::spawn ( TaskMonitor ::instrument ( & task_monitor , async move {
contract_sync . clone ( ) . sync ( "gas_payments" , cursor ) . await
} ) )
. instrument ( info_span ! ( "IgpSync" ) )
}
async fn run_merkle_tree_hook_syncs (
& self ,
origin : & HyperlaneDomain ,
task_monitor : TaskMonitor ,
) -> Instrumented < JoinHandle < ( ) > > {
let index_settings = self . as_ref ( ) . settings . chains [ origin . name ( ) ] . index . clone ( ) ;
let contract_sync = self . merkle_tree_hook_syncs . get ( origin ) . unwrap ( ) . clone ( ) ;
let cursor = contract_sync . cursor ( index_settings ) . await ;
tokio ::spawn ( async move { contract_sync . clone ( ) . sync ( "merkle_tree_hook" , cursor ) . await } )
. instrument ( info_span ! ( "MerkleTreeHookSync" ) )
tokio ::spawn ( TaskMonitor ::instrument ( & task_monitor , async move {
contract_sync . clone ( ) . sync ( "merkle_tree_hook" , cursor ) . await
} ) )
. instrument ( info_span ! ( "MerkleTreeHookSync" ) )
}
fn run_message_processor (
& self ,
origin : & HyperlaneDomain ,
send_channels : HashMap < u32 , UnboundedSender < QueueOperation > > ,
task_monitor : TaskMonitor ,
) -> Instrumented < JoinHandle < ( ) > > {
let metrics = MessageProcessorMetrics ::new (
& self . core . metrics ,
@ -431,12 +470,16 @@ impl Relayer {
) ;
let span = info_span ! ( "MessageProcessor" , origin = % message_processor . domain ( ) ) ;
let processor = Processor ::new ( Box ::new ( message_processor ) ) ;
let processor = Processor ::new ( Box ::new ( message_processor ) , task_monitor . clone ( ) ) ;
processor . spawn ( ) . instrument ( span )
}
fn run_merkle_tree_processor ( & self , origin : & HyperlaneDomain ) -> Instrumented < JoinHandle < ( ) > > {
fn run_merkle_tree_processor (
& self ,
origin : & HyperlaneDomain ,
task_monitor : TaskMonitor ,
) -> Instrumented < JoinHandle < ( ) > > {
let metrics = MerkleTreeProcessorMetrics ::new ( ) ;
let merkle_tree_processor = MerkleTreeProcessor ::new (
self . dbs . get ( origin ) . unwrap ( ) . clone ( ) ,
@ -445,7 +488,7 @@ impl Relayer {
) ;
let span = info_span ! ( "MerkleTreeProcessor" , origin = % merkle_tree_processor . domain ( ) ) ;
let processor = Processor ::new ( Box ::new ( merkle_tree_processor ) ) ;
let processor = Processor ::new ( Box ::new ( merkle_tree_processor ) , task_monitor . clone ( ) ) ;
processor . spawn ( ) . instrument ( span )
}
@ -457,6 +500,7 @@ impl Relayer {
receiver : UnboundedReceiver < QueueOperation > ,
retry_receiver_channel : MpmcReceiver < MessageRetryRequest > ,
batch_size : u32 ,
task_monitor : TaskMonitor ,
) -> Instrumented < JoinHandle < ( ) > > {
let serial_submitter = SerialSubmitter ::new (
destination . clone ( ) ,
@ -464,10 +508,11 @@ impl Relayer {
retry_receiver_channel ,
SerialSubmitterMetrics ::new ( & self . core . metrics , destination ) ,
batch_size ,
task_monitor . clone ( ) ,
) ;
let span = info_span ! ( "SerialSubmitter" , destination = % destination ) ;
let destination = destination . clone ( ) ;
tokio ::spawn ( async move {
tokio ::spawn ( TaskMonitor ::instrument ( & task_monitor , async move {
// Propagate task panics
serial_submitter . spawn ( ) . await . unwrap_or_else ( | err | {
panic! (
@ -475,7 +520,7 @@ impl Relayer {
destination , err
)
} ) ;
} )
} ) )
. instrument ( span )
}
}