diff --git a/rust/agents/relayer/src/merkle_tree_builder.rs b/rust/agents/relayer/src/merkle_tree_builder.rs index b2e3f38a0..31e33e739 100644 --- a/rust/agents/relayer/src/merkle_tree_builder.rs +++ b/rust/agents/relayer/src/merkle_tree_builder.rs @@ -90,7 +90,7 @@ impl MerkleTreeBuilder { } fn ingest_nonce(&mut self, nonce: u32) -> Result<(), MerkleTreeBuilderError> { - match self.db.message_id_by_nonce(nonce) { + match self.db.retrieve_message_id_by_nonce(&nonce) { Ok(Some(leaf)) => { debug!(nonce, "Ingesting leaf"); self.prover.ingest(leaf).expect("!tree full"); diff --git a/rust/agents/relayer/src/msg/gas_payment/mod.rs b/rust/agents/relayer/src/msg/gas_payment/mod.rs index bce348115..4b5a66b9c 100644 --- a/rust/agents/relayer/src/msg/gas_payment/mod.rs +++ b/rust/agents/relayer/src/msg/gas_payment/mod.rs @@ -78,8 +78,8 @@ impl GasPaymentEnforcer { tx_cost_estimate: &TxCostEstimate, ) -> Result> { let msg_id = message.id(); - let current_payment = self.db.retrieve_gas_payment_for_message_id(msg_id)?; - let current_expenditure = self.db.retrieve_gas_expenditure_for_message_id(msg_id)?; + let current_payment = self.db.retrieve_gas_payment_by_message_id(msg_id)?; + let current_expenditure = self.db.retrieve_gas_expenditure_by_message_id(msg_id)?; for (policy, whitelist) in &self.policies { if !whitelist.msg_matches(message, true) { trace!( diff --git a/rust/agents/relayer/src/msg/pending_message.rs b/rust/agents/relayer/src/msg/pending_message.rs index 3689a6219..a517599da 100644 --- a/rust/agents/relayer/src/msg/pending_message.rs +++ b/rust/agents/relayer/src/msg/pending_message.rs @@ -319,7 +319,7 @@ impl PendingMessage { fn record_message_process_success(&mut self) -> Result<()> { self.ctx .origin_db - .mark_nonce_as_processed(self.message.nonce)?; + .store_processed_by_nonce(&self.message.nonce, &true)?; self.ctx.metrics.update_nonce(&self.message); self.ctx.metrics.messages_processed.inc(); Ok(()) diff --git a/rust/agents/relayer/src/msg/processor.rs b/rust/agents/relayer/src/msg/processor.rs index 5057d84d0..93fab6046 100644 --- a/rust/agents/relayer/src/msg/processor.rs +++ b/rust/agents/relayer/src/msg/processor.rs @@ -80,7 +80,7 @@ impl MessageProcessor { fn try_get_unprocessed_message(&mut self) -> Result> { loop { // First, see if we can find the message so we can update the gauge. - if let Some(message) = self.db.message_by_nonce(self.message_nonce)? { + if let Some(message) = self.db.retrieve_message_by_nonce(self.message_nonce)? { // Update the latest nonce gauges self.metrics .max_last_known_message_nonce_gauge @@ -90,7 +90,11 @@ impl MessageProcessor { } // If this message has already been processed, on to the next one. - if !self.db.retrieve_message_processed(self.message_nonce)? { + if !self + .db + .retrieve_processed_by_nonce(&self.message_nonce)? + .unwrap_or(false) + { return Ok(Some(message)); } else { debug!(nonce=?self.message_nonce, "Message already marked as processed in DB"); diff --git a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs index 8a4e04b6f..b1f7a5c12 100644 --- a/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs +++ b/rust/hyperlane-base/src/db/rocks/hyperlane_db.rs @@ -3,13 +3,14 @@ use std::time::Duration; use async_trait::async_trait; use eyre::Result; +use paste::paste; use tokio::time::sleep; use tracing::{debug, trace}; use hyperlane_core::{ HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, HyperlaneMessageStore, HyperlaneWatermarkedLogStore, InterchainGasExpenditure, InterchainGasPayment, - InterchainGasPaymentMeta, LogMeta, H256, U256, + InterchainGasPaymentMeta, LogMeta, H256, }; use super::{ @@ -26,7 +27,7 @@ const MESSAGE: &str = "message_"; const NONCE_PROCESSED: &str = "nonce_processed_"; const GAS_PAYMENT_FOR_MESSAGE_ID: &str = "gas_payment_for_message_id_v2_"; const GAS_PAYMENT_META_PROCESSED: &str = "gas_payment_meta_processed_v2_"; -const GAS_EXPENDITURE_FOR_MESSAGE_ID: &str = "gas_expenditure_for_message_id_"; +const GAS_EXPENDITURE_FOR_MESSAGE_ID: &str = "gas_expenditure_for_message_id_v2_"; const LATEST_INDEXED_GAS_PAYMENT_BLOCK: &str = "latest_indexed_gas_payment_block"; type DbResult = std::result::Result; @@ -77,7 +78,7 @@ impl HyperlaneRocksDB { message: &HyperlaneMessage, dispatched_block_number: u64, ) -> DbResult { - if let Ok(Some(_)) = self.message_id_by_nonce(message.nonce) { + if let Ok(Some(_)) = self.retrieve_message_id_by_nonce(&message.nonce) { trace!(msg=?message, "Message already stored in db"); return Ok(false); } @@ -86,34 +87,20 @@ impl HyperlaneRocksDB { debug!(msg=?message, "Storing new message in db",); // - `id` --> `message` - self.store_keyed_encodable(MESSAGE, &id, message)?; + self.store_message_by_id(&id, message)?; // - `nonce` --> `id` - self.store_keyed_encodable(MESSAGE_ID, &message.nonce, &id)?; + self.store_message_id_by_nonce(&message.nonce, &id)?; // - `nonce` --> `dispatched block number` - self.store_keyed_encodable( - MESSAGE_DISPATCHED_BLOCK_NUMBER, - &message.nonce, - &dispatched_block_number, - )?; + self.store_dispatched_block_number_by_nonce(&message.nonce, &dispatched_block_number)?; Ok(true) } - /// Retrieve a message by its id - pub fn message_by_id(&self, id: H256) -> DbResult> { - self.retrieve_keyed_decodable(MESSAGE, &id) - } - - /// Retrieve the message id keyed by nonce - pub fn message_id_by_nonce(&self, nonce: u32) -> DbResult> { - self.retrieve_keyed_decodable(MESSAGE_ID, &nonce) - } - /// Retrieve a message by its nonce - pub fn message_by_nonce(&self, nonce: u32) -> DbResult> { - let id: Option = self.message_id_by_nonce(nonce)?; + pub fn retrieve_message_by_nonce(&self, nonce: u32) -> DbResult> { + let id = self.retrieve_message_id_by_nonce(&nonce)?; match id { None => Ok(None), - Some(id) => self.message_by_id(id), + Some(id) => self.retrieve_message_by_id(&id), } } @@ -123,7 +110,7 @@ impl HyperlaneRocksDB { let slf = self.clone(); async move { loop { - if let Some(id) = slf.message_id_by_nonce(nonce)? { + if let Some(id) = slf.retrieve_message_id_by_nonce(&nonce)? { return Ok(id); } sleep(Duration::from_millis(100)).await @@ -131,19 +118,6 @@ impl HyperlaneRocksDB { } } - /// Mark nonce as processed - pub fn mark_nonce_as_processed(&self, nonce: u32) -> DbResult<()> { - debug!(?nonce, "mark nonce as processed"); - self.store_keyed_encodable(NONCE_PROCESSED, &nonce, &true) - } - - /// Retrieve nonce processed status - pub fn retrieve_message_processed(&self, nonce: u32) -> Result { - Ok(self - .retrieve_keyed_decodable(NONCE_PROCESSED, &nonce)? - .unwrap_or(false)) - } - /// If the provided gas payment, identified by its metadata, has not been /// processed, processes the gas payment and records it as processed. /// Returns whether the gas payment was processed for the first time. @@ -154,7 +128,10 @@ impl HyperlaneRocksDB { ) -> DbResult { let payment_meta = log_meta.into(); // If the gas payment has already been processed, do nothing - if self.retrieve_gas_payment_meta_processed(&payment_meta)? { + if self + .retrieve_processed_by_gas_payment_meta(&payment_meta)? + .unwrap_or(false) + { trace!( ?payment, ?log_meta, @@ -164,10 +141,10 @@ impl HyperlaneRocksDB { return Ok(false); } // Set the gas payment as processed - self.store_gas_payment_meta_processed(&payment_meta)?; + self.store_processed_by_gas_payment_meta(&payment_meta, &true)?; // Update the total gas payment for the message to include the payment - self.update_gas_payment_for_message_id(payment)?; + self.update_gas_payment_by_message_id(payment)?; // Return true to indicate the gas payment was processed for the first time Ok(true) @@ -177,82 +154,57 @@ impl HyperlaneRocksDB { /// message. pub fn process_gas_expenditure(&self, expenditure: InterchainGasExpenditure) -> DbResult<()> { // Update the total gas expenditure for the message to include the payment - self.update_gas_expenditure_for_message_id(expenditure) - } - - /// Record a gas payment, identified by its metadata, as processed - fn store_gas_payment_meta_processed(&self, meta: &InterchainGasPaymentMeta) -> DbResult<()> { - self.store_keyed_encodable(GAS_PAYMENT_META_PROCESSED, meta, &true) - } - - /// Get whether a gas payment, identified by its metadata, has been - /// processed already - fn retrieve_gas_payment_meta_processed( - &self, - meta: &InterchainGasPaymentMeta, - ) -> DbResult { - Ok(self - .retrieve_keyed_decodable(GAS_PAYMENT_META_PROCESSED, meta)? - .unwrap_or(false)) + self.update_gas_expenditure_by_message_id(expenditure) } /// Update the total gas payment for a message to include gas_payment - fn update_gas_payment_for_message_id(&self, event: InterchainGasPayment) -> DbResult<()> { - let existing_payment = self.retrieve_gas_payment_for_message_id(event.message_id)?; + fn update_gas_payment_by_message_id(&self, event: InterchainGasPayment) -> DbResult<()> { + let existing_payment = self.retrieve_gas_payment_by_message_id(event.message_id)?; let total = existing_payment + event; debug!(?event, new_total_gas_payment=?total, "Storing gas payment"); - self.store_keyed_encodable::<_, InterchainGasPaymentData>( - GAS_PAYMENT_FOR_MESSAGE_ID, - &total.message_id, - &total.into(), - )?; + self.store_interchain_gas_payment_data_by_message_id(&total.message_id, &total.into())?; Ok(()) } /// Update the total gas spent for a message - fn update_gas_expenditure_for_message_id( + fn update_gas_expenditure_by_message_id( &self, event: InterchainGasExpenditure, ) -> DbResult<()> { - let existing_payment = self.retrieve_gas_expenditure_for_message_id(event.message_id)?; + let existing_payment = self.retrieve_gas_expenditure_by_message_id(event.message_id)?; let total = existing_payment + event; debug!(?event, new_total_gas_payment=?total, "Storing gas payment"); - self.store_keyed_encodable::<_, U256>( - GAS_EXPENDITURE_FOR_MESSAGE_ID, + self.store_interchain_gas_expenditure_data_by_message_id( &total.message_id, - &total.tokens_used, + &InterchainGasExpenditureData { + tokens_used: total.tokens_used, + gas_used: total.gas_used, + }, )?; - Ok(()) } /// Retrieve the total gas payment for a message - pub fn retrieve_gas_payment_for_message_id( + pub fn retrieve_gas_payment_by_message_id( &self, message_id: H256, ) -> DbResult { Ok(self - .retrieve_keyed_decodable::<_, InterchainGasPaymentData>( - GAS_PAYMENT_FOR_MESSAGE_ID, - &message_id, - )? + .retrieve_interchain_gas_payment_data_by_message_id(&message_id)? .unwrap_or_default() .complete(message_id)) } /// Retrieve the total gas payment for a message - pub fn retrieve_gas_expenditure_for_message_id( + pub fn retrieve_gas_expenditure_by_message_id( &self, message_id: H256, ) -> DbResult { Ok(self - .retrieve_keyed_decodable::<_, InterchainGasExpenditureData>( - GAS_EXPENDITURE_FOR_MESSAGE_ID, - &message_id, - )? + .retrieve_interchain_gas_expenditure_data_by_message_id(&message_id)? .unwrap_or_default() .complete(message_id)) } @@ -291,13 +243,13 @@ impl HyperlaneLogStore for HyperlaneRocksDB { impl HyperlaneMessageStore for HyperlaneRocksDB { /// Gets a message by nonce. async fn retrieve_message_by_nonce(&self, nonce: u32) -> Result> { - let message = self.message_by_nonce(nonce)?; + let message = self.retrieve_message_by_nonce(nonce)?; Ok(message) } /// Retrieve dispatched block number by message nonce async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result> { - let number = self.retrieve_keyed_decodable(MESSAGE_DISPATCHED_BLOCK_NUMBER, &nonce)?; + let number = self.retrieve_dispatched_block_number_by_nonce(&nonce)?; Ok(number) } } @@ -320,3 +272,37 @@ where Ok(result) } } + +/// Generate a call to ChainSetup for the given builder +macro_rules! make_store_and_retrieve { + ($vis:vis, $name_suffix:ident, $key_prefix: ident, $key_ty:ty, $val_ty:ty$(,)?) => { + impl HyperlaneRocksDB { + paste! { + /// Stores a key value pair in the DB + $vis fn [] ( + &self, + key: &$key_ty, + val: &$val_ty, + ) -> DbResult<()> { + self.store_keyed_encodable($key_prefix, key, val) + } + + /// Retrieves a key value pair from the DB + $vis fn [] ( + &self, + key: &$key_ty, + ) -> DbResult> { + self.retrieve_keyed_decodable($key_prefix, key) + } + } + } + }; +} + +make_store_and_retrieve!(pub, message_id_by_nonce, MESSAGE_ID, u32, H256); +make_store_and_retrieve!(pub(self), message_by_id, MESSAGE, H256, HyperlaneMessage); +make_store_and_retrieve!(pub(self), dispatched_block_number_by_nonce, MESSAGE_DISPATCHED_BLOCK_NUMBER, u32, u64); +make_store_and_retrieve!(pub, processed_by_nonce, NONCE_PROCESSED, u32, bool); +make_store_and_retrieve!(pub(self), processed_by_gas_payment_meta, GAS_PAYMENT_META_PROCESSED, InterchainGasPaymentMeta, bool); +make_store_and_retrieve!(pub(self), interchain_gas_expenditure_data_by_message_id, GAS_EXPENDITURE_FOR_MESSAGE_ID, H256, InterchainGasExpenditureData); +make_store_and_retrieve!(pub(self), interchain_gas_payment_data_by_message_id, GAS_PAYMENT_FOR_MESSAGE_ID, H256, InterchainGasPaymentData); diff --git a/rust/hyperlane-base/src/db/rocks/test_utils.rs b/rust/hyperlane-base/src/db/rocks/test_utils.rs index a9ca577b7..b9f6a3933 100644 --- a/rust/hyperlane-base/src/db/rocks/test_utils.rs +++ b/rust/hyperlane-base/src/db/rocks/test_utils.rs @@ -67,13 +67,7 @@ mod test { db.store_logs(&vec![(m.clone(), meta)]).await.unwrap(); - let by_id = db.message_by_id(m.id()).unwrap().unwrap(); - assert_eq!( - RawHyperlaneMessage::from(&by_id), - RawHyperlaneMessage::from(&m) - ); - - let by_nonce = db.message_by_nonce(m.nonce).unwrap().unwrap(); + let by_nonce = db.retrieve_message_by_nonce(m.nonce).unwrap().unwrap(); assert_eq!( RawHyperlaneMessage::from(&by_nonce), RawHyperlaneMessage::from(&m)