Multithreaded relayer (#3748)

### Description

- Sets the tokio runtime flavor to be multithreaded and hardcodes the
worker threads to 20. The latter usually defaults to the number of CPUs
in the system so 20 may be high, but it worked fine in RC.

Improved performance metrics:
- time to first `Ingesting leaf` log: **1:30 mins (multithreaded) vs
5:51 mins (single threaded)**
- time to first message in the submit queue of Optimism (one of the
slowest chains to boot up for): **8:30 mins vs 20 mins**

### Drive-by changes

Sets more descriptive `span`s when instrumenting the contract syncs

### Related issues

<!--
- Fixes #[issue number here]
-->

### Backward compatibility

Yes

### Testing

Manually, by checking the RC logs
kunal/avs-contract-deployment
Daniel Savu 7 months ago committed by GitHub
parent 5f1de67ee1
commit 5d96847ffb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      rust/agents/relayer/Cargo.toml
  2. 2
      rust/agents/relayer/src/main.rs
  3. 6
      rust/agents/relayer/src/relayer.rs

@ -31,7 +31,7 @@ serde.workspace = true
serde_json.workspace = true
strum.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "macros", "parking_lot"] }
tokio = { workspace = true, features = ["rt", "macros", "parking_lot", "rt-multi-thread"] }
tracing-futures.workspace = true
tracing.workspace = true

@ -21,7 +21,7 @@ mod relayer;
mod server;
mod settings;
#[tokio::main(flavor = "current_thread")]
#[tokio::main(flavor = "multi_thread", worker_threads = 20)]
async fn main() -> Result<()> {
agent_main::<Relayer>().await
}

@ -365,7 +365,7 @@ impl Relayer {
.sync("dispatched_messages", cursor)
.await
})
.instrument(info_span!("ContractSync"))
.instrument(info_span!("MessageSync"))
}
async fn run_interchain_gas_payment_sync(
@ -380,7 +380,7 @@ impl Relayer {
.clone();
let cursor = contract_sync.cursor(index_settings).await;
tokio::spawn(async move { contract_sync.clone().sync("gas_payments", cursor).await })
.instrument(info_span!("ContractSync"))
.instrument(info_span!("IgpSync"))
}
async fn run_merkle_tree_hook_syncs(
@ -391,7 +391,7 @@ impl Relayer {
let contract_sync = self.merkle_tree_hook_syncs.get(origin).unwrap().clone();
let cursor = contract_sync.cursor(index_settings).await;
tokio::spawn(async move { contract_sync.clone().sync("merkle_tree_hook", cursor).await })
.instrument(info_span!("ContractSync"))
.instrument(info_span!("MerkleTreeHookSync"))
}
fn run_message_processor(

Loading…
Cancel
Save