feat: Relayer heap profiling (#4180)

### Description

During the scalability audit we've identified relayer memory usage as a
cause of concern. This PR runs the profiler each time e2e is run and
outputs a report called `dhat-heap.json` in the current dir. That file
can be visualised with a dhat viewer, such as the one hosted by the
author [here](https://nnethercote.github.io/dh_view/dh_view.html).

It adds a conditionally compiled heap memory profiler
([dhat-rs](https://docs.rs/dhat/latest/dhat/)), which is only included
when both the `memory-profiling` feature is enabled and the
`memory-profiling` cargo profile is used.

The report can order the profiling results by total lifetime allocated
bytes, peak allocated bytes, total allocation count, etc. Below is the
result of sorting by total lifetime allocated bytes, which reveals that
the biggest memory hogs are the hook indexing channel
[senders](https://github.com/hyperlane-xyz/hyperlane-monorepo/blob/main/rust/hyperlane-base/src/contract_sync/mod.rs#L52)
(one per origin chain), which together take up 100+ MB. The channel
capacity can most likely be reduced from 1M to 1k entries.

![Screenshot 2024-07-23 at 14 32
21](https://github.com/user-attachments/assets/f551ebfe-3ee0-40de-b53f-e5de1bc56dfa)


### Drive-by changes

Adds the number of IDs still pending in the channel as a field
(`pending_ids`) to the hook indexing log, to know if it's safe to lower
the channel size.

### Related issues

- Fixes https://github.com/hyperlane-xyz/issues/issues/1293


### Backward compatibility

Yes, since everything is conditionally compiled / isolated in a new
cargo profile.

### Testing

Manual
dan/relayer-images-bump
Daniel Savu 4 months ago committed by GitHub
parent ed63e04c4a
commit 6140e6f397
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      rust/.gitignore
  2. 32
      rust/Cargo.lock
  3. 1
      rust/Cargo.toml
  4. 3
      rust/agents/relayer/Cargo.toml
  5. 13
      rust/agents/relayer/src/main.rs
  6. 48
      rust/agents/relayer/src/memory_profiler.rs
  7. 2
      rust/agents/relayer/src/server/mod.rs
  8. 3
      rust/hyperlane-base/src/contract_sync/cursors/mod.rs
  9. 1
      rust/hyperlane-base/src/contract_sync/mod.rs
  10. 2
      rust/utils/run-locally/src/main.rs

1
rust/.gitignore vendored

@ -5,3 +5,4 @@ kathydb
hyperlane_db hyperlane_db
config/test_config.json config/test_config.json
validator_db_anvil* validator_db_anvil*
dhat-heap*

32
rust/Cargo.lock generated

@ -2128,6 +2128,22 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "dhat"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98cd11d84628e233de0ce467de10b8633f4ddaecafadefc86e13b84b8739b827"
dependencies = [
"backtrace",
"lazy_static",
"mintex",
"parking_lot 0.12.1",
"rustc-hash",
"serde",
"serde_json",
"thousands",
]
[[package]] [[package]]
name = "dialoguer" name = "dialoguer"
version = "0.10.4" version = "0.10.4"
@ -4345,7 +4361,7 @@ dependencies = [
"futures-util", "futures-util",
"hex 0.4.3", "hex 0.4.3",
"hyperlane-core", "hyperlane-core",
"itertools 0.10.5", "itertools 0.12.0",
"num 0.4.1", "num 0.4.1",
"num-traits", "num-traits",
"reqwest", "reqwest",
@ -5501,6 +5517,12 @@ dependencies = [
"adler", "adler",
] ]
[[package]]
name = "mintex"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bec4598fddb13cc7b528819e697852653252b760f1228b7642679bf2ff2cd07"
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.8.10" version = "0.8.10"
@ -7045,8 +7067,10 @@ dependencies = [
"config", "config",
"console-subscriber", "console-subscriber",
"convert_case 0.6.0", "convert_case 0.6.0",
"ctrlc",
"derive-new", "derive-new",
"derive_more", "derive_more",
"dhat",
"ethers", "ethers",
"ethers-contract", "ethers-contract",
"eyre", "eyre",
@ -9954,6 +9978,12 @@ dependencies = [
"syn 2.0.48", "syn 2.0.48",
] ]
[[package]]
name = "thousands"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820"
[[package]] [[package]]
name = "thread_local" name = "thread_local"
version = "1.1.7" version = "1.1.7"

@ -80,6 +80,7 @@ curve25519-dalek = { version = "~3.2", features = ["serde"] }
derive-new = "0.5" derive-new = "0.5"
derive_builder = "0.12" derive_builder = "0.12"
derive_more = "0.99" derive_more = "0.99"
dhat = "0.3.3"
ed25519-dalek = "~1.0" ed25519-dalek = "~1.0"
eyre = "=0.6.8" eyre = "=0.6.8"
fixed-hash = "0.8.0" fixed-hash = "0.8.0"

@ -15,8 +15,10 @@ axum.workspace = true
config.workspace = true config.workspace = true
console-subscriber.workspace = true console-subscriber.workspace = true
convert_case.workspace = true convert_case.workspace = true
ctrlc = { workspace = true, features = ["termination"], optional = true }
derive-new.workspace = true derive-new.workspace = true
derive_more.workspace = true derive_more.workspace = true
dhat = { workspace = true, optional = true }
ethers-contract.workspace = true ethers-contract.workspace = true
ethers.workspace = true ethers.workspace = true
eyre.workspace = true eyre.workspace = true
@ -55,3 +57,4 @@ default = ["color-eyre", "oneline-errors"]
oneline-errors = ["hyperlane-base/oneline-errors"] oneline-errors = ["hyperlane-base/oneline-errors"]
color-eyre = ["hyperlane-base/color-eyre"] color-eyre = ["hyperlane-base/color-eyre"]
test-utils = ["hyperlane-base/test-utils"] test-utils = ["hyperlane-base/test-utils"]
memory-profiling = ["dep:ctrlc", "dep:dhat"]

@ -13,7 +13,18 @@ use hyperlane_base::agent_main;
use relayer::Relayer; use relayer::Relayer;
#[cfg(feature = "memory-profiling")]
mod memory_profiler;
#[tokio::main(flavor = "multi_thread", worker_threads = 20)] #[tokio::main(flavor = "multi_thread", worker_threads = 20)]
async fn main() -> Result<()> { async fn main() -> Result<()> {
agent_main::<Relayer>().await let agent_main_fut = agent_main::<Relayer>();
#[cfg(feature = "memory-profiling")]
memory_profiler::run_future(agent_main_fut).await?;
#[cfg(not(feature = "memory-profiling"))]
agent_main_fut.await?;
Ok(())
} }

@ -0,0 +1,48 @@
use dhat::Profiler;
use eyre::Error as Report;
// Registers dhat's allocator as the process-wide global allocator.
// This module is only compiled when the `memory-profiling` feature is on,
// so regular builds keep the default allocator.
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;
/// Creates the dhat heap profiler and returns it wrapped in `Some`.
///
/// The `Option` wrapper lets the owner later `take()` the profiler out of its
/// slot so that it is dropped exactly once.
fn initialize() -> Option<Profiler> {
    Some(Profiler::new_heap())
}
/// Drops the profiler stored in `profiler_singleton`, if one is still present.
///
/// The slot is emptied in the process, so calling this again is a no-op.
fn termination_handler(profiler_singleton: &mut Option<Profiler>) {
    // `take()` leaves `None` behind, so the profiler is dropped at most once
    drop(profiler_singleton.take());
}
pub(crate) async fn run_future<F, T>(fut: F) -> Result<T, Report>
where
F: std::future::Future<Output = Result<T, Report>>,
T: Default,
{
let mut profiler_singleton = initialize();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let mut shutdown_tx_singleton = Some(shutdown_tx);
ctrlc::set_handler(move || {
termination_handler(&mut profiler_singleton);
// only send the shutdown signal once
let Some(shutdown_tx) = shutdown_tx_singleton.take() else {
return;
};
if let Err(_) = shutdown_tx.send(()) {
eprintln!("failed to send shutdown signal");
}
})
.expect("Error setting termination handler");
// this `select!` isn't cancellation-safe if `fut` owns state
// but for profiling scenarios this is not a risk
tokio::select! {
res = fut => { return res; },
_ = shutdown_rx => { return Ok(T::default()) },
};
}

@ -5,7 +5,7 @@ use tokio::sync::broadcast::Sender;
use crate::msg::op_queue::OperationPriorityQueue; use crate::msg::op_queue::OperationPriorityQueue;
pub const ENDPOINT_MESSAGES_QUEUE_SIZE: usize = 1_000; pub const ENDPOINT_MESSAGES_QUEUE_SIZE: usize = 100;
pub use list_messages::*; pub use list_messages::*;
pub use message_retry::*; pub use message_retry::*;

@ -13,8 +13,7 @@ pub enum CursorType {
RateLimited, RateLimited,
} }
// H256 * 1M = 32MB per origin chain worst case // H512 * 1M = 64MB per origin chain
// With one such channel per origin chain.
const TX_ID_CHANNEL_CAPACITY: Option<usize> = Some(1_000_000); const TX_ID_CHANNEL_CAPACITY: Option<usize> = Some(1_000_000);
pub trait Indexable { pub trait Indexable {

@ -116,6 +116,7 @@ where
num_logs, num_logs,
?tx_id, ?tx_id,
sequences = ?logs.iter().map(|(log, _)| log.sequence).collect::<Vec<_>>(), sequences = ?logs.iter().map(|(log, _)| log.sequence).collect::<Vec<_>>(),
pending_ids = ?recv.len(),
"Found log(s) for tx id" "Found log(s) for tx id"
); );
} }

@ -319,7 +319,7 @@ fn main() -> ExitCode {
log!("Building rust..."); log!("Building rust...");
let build_rust = Program::new("cargo") let build_rust = Program::new("cargo")
.cmd("build") .cmd("build")
.arg("features", "test-utils") .arg("features", "test-utils memory-profiling")
.arg("bin", "relayer") .arg("bin", "relayer")
.arg("bin", "validator") .arg("bin", "validator")
.arg("bin", "scraper") .arg("bin", "scraper")

Loading…
Cancel
Save