feat: use multiple `mpsc` channels instead of `broadcast` for async interface (#4190)

### Description

Follow up to
https://github.com/hyperlane-xyz/hyperlane-monorepo/pull/4180

Lowers txid broadcast channel capacity from 1M to 30k and refactors it
to use a vec of `mpsc` under the hood.

### Context
The heap profiler showed that each new chain takes up 100MB of memory
because of the txid broadcast channel. In addition to this, the tokio
broadcast channel is a ring buffer where new items simply overwrite old
items, making it hard to safely lower the capacity.

As such, to be able to safely lower the capacity of this channel, usage
of `tokio`'s `broadcast` channel is replaced with a `Vec` of `mpsc`s,
which offer an async `send` interface for backpressure. So instead of
overwriting old items, senders will block on sends, which makes sense in
our case as we absolutely don't want to miss items. The submitter is
slower than the indexer based on empirical observation, so blocking on
channel size doesn't bottleneck throughput.

I noticed the max number of channel items we can reach is 29k, by
running the RC relayer after an 8 day break, during which around 160k
messages were submitted. Lowering the capacity to 30k should cause us to
only encounter backpressure in very rare cases, and the footprint of
this channel would lower from 100MB to 3MB.

### Drive-by changes

The delivery indexer in the scraper is no longer passed a txid receiver,
since message deliveries are not expected to occur in the same tx as
message dispatches.

### Related issues

- Fixes https://github.com/hyperlane-xyz/issues/issues/1293

### Testing

E2E, by adding invariants to make sure txid indexing works

---------

Co-authored-by: Trevor Porter <tkporter4@gmail.com>
pull/4219/head
Daniel Savu 4 months ago committed by GitHub
parent 98eb680adf
commit b643fba90a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 26
      rust/Cargo.lock
  2. 2
      rust/Cargo.toml
  3. 15
      rust/agents/relayer/src/relayer.rs
  4. 30
      rust/agents/scraper/src/agent.rs
  5. 2
      rust/chains/hyperlane-ethereum/src/rpc_clients/trait_builder.rs
  6. 52
      rust/hyperlane-base/src/contract_sync/broadcast.rs
  7. 4
      rust/hyperlane-base/src/contract_sync/cursors/mod.rs
  8. 32
      rust/hyperlane-base/src/contract_sync/mod.rs
  9. 17
      rust/utils/run-locally/src/invariants.rs

26
rust/Cargo.lock generated

@ -3951,9 +3951,9 @@ dependencies = [
[[package]]
name = "hermit-abi"
version = "0.3.3"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "hex"
@ -5077,7 +5077,7 @@ version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455"
dependencies = [
"hermit-abi 0.3.3",
"hermit-abi 0.3.9",
"rustix",
"windows-sys 0.52.0",
]
@ -5525,13 +5525,14 @@ checksum = "9bec4598fddb13cc7b528819e697852653252b760f1228b7642679bf2ff2cd07"
[[package]]
name = "mio"
version = "0.8.10"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4"
dependencies = [
"hermit-abi 0.3.9",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@ -5845,7 +5846,7 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi 0.3.3",
"hermit-abi 0.3.9",
"libc",
]
@ -10079,22 +10080,21 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.35.1"
version = "1.39.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104"
checksum = "d040ac2b29ab03b09d4129c2f5bbd012a3ac2f79d38ff506a4bf8dd34b0eac8a"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.5",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@ -10109,9 +10109,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.2.0"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2 1.0.76",
"quote 1.0.35",

@ -173,7 +173,7 @@ tendermint-rpc = { version = "0.32.0", features = ["http-client", "tokio"] }
thiserror = "1.0"
time = "0.3"
tiny-keccak = "2.0.2"
tokio = { version = "1", features = ["parking_lot", "tracing"] }
tokio = { version = "1.37", features = ["parking_lot", "tracing"] }
tokio-metrics = { version = "0.3.1", default-features = false }
tokio-test = "0.4"
toml_edit = "0.19.14"

@ -9,6 +9,7 @@ use derive_more::AsRef;
use eyre::Result;
use futures_util::future::try_join_all;
use hyperlane_base::{
broadcast::BroadcastMpscSender,
db::{HyperlaneRocksDB, DB},
metrics::{AgentMetrics, MetricsUpdater},
settings::ChainConf,
@ -21,8 +22,8 @@ use hyperlane_core::{
};
use tokio::{
sync::{
broadcast::{Receiver, Sender},
mpsc::{self, UnboundedSender},
broadcast::Sender as BroadcastSender,
mpsc::{self, Receiver as MpscReceiver, UnboundedSender},
RwLock,
},
task::JoinHandle,
@ -309,7 +310,7 @@ impl BaseAgent for Relayer {
}));
tasks.push(console_server.instrument(info_span!("Tokio console server")));
}
let sender = Sender::<MessageRetryRequest>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
let sender = BroadcastSender::<MessageRetryRequest>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
let mut prep_queues = HashMap::with_capacity(self.destination_chains.len());
@ -358,7 +359,7 @@ impl BaseAgent for Relayer {
tasks.push(
self.run_interchain_gas_payment_sync(
origin,
maybe_broadcaster.clone().map(|b| b.subscribe()),
BroadcastMpscSender::map_get_receiver(maybe_broadcaster.as_ref()).await,
task_monitor.clone(),
)
.await,
@ -366,7 +367,7 @@ impl BaseAgent for Relayer {
tasks.push(
self.run_merkle_tree_hook_syncs(
origin,
maybe_broadcaster.map(|b| b.subscribe()),
BroadcastMpscSender::map_get_receiver(maybe_broadcaster.as_ref()).await,
task_monitor.clone(),
)
.await,
@ -428,7 +429,7 @@ impl Relayer {
async fn run_interchain_gas_payment_sync(
&self,
origin: &HyperlaneDomain,
tx_id_receiver: Option<Receiver<H512>>,
tx_id_receiver: Option<MpscReceiver<H512>>,
task_monitor: TaskMonitor,
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index_settings();
@ -453,7 +454,7 @@ impl Relayer {
async fn run_merkle_tree_hook_syncs(
&self,
origin: &HyperlaneDomain,
tx_id_receiver: Option<Receiver<H512>>,
tx_id_receiver: Option<MpscReceiver<H512>>,
task_monitor: TaskMonitor,
) -> Instrumented<JoinHandle<()>> {
let index_settings = self.as_ref().settings.chains[origin.name()].index.clone();

@ -4,14 +4,12 @@ use async_trait::async_trait;
use derive_more::AsRef;
use futures::future::try_join_all;
use hyperlane_base::{
metrics::AgentMetrics, settings::IndexSettings, BaseAgent, ChainMetrics, ContractSyncMetrics,
ContractSyncer, CoreMetrics, HyperlaneAgentCore, MetricsUpdater, SyncOptions,
broadcast::BroadcastMpscSender, metrics::AgentMetrics, settings::IndexSettings, BaseAgent,
ChainMetrics, ContractSyncMetrics, ContractSyncer, CoreMetrics, HyperlaneAgentCore,
MetricsUpdater, SyncOptions,
};
use hyperlane_core::{Delivery, HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, H512};
use tokio::{
sync::broadcast::{Receiver, Sender},
task::JoinHandle,
};
use tokio::{sync::mpsc::Receiver as MpscReceiver, task::JoinHandle};
use tracing::{info_span, instrument::Instrumented, trace, Instrument};
use crate::{chain_scraper::HyperlaneSqlDb, db::ScraperDb, settings::ScraperSettings};
@ -155,7 +153,6 @@ impl Scraper {
self.contract_sync_metrics.clone(),
db.clone(),
index_settings.clone(),
maybe_broadcaster.clone().map(|b| b.subscribe()),
)
.await,
);
@ -166,7 +163,7 @@ impl Scraper {
self.contract_sync_metrics.clone(),
db,
index_settings.clone(),
maybe_broadcaster.map(|b| b.subscribe()),
BroadcastMpscSender::<H512>::map_get_receiver(maybe_broadcaster.as_ref()).await,
)
.await,
);
@ -187,7 +184,10 @@ impl Scraper {
contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb,
index_settings: IndexSettings,
) -> (Instrumented<JoinHandle<()>>, Option<Sender<H512>>) {
) -> (
Instrumented<JoinHandle<()>>,
Option<BroadcastMpscSender<H512>>,
) {
let sync = self
.as_ref()
.settings
@ -215,7 +215,6 @@ impl Scraper {
contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb,
index_settings: IndexSettings,
tx_id_receiver: Option<Receiver<H512>>,
) -> Instrumented<JoinHandle<()>> {
let sync = self
.as_ref()
@ -231,11 +230,10 @@ impl Scraper {
let label = "message_delivery";
let cursor = sync.cursor(index_settings.clone()).await;
tokio::spawn(async move {
sync.sync(label, SyncOptions::new(Some(cursor), tx_id_receiver))
.await
})
.instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label))
// there is no txid receiver for delivery indexing, since delivery txs aren't batched with
// other types of indexed txs / events
tokio::spawn(async move { sync.sync(label, SyncOptions::new(Some(cursor), None)).await })
.instrument(info_span!("ChainContractSync", chain=%domain.name(), event=label))
}
async fn build_interchain_gas_payment_indexer(
@ -245,7 +243,7 @@ impl Scraper {
contract_sync_metrics: Arc<ContractSyncMetrics>,
db: HyperlaneSqlDb,
index_settings: IndexSettings,
tx_id_receiver: Option<Receiver<H512>>,
tx_id_receiver: Option<MpscReceiver<H512>>,
) -> Instrumented<JoinHandle<()>> {
let sync = self
.as_ref()

@ -213,11 +213,13 @@ pub trait BuildableWithProvider {
M: Middleware + 'static,
{
Ok(if let Some(signer) = signer {
println!("Building provider with siger");
let signing_provider = wrap_with_signer(provider, signer)
.await
.map_err(ChainCommunicationError::from_other)?;
self.build_with_provider(signing_provider, conn, locator)
} else {
println!("Building provider without siger");
self.build_with_provider(provider, conn, locator)
}
.await)

@ -0,0 +1,52 @@
use std::sync::Arc;
use derive_new::new;
use eyre::Result;
use hyperlane_core::H512;
use tokio::sync::{
mpsc::{Receiver as MpscReceiver, Sender as MpscSender},
Mutex,
};
#[derive(Debug, Clone, new)]
/// Wrapper around a vec of mpsc senders that broadcasts messages to all of them.
/// This is a workaround to get an async interface for `send`, so senders are blocked if any of the receiving channels is full,
/// rather than overwriting old messages (as the `broadcast` channel ring buffer implementation does).
pub struct BroadcastMpscSender<T> {
capacity: usize,
/// To make this safe to `Clone`, the sending end has to be in an arc-mutex.
/// Otherwise it would be possible to call `get_receiver` and create new receiver-sender pairs, whose sender is later dropped
/// because the other `BroadcastMpscSender`s have no reference to it. The receiver would then point to a closed
/// channel. So all instances of `BroadcastMpscSender` have to point to the entire set of senders.
#[new(default)]
sender: Arc<Mutex<Vec<MpscSender<T>>>>,
}
impl BroadcastMpscSender<H512> {
/// Send a message to all the receiving channels.
// This will block if at least one of the receiving channels is full
pub async fn send(&self, txid: H512) -> Result<()> {
let senders = self.sender.lock().await;
for sender in &*senders {
sender.send(txid).await?
}
Ok(())
}
/// Get a receiver channel that will receive messages broadcasted by all the senders
pub async fn get_receiver(&self) -> MpscReceiver<H512> {
let (sender, receiver) = tokio::sync::mpsc::channel(self.capacity);
self.sender.lock().await.push(sender);
receiver
}
/// Utility function map an option of `BroadcastMpscSender` to an option of `MpscReceiver`
pub async fn map_get_receiver(maybe_self: Option<&Self>) -> Option<MpscReceiver<H512>> {
if let Some(s) = maybe_self {
Some(s.get_receiver().await)
} else {
None
}
}
}

@ -13,8 +13,8 @@ pub enum CursorType {
RateLimited,
}
// H512 * 1M = 64MB per origin chain
const TX_ID_CHANNEL_CAPACITY: Option<usize> = Some(1_000_000);
// H512 * 30k =~ 2MB per origin chain
const TX_ID_CHANNEL_CAPACITY: Option<usize> = Some(30_000);
pub trait Indexable {
/// Returns the configured cursor type of this type for the given domain, (e.g. `SequenceAware` or `RateLimited`)

@ -3,6 +3,7 @@ use std::{
};
use axum::async_trait;
use broadcast::BroadcastMpscSender;
use cursors::*;
use derive_new::new;
use hyperlane_core::{
@ -13,13 +14,14 @@ use hyperlane_core::{
use hyperlane_core::{Indexed, LogMeta, H512};
pub use metrics::ContractSyncMetrics;
use prometheus::core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge};
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::broadcast::{Receiver as BroadcastReceiver, Sender as BroadcastSender};
use tokio::sync::mpsc::{error::TryRecvError, Receiver as MpscReceiver};
use tokio::time::sleep;
use tracing::{debug, info, instrument, trace, warn};
use crate::settings::IndexSettings;
/// Broadcast channel utility, with async interface for `send`
pub mod broadcast;
pub(crate) mod cursors;
mod eta_calculator;
mod metrics;
@ -37,7 +39,7 @@ pub struct ContractSync<T: Indexable, D: HyperlaneLogStore<T>, I: Indexer<T>> {
db: D,
indexer: I,
metrics: ContractSyncMetrics,
broadcast_sender: Option<BroadcastSender<H512>>,
broadcast_sender: Option<BroadcastMpscSender<H512>>,
_phantom: PhantomData<T>,
}
@ -49,7 +51,7 @@ impl<T: Indexable, D: HyperlaneLogStore<T>, I: Indexer<T>> ContractSync<T, D, I>
db,
indexer,
metrics,
broadcast_sender: T::broadcast_channel_size().map(BroadcastSender::new),
broadcast_sender: T::broadcast_channel_size().map(BroadcastMpscSender::new),
_phantom: PhantomData,
}
}
@ -66,7 +68,7 @@ where
&self.domain
}
fn get_broadcaster(&self) -> Option<BroadcastSender<H512>> {
fn get_broadcaster(&self) -> Option<BroadcastMpscSender<H512>> {
self.broadcast_sender.clone()
}
@ -97,7 +99,7 @@ where
#[instrument(fields(domain=self.domain().name()), skip(self, recv, stored_logs_metric))]
async fn fetch_logs_from_receiver(
&self,
recv: &mut BroadcastReceiver<H512>,
recv: &mut MpscReceiver<H512>,
stored_logs_metric: &GenericCounter<AtomicU64>,
) {
loop {
@ -121,11 +123,11 @@ where
);
}
Err(TryRecvError::Empty) => {
trace!("No txid received");
trace!("No tx id received");
break;
}
Err(err) => {
warn!(?err, "Error receiving txid from channel");
warn!(?err, "Error receiving tx id from channel");
break;
}
}
@ -175,11 +177,11 @@ where
);
if let Some(tx) = self.broadcast_sender.as_ref() {
logs.iter().for_each(|(_, meta)| {
if let Err(err) = tx.send(meta.transaction_id) {
for (_, meta) in &logs {
if let Err(err) = tx.send(meta.transaction_id).await {
trace!(?err, "Error sending txid to receiver");
}
});
}
}
// Update cursor
@ -247,7 +249,7 @@ pub trait ContractSyncer<T>: Send + Sync {
fn domain(&self) -> &HyperlaneDomain;
/// If this syncer is also a broadcaster, return the channel to receive txids
fn get_broadcaster(&self) -> Option<BroadcastSender<H512>>;
fn get_broadcaster(&self) -> Option<BroadcastMpscSender<H512>>;
}
#[derive(new)]
@ -257,7 +259,7 @@ pub struct SyncOptions<T> {
// Might want to refactor into an enum later, where we either index with a cursor or rely on receiving
// txids from a channel to other indexing tasks
cursor: Option<Box<dyn ContractSyncCursor<T>>>,
tx_id_receiver: Option<BroadcastReceiver<H512>>,
tx_id_receiver: Option<MpscReceiver<H512>>,
}
impl<T> From<Box<dyn ContractSyncCursor<T>>> for SyncOptions<T> {
@ -302,7 +304,7 @@ where
ContractSync::domain(self)
}
fn get_broadcaster(&self) -> Option<BroadcastSender<H512>> {
fn get_broadcaster(&self) -> Option<BroadcastMpscSender<H512>> {
ContractSync::get_broadcaster(self)
}
}
@ -341,7 +343,7 @@ where
ContractSync::domain(self)
}
fn get_broadcaster(&self) -> Option<BroadcastSender<H512>> {
fn get_broadcaster(&self) -> Option<BroadcastMpscSender<H512>> {
ContractSync::get_broadcaster(self)
}
}

@ -65,12 +65,16 @@ pub fn termination_invariants_met(
const STORING_NEW_MESSAGE_LOG_MESSAGE: &str = "Storing new message in db";
const LOOKING_FOR_EVENTS_LOG_MESSAGE: &str = "Looking for events in index range";
const HYPER_INCOMING_BODY_LOG_MESSAGE: &str = "incoming body completed";
const TX_ID_INDEXING_LOG_MESSAGE: &str = "Found log(s) for tx id";
let relayer_logfile = File::open(log_file_path)?;
let invariant_logs = &[
STORING_NEW_MESSAGE_LOG_MESSAGE,
LOOKING_FOR_EVENTS_LOG_MESSAGE,
GAS_EXPENDITURE_LOG_MESSAGE,
HYPER_INCOMING_BODY_LOG_MESSAGE,
TX_ID_INDEXING_LOG_MESSAGE,
];
let log_counts = get_matching_lines(&relayer_logfile, invariant_logs);
// Zero insertion messages don't reach `submit` stage where gas is spent, so we only expect these logs for the other messages.
@ -93,6 +97,19 @@ pub fn termination_invariants_met(
log_counts.get(LOOKING_FOR_EVENTS_LOG_MESSAGE).unwrap() > &0,
"Didn't find any logs about looking for events in index range"
);
let total_tx_id_log_count = log_counts.get(TX_ID_INDEXING_LOG_MESSAGE).unwrap();
assert!(
// there are 3 txid-indexed events:
// - relayer: merkle insertion and gas payment
// - scraper: gas payment
// some logs are emitted for multiple events, so requiring there to be at least
// `config.kathy_messages` logs is a reasonable approximation, since all three of these events
// are expected to be logged for each message.
*total_tx_id_log_count as u64 >= config.kathy_messages,
"Didn't find as many tx id logs as expected. Found {} and expected {}",
total_tx_id_log_count,
config.kathy_messages
);
assert!(
log_counts.get(HYPER_INCOMING_BODY_LOG_MESSAGE).is_none(),
"Verbose logs not expected at the log level set in e2e"

Loading…
Cancel
Save