feat: HyperlaneDb trait (#4609)

### Description

Defines a `HyperlaneDb` trait for use in the validator submitter, so a
mock can be passed in to simulate indexed data from reorged state.

Although the `HyperlaneDb` trait can't be `Clone` to be object safe, as
long as we use `Arc`s everywhere, all will be ok. Eventually the aim is
to replace all direct usage of `HyperlaneRocksDb` with `Arc<dyn
HyperlaneDb>`.
pull/4619/head
Daniel Savu 2 months ago committed by GitHub
parent 4f92871c2b
commit e2b5a4c785
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      rust/main/agents/relayer/src/merkle_tree/processor.rs
  2. 2
      rust/main/agents/relayer/src/msg/metadata/base.rs
  3. 5
      rust/main/agents/relayer/src/msg/pending_message.rs
  4. 168
      rust/main/agents/relayer/src/msg/processor.rs
  5. 11
      rust/main/agents/validator/src/submit.rs
  6. 4
      rust/main/agents/validator/src/validator.rs
  7. 188
      rust/main/config/testnet_config.json
  8. 34
      rust/main/hyperlane-base/src/db/error.rs
  9. 160
      rust/main/hyperlane-base/src/db/mod.rs
  10. 331
      rust/main/hyperlane-base/src/db/rocks/hyperlane_db.rs
  11. 39
      rust/main/hyperlane-base/src/db/rocks/mod.rs
  12. 2
      rust/main/hyperlane-base/src/db/rocks/typed_db.rs
  13. 9
      rust/main/hyperlane-base/src/db/storage_types.rs

@ -7,7 +7,7 @@ use std::{
use async_trait::async_trait;
use derive_new::new;
use eyre::Result;
use hyperlane_base::db::HyperlaneRocksDB;
use hyperlane_base::db::{HyperlaneDb, HyperlaneRocksDB};
use hyperlane_core::{HyperlaneDomain, MerkleTreeInsertion};
use prometheus::IntGauge;
use tokio::sync::RwLock;

@ -22,7 +22,7 @@ use crate::{
use async_trait::async_trait;
use derive_new::new;
use eyre::{Context, Result};
use hyperlane_base::db::HyperlaneRocksDB;
use hyperlane_base::db::{HyperlaneDb, HyperlaneRocksDB};
use hyperlane_base::{
settings::{ChainConf, CheckpointSyncerConf},
CheckpointSyncer, CoreMetrics, MultisigCheckpointSyncer,

@ -9,7 +9,10 @@ use std::{
use async_trait::async_trait;
use derive_new::new;
use eyre::Result;
use hyperlane_base::{db::HyperlaneRocksDB, CoreMetrics};
use hyperlane_base::{
db::{HyperlaneDb, HyperlaneRocksDB},
CoreMetrics,
};
use hyperlane_core::{
gas_used_by_operation, BatchItem, ChainCommunicationError, ChainResult, ConfirmReason,
HyperlaneChain, HyperlaneDomain, HyperlaneMessage, Mailbox, MessageSubmissionData,

@ -11,7 +11,7 @@ use derive_new::new;
use ethers::utils::hex;
use eyre::Result;
use hyperlane_base::{
db::{HyperlaneRocksDB, ProcessMessage},
db::{HyperlaneDb, HyperlaneRocksDB},
CoreMetrics,
};
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, QueueOperation};
@ -52,7 +52,7 @@ struct ForwardBackwardIterator {
impl ForwardBackwardIterator {
#[instrument(skip(db), ret)]
fn new(db: Arc<dyn ProcessMessage>) -> Self {
fn new(db: Arc<dyn HyperlaneDb>) -> Self {
let high_nonce = db.retrieve_highest_seen_message_nonce().ok().flatten();
let domain = db.domain().name().to_owned();
let high_nonce_iter = DirectionalNonceIterator::new(
@ -125,7 +125,7 @@ enum NonceDirection {
struct DirectionalNonceIterator {
nonce: Option<u32>,
direction: NonceDirection,
db: Arc<dyn ProcessMessage>,
db: Arc<dyn HyperlaneDb>,
domain_name: String,
}
@ -196,7 +196,10 @@ impl DirectionalNonceIterator {
let Some(nonce) = self.nonce else {
return Ok(false);
};
let processed = self.db.retrieve_processed_by_nonce(nonce)?.unwrap_or(false);
let processed = self
.db
.retrieve_processed_by_nonce(&nonce)?
.unwrap_or(false);
if processed {
trace!(
nonce,
@ -326,7 +329,7 @@ impl MessageProcessor {
send_channels,
destination_ctxs,
metric_app_contexts,
nonce_iterator: ForwardBackwardIterator::new(Arc::new(db) as Arc<dyn ProcessMessage>),
nonce_iterator: ForwardBackwardIterator::new(Arc::new(db) as Arc<dyn HyperlaneDb>),
}
}
@ -394,9 +397,16 @@ mod test {
use super::*;
use hyperlane_base::{
db::{test_utils, DbResult, HyperlaneRocksDB},
db::{
test_utils, DbResult, HyperlaneRocksDB, InterchainGasExpenditureData,
InterchainGasPaymentData,
},
settings::{ChainConf, ChainConnectionConf, Settings},
};
use hyperlane_core::{
GasPaymentKey, InterchainGasPayment, InterchainGasPaymentMeta, MerkleTreeInsertion,
PendingOperationStatus, H256,
};
use hyperlane_test::mocks::{MockMailboxContract, MockValidatorAnnounceContract};
use prometheus::{IntCounter, Registry};
use tokio::{
@ -591,11 +601,153 @@ mod test {
fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result;
}
impl ProcessMessage for Db {
impl HyperlaneDb for Db {
/// Retrieve the nonce of the highest processed message we're aware of
fn retrieve_highest_seen_message_nonce(&self) -> DbResult<Option<u32>>;
/// Retrieve a message by its nonce
fn retrieve_message_by_nonce(&self, nonce: u32) -> DbResult<Option<HyperlaneMessage>>;
fn retrieve_processed_by_nonce(&self, nonce: u32) -> DbResult<Option<bool>>;
/// Retrieve whether a message has been processed
fn retrieve_processed_by_nonce(&self, nonce: &u32) -> DbResult<Option<bool>>;
/// Get the origin domain of the database
fn domain(&self) -> &HyperlaneDomain;
fn store_message_id_by_nonce(&self, nonce: &u32, id: &H256) -> DbResult<()>;
fn retrieve_message_id_by_nonce(&self, nonce: &u32) -> DbResult<Option<H256>>;
fn store_message_by_id(&self, id: &H256, message: &HyperlaneMessage) -> DbResult<()>;
fn retrieve_message_by_id(&self, id: &H256) -> DbResult<Option<HyperlaneMessage>>;
fn store_dispatched_block_number_by_nonce(
&self,
nonce: &u32,
block_number: &u64,
) -> DbResult<()>;
fn retrieve_dispatched_block_number_by_nonce(&self, nonce: &u32) -> DbResult<Option<u64>>;
/// Store whether a message was processed by its nonce
fn store_processed_by_nonce(&self, nonce: &u32, processed: &bool) -> DbResult<()>;
fn store_processed_by_gas_payment_meta(
&self,
meta: &InterchainGasPaymentMeta,
processed: &bool,
) -> DbResult<()>;
fn retrieve_processed_by_gas_payment_meta(
&self,
meta: &InterchainGasPaymentMeta,
) -> DbResult<Option<bool>>;
fn store_interchain_gas_expenditure_data_by_message_id(
&self,
message_id: &H256,
data: &InterchainGasExpenditureData,
) -> DbResult<()>;
fn retrieve_interchain_gas_expenditure_data_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<InterchainGasExpenditureData>>;
/// Store the status of an operation by its message id
fn store_status_by_message_id(
&self,
message_id: &H256,
status: &PendingOperationStatus,
) -> DbResult<()>;
/// Retrieve the status of an operation by its message id
fn retrieve_status_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<PendingOperationStatus>>;
fn store_interchain_gas_payment_data_by_gas_payment_key(
&self,
key: &GasPaymentKey,
data: &InterchainGasPaymentData,
) -> DbResult<()>;
fn retrieve_interchain_gas_payment_data_by_gas_payment_key(
&self,
key: &GasPaymentKey,
) -> DbResult<Option<InterchainGasPaymentData>>;
fn store_gas_payment_by_sequence(
&self,
sequence: &u32,
payment: &InterchainGasPayment,
) -> DbResult<()>;
fn retrieve_gas_payment_by_sequence(
&self,
sequence: &u32,
) -> DbResult<Option<InterchainGasPayment>>;
fn store_gas_payment_block_by_sequence(
&self,
sequence: &u32,
block_number: &u64,
) -> DbResult<()>;
fn retrieve_gas_payment_block_by_sequence(&self, sequence: &u32) -> DbResult<Option<u64>>;
/// Store the retry count for a pending message by its message id
fn store_pending_message_retry_count_by_message_id(
&self,
message_id: &H256,
count: &u32,
) -> DbResult<()>;
/// Retrieve the retry count for a pending message by its message id
fn retrieve_pending_message_retry_count_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<u32>>;
fn store_merkle_tree_insertion_by_leaf_index(
&self,
leaf_index: &u32,
insertion: &MerkleTreeInsertion,
) -> DbResult<()>;
/// Retrieve the merkle tree insertion event by its leaf index
fn retrieve_merkle_tree_insertion_by_leaf_index(
&self,
leaf_index: &u32,
) -> DbResult<Option<MerkleTreeInsertion>>;
fn store_merkle_leaf_index_by_message_id(
&self,
message_id: &H256,
leaf_index: &u32,
) -> DbResult<()>;
/// Retrieve the merkle leaf index of a message in the merkle tree
fn retrieve_merkle_leaf_index_by_message_id(&self, message_id: &H256) -> DbResult<Option<u32>>;
fn store_merkle_tree_insertion_block_number_by_leaf_index(
&self,
leaf_index: &u32,
block_number: &u64,
) -> DbResult<()>;
fn retrieve_merkle_tree_insertion_block_number_by_leaf_index(
&self,
leaf_index: &u32,
) -> DbResult<Option<u64>>;
fn store_highest_seen_message_nonce_number(&self, nonce: &u32) -> DbResult<()>;
/// Retrieve the nonce of the highest processed message we're aware of
fn retrieve_highest_seen_message_nonce_number(&self) -> DbResult<Option<u32>>;
}
}

@ -3,13 +3,14 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use std::vec;
use hyperlane_base::db::HyperlaneDb;
use hyperlane_core::rpc_clients::call_and_retry_indefinitely;
use hyperlane_core::{ChainResult, MerkleTreeHook};
use prometheus::IntGauge;
use tokio::time::sleep;
use tracing::{debug, error, info};
use hyperlane_base::{db::HyperlaneRocksDB, CheckpointSyncer, CoreMetrics};
use hyperlane_base::{CheckpointSyncer, CoreMetrics};
use hyperlane_core::{
accumulator::incremental::IncrementalMerkle, Checkpoint, CheckpointWithMessageId,
HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneSignerExt,
@ -23,7 +24,7 @@ pub(crate) struct ValidatorSubmitter {
signer: SingletonSignerHandle,
merkle_tree_hook: Arc<dyn MerkleTreeHook>,
checkpoint_syncer: Arc<dyn CheckpointSyncer>,
message_db: HyperlaneRocksDB,
db: Arc<dyn HyperlaneDb>,
metrics: ValidatorSubmitterMetrics,
}
@ -34,7 +35,7 @@ impl ValidatorSubmitter {
merkle_tree_hook: Arc<dyn MerkleTreeHook>,
signer: SingletonSignerHandle,
checkpoint_syncer: Arc<dyn CheckpointSyncer>,
message_db: HyperlaneRocksDB,
db: Arc<dyn HyperlaneDb>,
metrics: ValidatorSubmitterMetrics,
) -> Self {
Self {
@ -43,7 +44,7 @@ impl ValidatorSubmitter {
merkle_tree_hook,
signer,
checkpoint_syncer,
message_db,
db,
metrics,
}
}
@ -159,7 +160,7 @@ impl ValidatorSubmitter {
// and convert the correctness_checkpoint.index to a count by adding 1.
while tree.count() as u32 <= correctness_checkpoint.index {
if let Some(insertion) = self
.message_db
.db
.retrieve_merkle_tree_insertion_by_leaf_index(&(tree.count() as u32))
.unwrap_or_else(|err| {
panic!(

@ -10,7 +10,7 @@ use tokio::{task::JoinHandle, time::sleep};
use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument};
use hyperlane_base::{
db::{HyperlaneRocksDB, DB},
db::{HyperlaneDb, HyperlaneRocksDB, DB},
metrics::AgentMetrics,
settings::ChainConf,
AgentMetadata, BaseAgent, ChainMetrics, CheckpointSyncer, ContractSyncMetrics, ContractSyncer,
@ -241,7 +241,7 @@ impl Validator {
self.merkle_tree_hook.clone(),
self.signer.clone(),
self.checkpoint_syncer.clone(),
self.db.clone(),
Arc::new(self.db.clone()) as Arc<dyn HyperlaneDb>,
ValidatorSubmitterMetrics::new(&self.core.metrics, &self.origin_chain),
);

@ -1446,6 +1446,194 @@
"index": {
"from": 1921514
}
},
"test1": {
"blockExplorers": [
{
"apiKey": "fakekey",
"apiUrl": "https://api.etherscan.io/api",
"family": "etherscan",
"name": "Etherscan",
"url": "https://etherscan.io"
}
],
"blocks": {
"confirmations": 1,
"estimateBlockTime": 3,
"reorgPeriod": 0
},
"chainId": 9913371,
"displayName": "Test 1",
"domainId": 9913371,
"isTestnet": true,
"name": "test1",
"nativeToken": {
"decimals": 18,
"name": "Ether",
"symbol": "ETH"
},
"protocol": "ethereum",
"rpcUrls": [
{
"http": "http://127.0.0.1:8545"
}
],
"aggregationHook": "0x7F54A0734c5B443E5B04cc26B54bb8ecE0455785",
"domainRoutingIsm": "0xb0279Db6a2F1E01fbC8483FCCef0Be2bC6299cC3",
"fallbackRoutingHook": "0x99bbA657f2BbC93c02D617f8bA121cB8Fc104Acf",
"interchainGasPaymaster": "0x5eb3Bc0a489C5A8288765d2336659EbCA68FCd00",
"interchainSecurityModule": "0xb0279Db6a2F1E01fbC8483FCCef0Be2bC6299cC3",
"mailbox": "0xE6E340D132b5f46d1e472DebcD681B2aBc16e57E",
"merkleTreeHook": "0x4826533B4897376654Bb4d4AD88B7faFD0C98528",
"protocolFee": "0x1291Be112d480055DaFd8a610b7d1e203891C274",
"proxyAdmin": "0xc5a5C42992dECbae36851359345FE25997F5C42d",
"storageGasOracle": "0x0E801D84Fa97b50751Dbf25036d067dCf18858bF",
"testRecipient": "0xCD8a1C3ba11CF5ECfa6267617243239504a98d90",
"validatorAnnounce": "0xb7278A61aa25c888815aFC32Ad3cC52fF24fE575",
"index": {
"from": 30
}
},
"test2": {
"blockExplorers": [
{
"apiKey": "fakekey",
"apiUrl": "https://api.etherscan.io/api",
"family": "etherscan",
"name": "Etherscan",
"url": "https://etherscan.io"
}
],
"blocks": {
"confirmations": 1,
"estimateBlockTime": 3,
"reorgPeriod": 1
},
"chainId": 9913372,
"displayName": "Test 2",
"domainId": 9913372,
"isTestnet": true,
"name": "test2",
"nativeToken": {
"decimals": 18,
"name": "Ether",
"symbol": "ETH"
},
"protocol": "ethereum",
"rpcUrls": [
{
"http": "http://127.0.0.1:8545"
}
],
"aggregationHook": "0x5f07F66a6c12BAE727A0e0C84c2f83Ef3c83b44c",
"domainRoutingIsm": "0x3Ca8f9C04c7e3E1624Ac2008F92f6F366A869444",
"fallbackRoutingHook": "0x4C4a2f8c81640e47606d3fd77B353E87Ba015584",
"interchainGasPaymaster": "0xDC11f7E700A4c898AE5CAddB1082cFfa76512aDD",
"interchainSecurityModule": "0x3Ca8f9C04c7e3E1624Ac2008F92f6F366A869444",
"mailbox": "0x7bc06c482DEAd17c0e297aFbC32f6e63d3846650",
"merkleTreeHook": "0x04C89607413713Ec9775E14b954286519d836FEf",
"protocolFee": "0x0355B7B8cb128fA5692729Ab3AAa199C1753f726",
"proxyAdmin": "0x2bdCC0de6bE1f7D2ee689a0342D76F52E8EFABa3",
"storageGasOracle": "0x21dF544947ba3E8b3c32561399E88B52Dc8b2823",
"testRecipient": "0x172076E0166D1F9Cc711C77Adf8488051744980C",
"validatorAnnounce": "0xf4B146FbA71F41E0592668ffbF264F1D186b2Ca8",
"index": {
"from": 57
}
},
"test3": {
"blockExplorers": [
{
"apiKey": "fakekey",
"apiUrl": "https://api.etherscan.io/api",
"family": "etherscan",
"name": "Etherscan",
"url": "https://etherscan.io"
}
],
"blocks": {
"confirmations": 1,
"estimateBlockTime": 3,
"reorgPeriod": 2
},
"chainId": 9913373,
"displayName": "Test 3",
"domainId": 9913373,
"isTestnet": true,
"name": "test3",
"nativeToken": {
"decimals": 18,
"name": "Ether",
"symbol": "ETH"
},
"protocol": "ethereum",
"rpcUrls": [
{
"http": "http://127.0.0.1:8545"
}
],
"aggregationHook": "0xd5BA21a5bDE25af311a900191c52ce9Fc8Ab9b8d",
"domainRoutingIsm": "0xa12fFA0B9f159BB4C54bce579611927Addc51610",
"fallbackRoutingHook": "0xf953b3A269d80e3eB0F2947630Da976B896A8C5b",
"interchainGasPaymaster": "0xe8D2A1E88c91DCd5433208d4152Cc4F399a7e91d",
"interchainSecurityModule": "0xa12fFA0B9f159BB4C54bce579611927Addc51610",
"mailbox": "0x2B0d36FACD61B71CC05ab8F3D2355ec3631C0dd5",
"merkleTreeHook": "0xA4899D35897033b927acFCf422bc745916139776",
"protocolFee": "0xCace1b78160AE76398F486c8a18044da0d66d86D",
"proxyAdmin": "0xBEc49fA140aCaA83533fB00A2BB19bDdd0290f25",
"storageGasOracle": "0xAA292E8611aDF267e563f334Ee42320aC96D0463",
"testRecipient": "0xc0F115A19107322cFBf1cDBC7ea011C19EbDB4F8",
"validatorAnnounce": "0xF8e31cb472bc70500f08Cd84917E5A1912Ec8397",
"index": {
"from": 84
}
},
"test4": {
"blockExplorers": [
{
"apiKey": "fakekey",
"apiUrl": "https://api.etherscan.io/api",
"family": "etherscan",
"name": "Etherscan",
"url": "https://etherscan.io"
}
],
"blocks": {
"confirmations": 1,
"estimateBlockTime": 3,
"reorgPeriod": 0
},
"chainId": 31337,
"displayName": "Test 4",
"domainId": 31337,
"isTestnet": true,
"name": "test4",
"nativeToken": {
"decimals": 18,
"name": "Ether",
"symbol": "ETH"
},
"protocol": "ethereum",
"rpcUrls": [
{
"http": "http://127.0.0.1:8545"
}
],
"aggregationHook": "0xfD4Ab5938aAcE9B094cc3B298d18be83E170B2fc",
"domainRoutingIsm": "0x532B02BD614Fd18aEE45603d02866cFb77575CB3",
"fallbackRoutingHook": "0x1f10F3Ba7ACB61b2F50B9d6DdCf91a6f787C0E82",
"interchainGasPaymaster": "0x5fc748f1FEb28d7b76fa1c6B07D8ba2d5535177c",
"interchainSecurityModule": "0x532B02BD614Fd18aEE45603d02866cFb77575CB3",
"mailbox": "0x07882Ae1ecB7429a84f1D53048d35c4bB2056877",
"merkleTreeHook": "0xE3011A37A904aB90C8881a99BD1F6E21401f1522",
"protocolFee": "0x8A93d247134d91e0de6f96547cB0204e5BE8e5D8",
"proxyAdmin": "0x34B40BA116d5Dec75548a9e9A8f15411461E8c70",
"storageGasOracle": "0x457cCf29090fe5A24c19c1bc95F492168C0EaFdb",
"testRecipient": "0xd6e1afe5cA8D00A2EFC01B89997abE2De47fdfAf",
"validatorAnnounce": "0xF32D39ff9f6Aa7a7A64d7a4F00a54826Ef791a55",
"index": {
"from": 111
}
}
},
"defaultRpcConsensusType": "fallback"

@ -0,0 +1,34 @@
use std::{io, path::PathBuf};
use hyperlane_core::{ChainCommunicationError, HyperlaneProtocolError};
/// DB Error type
#[derive(thiserror::Error, Debug)]
pub enum DbError {
/// Rocks DB Error
#[error("{0}")]
RockError(#[from] rocksdb::Error),
#[error("Failed to open {path}, canonicalized as {canonicalized}: {source}")]
/// Error opening the database
OpeningError {
/// Rocksdb error during opening
#[source]
source: rocksdb::Error,
/// Raw database path provided
path: PathBuf,
/// Parsed path used
canonicalized: PathBuf,
},
/// Could not parse the provided database path string
#[error("Invalid database path supplied {1:?}; {0}")]
InvalidDbPath(#[source] io::Error, String),
/// Hyperlane Error
#[error("{0}")]
HyperlaneError(#[from] HyperlaneProtocolError),
}
impl From<DbError> for ChainCommunicationError {
fn from(value: DbError) -> Self {
ChainCommunicationError::from_other(value)
}
}

@ -1,2 +1,162 @@
pub use error::*;
use hyperlane_core::{
GasPaymentKey, HyperlaneDomain, HyperlaneMessage, InterchainGasPayment,
InterchainGasPaymentMeta, MerkleTreeInsertion, PendingOperationStatus, H256,
};
pub use rocks::*;
pub use self::storage_types::{InterchainGasExpenditureData, InterchainGasPaymentData};
mod error;
mod rocks;
pub(crate) mod storage_types;
#[allow(missing_docs)]
/// Hyperlane database interface
pub trait HyperlaneDb: Send + Sync {
/// Retrieve the nonce of the highest processed message we're aware of
fn retrieve_highest_seen_message_nonce(&self) -> DbResult<Option<u32>>;
/// Retrieve a message by its nonce
fn retrieve_message_by_nonce(&self, nonce: u32) -> DbResult<Option<HyperlaneMessage>>;
/// Retrieve whether a message has been processed
fn retrieve_processed_by_nonce(&self, nonce: &u32) -> DbResult<Option<bool>>;
/// Get the origin domain of the database
fn domain(&self) -> &HyperlaneDomain;
fn store_message_id_by_nonce(&self, nonce: &u32, id: &H256) -> DbResult<()>;
fn retrieve_message_id_by_nonce(&self, nonce: &u32) -> DbResult<Option<H256>>;
fn store_message_by_id(&self, id: &H256, message: &HyperlaneMessage) -> DbResult<()>;
fn retrieve_message_by_id(&self, id: &H256) -> DbResult<Option<HyperlaneMessage>>;
fn store_dispatched_block_number_by_nonce(
&self,
nonce: &u32,
block_number: &u64,
) -> DbResult<()>;
fn retrieve_dispatched_block_number_by_nonce(&self, nonce: &u32) -> DbResult<Option<u64>>;
/// Store whether a message was processed by its nonce
fn store_processed_by_nonce(&self, nonce: &u32, processed: &bool) -> DbResult<()>;
fn store_processed_by_gas_payment_meta(
&self,
meta: &InterchainGasPaymentMeta,
processed: &bool,
) -> DbResult<()>;
fn retrieve_processed_by_gas_payment_meta(
&self,
meta: &InterchainGasPaymentMeta,
) -> DbResult<Option<bool>>;
fn store_interchain_gas_expenditure_data_by_message_id(
&self,
message_id: &H256,
data: &InterchainGasExpenditureData,
) -> DbResult<()>;
fn retrieve_interchain_gas_expenditure_data_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<InterchainGasExpenditureData>>;
/// Store the status of an operation by its message id
fn store_status_by_message_id(
&self,
message_id: &H256,
status: &PendingOperationStatus,
) -> DbResult<()>;
/// Retrieve the status of an operation by its message id
fn retrieve_status_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<PendingOperationStatus>>;
fn store_interchain_gas_payment_data_by_gas_payment_key(
&self,
key: &GasPaymentKey,
data: &InterchainGasPaymentData,
) -> DbResult<()>;
fn retrieve_interchain_gas_payment_data_by_gas_payment_key(
&self,
key: &GasPaymentKey,
) -> DbResult<Option<InterchainGasPaymentData>>;
fn store_gas_payment_by_sequence(
&self,
sequence: &u32,
payment: &InterchainGasPayment,
) -> DbResult<()>;
fn retrieve_gas_payment_by_sequence(
&self,
sequence: &u32,
) -> DbResult<Option<InterchainGasPayment>>;
fn store_gas_payment_block_by_sequence(
&self,
sequence: &u32,
block_number: &u64,
) -> DbResult<()>;
fn retrieve_gas_payment_block_by_sequence(&self, sequence: &u32) -> DbResult<Option<u64>>;
/// Store the retry count for a pending message by its message id
fn store_pending_message_retry_count_by_message_id(
&self,
message_id: &H256,
count: &u32,
) -> DbResult<()>;
/// Retrieve the retry count for a pending message by its message id
fn retrieve_pending_message_retry_count_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<u32>>;
fn store_merkle_tree_insertion_by_leaf_index(
&self,
leaf_index: &u32,
insertion: &MerkleTreeInsertion,
) -> DbResult<()>;
/// Retrieve the merkle tree insertion event by its leaf index
fn retrieve_merkle_tree_insertion_by_leaf_index(
&self,
leaf_index: &u32,
) -> DbResult<Option<MerkleTreeInsertion>>;
fn store_merkle_leaf_index_by_message_id(
&self,
message_id: &H256,
leaf_index: &u32,
) -> DbResult<()>;
/// Retrieve the merkle leaf index of a message in the merkle tree
fn retrieve_merkle_leaf_index_by_message_id(&self, message_id: &H256) -> DbResult<Option<u32>>;
fn store_merkle_tree_insertion_block_number_by_leaf_index(
&self,
leaf_index: &u32,
block_number: &u64,
) -> DbResult<()>;
fn retrieve_merkle_tree_insertion_block_number_by_leaf_index(
&self,
leaf_index: &u32,
) -> DbResult<Option<u64>>;
fn store_highest_seen_message_nonce_number(&self, nonce: &u32) -> DbResult<()>;
/// Retrieve the nonce of the highest processed message we're aware of
fn retrieve_highest_seen_message_nonce_number(&self) -> DbResult<Option<u32>>;
}

@ -1,18 +1,18 @@
use async_trait::async_trait;
use eyre::{bail, Result};
use paste::paste;
use tracing::{debug, instrument, trace};
use hyperlane_core::{
GasPaymentKey, HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage,
Decode, Encode, GasPaymentKey, HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage,
HyperlaneSequenceAwareIndexerStoreReader, HyperlaneWatermarkedLogStore, Indexed,
InterchainGasExpenditure, InterchainGasPayment, InterchainGasPaymentMeta, LogMeta,
MerkleTreeInsertion, PendingOperationStatus, H256,
};
use super::{
use super::{DbError, TypedDB, DB};
use crate::db::{
storage_types::{InterchainGasExpenditureData, InterchainGasPaymentData},
DbError, TypedDB, DB,
HyperlaneDb,
};
// these keys MUST not be given multiple uses in case multiple agents are
@ -120,16 +120,11 @@ impl HyperlaneRocksDB {
.retrieve_highest_seen_message_nonce()?
.unwrap_or_default();
if nonce >= current_max {
self.store_highest_seen_message_nonce_number(&Default::default(), &nonce)?;
self.store_highest_seen_message_nonce_number(&nonce)?;
}
Ok(())
}
/// Retrieve the nonce of the highest processed message we're aware of
pub fn retrieve_highest_seen_message_nonce(&self) -> DbResult<Option<u32>> {
self.retrieve_highest_seen_message_nonce_number(&Default::default())
}
/// If the provided gas payment, identified by its metadata, has not been
/// processed, processes the gas payment and records it as processed.
/// Returns whether the gas payment was processed for the first time.
@ -443,109 +438,241 @@ impl HyperlaneWatermarkedLogStore<MerkleTreeInsertion> for HyperlaneRocksDB {
}
}
/// Database interface required for processing messages
pub trait ProcessMessage: Send + Sync {
/// Retrieve the nonce of the highest processed message we're aware of
fn retrieve_highest_seen_message_nonce(&self) -> DbResult<Option<u32>>;
impl HyperlaneDb for HyperlaneRocksDB {
fn retrieve_highest_seen_message_nonce(&self) -> DbResult<Option<u32>> {
self.retrieve_highest_seen_message_nonce_number()
}
/// Retrieve a message by its nonce
fn retrieve_message_by_nonce(&self, nonce: u32) -> DbResult<Option<HyperlaneMessage>>;
fn retrieve_message_by_nonce(&self, nonce: u32) -> DbResult<Option<HyperlaneMessage>> {
self.retrieve_message_by_nonce(nonce)
}
/// Retrieve whether a message has been processed
fn retrieve_processed_by_nonce(&self, nonce: u32) -> DbResult<Option<bool>>;
fn domain(&self) -> &HyperlaneDomain {
self.domain()
}
/// Get the origin domain of the database
fn domain(&self) -> &HyperlaneDomain;
}
fn store_message_id_by_nonce(&self, nonce: &u32, id: &H256) -> DbResult<()> {
self.store_value_by_key(MESSAGE_ID, nonce, id)
}
impl ProcessMessage for HyperlaneRocksDB {
fn retrieve_highest_seen_message_nonce(&self) -> DbResult<Option<u32>> {
self.retrieve_highest_seen_message_nonce()
fn retrieve_message_id_by_nonce(&self, nonce: &u32) -> DbResult<Option<H256>> {
self.retrieve_value_by_key(MESSAGE_ID, nonce)
}
fn retrieve_message_by_nonce(&self, nonce: u32) -> DbResult<Option<HyperlaneMessage>> {
self.retrieve_message_by_nonce(nonce)
fn store_message_by_id(&self, id: &H256, message: &HyperlaneMessage) -> DbResult<()> {
self.store_value_by_key(MESSAGE, id, message)
}
fn retrieve_processed_by_nonce(&self, nonce: u32) -> DbResult<Option<bool>> {
self.retrieve_processed_by_nonce(&nonce)
fn retrieve_message_by_id(&self, id: &H256) -> DbResult<Option<HyperlaneMessage>> {
self.retrieve_value_by_key(MESSAGE, id)
}
fn domain(&self) -> &HyperlaneDomain {
self.domain()
fn store_dispatched_block_number_by_nonce(
&self,
nonce: &u32,
block_number: &u64,
) -> DbResult<()> {
self.store_value_by_key(MESSAGE_DISPATCHED_BLOCK_NUMBER, nonce, block_number)
}
}
/// Generate a call to ChainSetup for the given builder
macro_rules! make_store_and_retrieve {
($vis:vis, $name_suffix:ident, $key_prefix: ident, $key_ty:ty, $val_ty:ty$(,)?) => {
impl HyperlaneRocksDB {
paste! {
/// Stores a key value pair in the DB
$vis fn [<store_ $name_suffix>] (
&self,
key: &$key_ty,
val: &$val_ty,
) -> DbResult<()> {
self.store_keyed_encodable($key_prefix, key, val)
}
/// Retrieves a key value pair from the DB
$vis fn [<retrieve_ $name_suffix>] (
&self,
key: &$key_ty,
) -> DbResult<Option<$val_ty>> {
self.retrieve_keyed_decodable($key_prefix, key)
}
}
}
};
fn retrieve_dispatched_block_number_by_nonce(&self, nonce: &u32) -> DbResult<Option<u64>> {
self.retrieve_value_by_key(MESSAGE_DISPATCHED_BLOCK_NUMBER, nonce)
}
/// Store whether a message was processed by its nonce
fn store_processed_by_nonce(&self, nonce: &u32, processed: &bool) -> DbResult<()> {
self.store_value_by_key(NONCE_PROCESSED, nonce, processed)
}
fn retrieve_processed_by_nonce(&self, nonce: &u32) -> DbResult<Option<bool>> {
self.retrieve_value_by_key(NONCE_PROCESSED, nonce)
}
fn store_processed_by_gas_payment_meta(
&self,
meta: &InterchainGasPaymentMeta,
processed: &bool,
) -> DbResult<()> {
self.store_value_by_key(GAS_PAYMENT_META_PROCESSED, meta, processed)
}
fn retrieve_processed_by_gas_payment_meta(
&self,
meta: &InterchainGasPaymentMeta,
) -> DbResult<Option<bool>> {
self.retrieve_value_by_key(GAS_PAYMENT_META_PROCESSED, meta)
}
fn store_interchain_gas_expenditure_data_by_message_id(
&self,
message_id: &H256,
data: &InterchainGasExpenditureData,
) -> DbResult<()> {
self.store_value_by_key(GAS_EXPENDITURE_FOR_MESSAGE_ID, message_id, data)
}
fn retrieve_interchain_gas_expenditure_data_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<InterchainGasExpenditureData>> {
self.retrieve_value_by_key(GAS_EXPENDITURE_FOR_MESSAGE_ID, message_id)
}
/// Store the status of an operation by its message id
fn store_status_by_message_id(
&self,
message_id: &H256,
status: &PendingOperationStatus,
) -> DbResult<()> {
self.store_value_by_key(STATUS_BY_MESSAGE_ID, message_id, status)
}
/// Retrieve the status of an operation by its message id
fn retrieve_status_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<PendingOperationStatus>> {
self.retrieve_value_by_key(STATUS_BY_MESSAGE_ID, message_id)
}
fn store_interchain_gas_payment_data_by_gas_payment_key(
&self,
key: &GasPaymentKey,
data: &InterchainGasPaymentData,
) -> DbResult<()> {
self.store_value_by_key(GAS_PAYMENT_FOR_MESSAGE_ID, key, data)
}
fn retrieve_interchain_gas_payment_data_by_gas_payment_key(
&self,
key: &GasPaymentKey,
) -> DbResult<Option<InterchainGasPaymentData>> {
self.retrieve_value_by_key(GAS_PAYMENT_FOR_MESSAGE_ID, key)
}
fn store_gas_payment_by_sequence(
&self,
sequence: &u32,
payment: &InterchainGasPayment,
) -> DbResult<()> {
self.store_value_by_key(GAS_PAYMENT_BY_SEQUENCE, sequence, payment)
}
fn retrieve_gas_payment_by_sequence(
&self,
sequence: &u32,
) -> DbResult<Option<InterchainGasPayment>> {
self.retrieve_value_by_key(GAS_PAYMENT_BY_SEQUENCE, sequence)
}
fn store_gas_payment_block_by_sequence(
&self,
sequence: &u32,
block_number: &u64,
) -> DbResult<()> {
self.store_value_by_key(GAS_PAYMENT_BLOCK_BY_SEQUENCE, sequence, block_number)
}
fn retrieve_gas_payment_block_by_sequence(&self, sequence: &u32) -> DbResult<Option<u64>> {
self.retrieve_value_by_key(GAS_PAYMENT_BLOCK_BY_SEQUENCE, sequence)
}
/// Store the retry count for a pending message by its message id
fn store_pending_message_retry_count_by_message_id(
&self,
message_id: &H256,
count: &u32,
) -> DbResult<()> {
self.store_value_by_key(
PENDING_MESSAGE_RETRY_COUNT_FOR_MESSAGE_ID,
message_id,
count,
)
}
/// Retrieve the retry count for a pending message by its message id
fn retrieve_pending_message_retry_count_by_message_id(
&self,
message_id: &H256,
) -> DbResult<Option<u32>> {
self.retrieve_value_by_key(PENDING_MESSAGE_RETRY_COUNT_FOR_MESSAGE_ID, message_id)
}
fn store_merkle_tree_insertion_by_leaf_index(
&self,
leaf_index: &u32,
insertion: &MerkleTreeInsertion,
) -> DbResult<()> {
self.store_value_by_key(MERKLE_TREE_INSERTION, leaf_index, insertion)
}
/// Retrieve the merkle tree insertion event by its leaf index
fn retrieve_merkle_tree_insertion_by_leaf_index(
&self,
leaf_index: &u32,
) -> DbResult<Option<MerkleTreeInsertion>> {
self.retrieve_value_by_key(MERKLE_TREE_INSERTION, leaf_index)
}
fn store_merkle_leaf_index_by_message_id(
&self,
message_id: &H256,
leaf_index: &u32,
) -> DbResult<()> {
self.store_value_by_key(MERKLE_LEAF_INDEX_BY_MESSAGE_ID, message_id, leaf_index)
}
/// Retrieve the merkle leaf index of a message in the merkle tree
fn retrieve_merkle_leaf_index_by_message_id(&self, message_id: &H256) -> DbResult<Option<u32>> {
self.retrieve_value_by_key(MERKLE_LEAF_INDEX_BY_MESSAGE_ID, message_id)
}
fn store_merkle_tree_insertion_block_number_by_leaf_index(
&self,
leaf_index: &u32,
block_number: &u64,
) -> DbResult<()> {
self.store_value_by_key(
MERKLE_TREE_INSERTION_BLOCK_NUMBER_BY_LEAF_INDEX,
leaf_index,
block_number,
)
}
fn retrieve_merkle_tree_insertion_block_number_by_leaf_index(
&self,
leaf_index: &u32,
) -> DbResult<Option<u64>> {
self.retrieve_value_by_key(MERKLE_TREE_INSERTION_BLOCK_NUMBER_BY_LEAF_INDEX, leaf_index)
}
fn store_highest_seen_message_nonce_number(&self, nonce: &u32) -> DbResult<()> {
// There's no unit struct Encode/Decode impl, so just use `bool` and always use the `Default::default()` key
self.store_value_by_key(HIGHEST_SEEN_MESSAGE_NONCE, &bool::default(), nonce)
}
/// Retrieve the nonce of the highest processed message we're aware of
fn retrieve_highest_seen_message_nonce_number(&self) -> DbResult<Option<u32>> {
// There's no unit struct Encode/Decode impl, so just use `bool` and always use the `Default::default()` key
self.retrieve_value_by_key(HIGHEST_SEEN_MESSAGE_NONCE, &bool::default())
}
}
make_store_and_retrieve!(pub, message_id_by_nonce, MESSAGE_ID, u32, H256);
make_store_and_retrieve!(pub(self), message_by_id, MESSAGE, H256, HyperlaneMessage);
make_store_and_retrieve!(pub(self), dispatched_block_number_by_nonce, MESSAGE_DISPATCHED_BLOCK_NUMBER, u32, u64);
make_store_and_retrieve!(pub, processed_by_nonce, NONCE_PROCESSED, u32, bool);
make_store_and_retrieve!(pub(self), processed_by_gas_payment_meta, GAS_PAYMENT_META_PROCESSED, InterchainGasPaymentMeta, bool);
make_store_and_retrieve!(pub(self), interchain_gas_expenditure_data_by_message_id, GAS_EXPENDITURE_FOR_MESSAGE_ID, H256, InterchainGasExpenditureData);
make_store_and_retrieve!(
pub,
status_by_message_id,
STATUS_BY_MESSAGE_ID,
H256,
PendingOperationStatus
);
make_store_and_retrieve!(pub(self), interchain_gas_payment_data_by_gas_payment_key, GAS_PAYMENT_FOR_MESSAGE_ID, GasPaymentKey, InterchainGasPaymentData);
make_store_and_retrieve!(pub(self), gas_payment_by_sequence, GAS_PAYMENT_BY_SEQUENCE, u32, InterchainGasPayment);
make_store_and_retrieve!(pub(self), gas_payment_block_by_sequence, GAS_PAYMENT_BLOCK_BY_SEQUENCE, u32, u64);
make_store_and_retrieve!(
pub,
pending_message_retry_count_by_message_id,
PENDING_MESSAGE_RETRY_COUNT_FOR_MESSAGE_ID,
H256,
u32
);
make_store_and_retrieve!(
pub,
merkle_tree_insertion_by_leaf_index,
MERKLE_TREE_INSERTION,
u32,
MerkleTreeInsertion
);
make_store_and_retrieve!(
pub,
merkle_leaf_index_by_message_id,
MERKLE_LEAF_INDEX_BY_MESSAGE_ID,
H256,
u32
);
make_store_and_retrieve!(
pub,
merkle_tree_insertion_block_number_by_leaf_index,
MERKLE_TREE_INSERTION_BLOCK_NUMBER_BY_LEAF_INDEX,
u32,
u64
);
// There's no unit struct Encode/Decode impl, so just use `bool`, have visibility be private (by omitting the first argument), and wrap
// with a function that always uses the `Default::default()` key
make_store_and_retrieve!(, highest_seen_message_nonce_number, HIGHEST_SEEN_MESSAGE_NONCE, bool, u32);
impl HyperlaneRocksDB {
fn store_value_by_key<K: Encode, V: Encode>(
&self,
prefix: impl AsRef<[u8]>,
key: &K,
value: &V,
) -> DbResult<()> {
self.store_encodable(prefix, key.to_vec(), value)
}
fn retrieve_value_by_key<K: Encode, V: Decode>(
&self,
prefix: impl AsRef<[u8]>,
key: &K,
) -> DbResult<Option<V>> {
self.retrieve_decodable(prefix, key.to_vec())
}
}

@ -1,7 +1,6 @@
use std::path::PathBuf;
use std::{io, path::Path, sync::Arc};
use std::{path::Path, sync::Arc};
use hyperlane_core::{ChainCommunicationError, HyperlaneProtocolError};
use super::error::DbError;
use rocksdb::{Options, DB as Rocks};
use tracing::info;
@ -16,9 +15,6 @@ mod hyperlane_db;
/// Type-specific db operations
mod typed_db;
/// Internal-use storage types.
mod storage_types;
/// Database test utilities.
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
@ -33,37 +29,6 @@ impl From<Rocks> for DB {
}
}
/// DB Error type
#[derive(thiserror::Error, Debug)]
pub enum DbError {
/// Rocks DB Error
#[error("{0}")]
RockError(#[from] rocksdb::Error),
#[error("Failed to open {path}, canonicalized as {canonicalized}: {source}")]
/// Error opening the database
OpeningError {
/// Rocksdb error during opening
#[source]
source: rocksdb::Error,
/// Raw database path provided
path: PathBuf,
/// Parsed path used
canonicalized: PathBuf,
},
/// Could not parse the provided database path string
#[error("Invalid database path supplied {1:?}; {0}")]
InvalidDbPath(#[source] io::Error, String),
/// Hyperlane Error
#[error("{0}")]
HyperlaneError(#[from] HyperlaneProtocolError),
}
impl From<DbError> for ChainCommunicationError {
fn from(value: DbError) -> Self {
ChainCommunicationError::from_other(value)
}
}
type Result<T> = std::result::Result<T, DbError>;
impl DB {

@ -1,6 +1,6 @@
use hyperlane_core::{Decode, Encode, HyperlaneDomain};
use crate::db::{DbError, DB};
use crate::db::{error::DbError, DB};
type Result<T> = std::result::Result<T, DbError>;

@ -8,15 +8,18 @@ use hyperlane_core::{
/// Subset of `InterchainGasPayment` excluding the message id which is stored in
/// the key.
#[derive(Debug, Copy, Clone)]
pub(super) struct InterchainGasPaymentData {
pub struct InterchainGasPaymentData {
/// The amount of tokens paid for the gas.
pub payment: U256,
/// The amount of gas paid for.
pub gas_amount: U256,
}
/// Subset of `InterchainGasExpenditure` excluding the message id which is
/// stored in the key.
#[allow(missing_docs)]
#[derive(Debug, Copy, Clone)]
pub(super) struct InterchainGasExpenditureData {
pub struct InterchainGasExpenditureData {
pub tokens_used: U256,
pub gas_used: U256,
}
@ -31,6 +34,7 @@ impl Default for InterchainGasPaymentData {
}
impl InterchainGasPaymentData {
/// Complete the data with the message id and destination.
pub fn complete(self, message_id: H256, destination: u32) -> InterchainGasPayment {
InterchainGasPayment {
message_id,
@ -82,6 +86,7 @@ impl Default for InterchainGasExpenditureData {
}
impl InterchainGasExpenditureData {
/// Complete the data with the message id.
pub fn complete(self, message_id: H256) -> InterchainGasExpenditure {
InterchainGasExpenditure {
message_id,
Loading…
Cancel
Save