Refactor agent event indexing (#2246)
### Description This PR refactors event indexing in the agents, allowing similar logic to be shared across multiple event types (i.e. messages, deliveries, gas payments) and database types (i.e. the Relayer rocks DB and Scraper SQL DB). Furthermore, it adds new syncing modes by way of `MessageSyncCursors` that take advantage of the monotonically increasing dispatched message nonce to sync more intelligently. ### Drive-by changes - Fixes a bug in the existing cursor that caused the same block range to be indexed three times - Modifies kathy to get rid of the idea of "rounds", just sends messages with a sleep in between - Minor modifications to the e2e test for performance - Expand macros in settings - Add scraper to e2e test ### Opportunities for improvement - We can further reduce RPC usage (or improve latency) by sharing the view of the latest finalized block number between cursors - We can speed up the effective time for (a relayer to start deliving messages | the scraper to scraper recent events) by creating forward/backward cursors for gas payments and deliveries where the backwards cursor terminates at index_settings.from - We can remove the need for index_settings.from by terminating backwards cursors based on the block number that the first message was dispatched at ### Related issues - Fixes #[issue number here] ### Backward compatibility _Are these changes backward compatible?_ Yes _Are there any infrastructure implications, e.g. changes that would prohibit deploying older commits using this infra tooling?_ None ### Testing _What kind of testing have these changes undergone?_ E2E tests --------- Co-authored-by: Mattie Conover <git@mconover.dev>pull/2257/head
parent
3711b64de3
commit
63562c7211
@ -1,326 +0,0 @@ |
||||
use std::collections::HashMap; |
||||
use std::ops::Deref; |
||||
use std::sync::Arc; |
||||
use std::time::Duration; |
||||
|
||||
use eyre::Result; |
||||
use itertools::Itertools; |
||||
use prometheus::{IntCounter, IntGauge, IntGaugeVec}; |
||||
use time::Instant; |
||||
use tracing::{debug, info, instrument, trace, warn}; |
||||
|
||||
use hyperlane_base::{last_message::validate_message_continuity, RateLimitedSyncBlockRangeCursor}; |
||||
use hyperlane_core::{ |
||||
utils::fmt_sync_time, KnownHyperlaneDomain, ListValidity, MailboxIndexer, SyncBlockRangeCursor, |
||||
H256, |
||||
}; |
||||
|
||||
use crate::chain_scraper::{ |
||||
Delivery, HyperlaneMessageWithMeta, Payment, SqlChainScraper, TxnWithId, |
||||
}; |
||||
|
||||
/// Workhorse of synchronization. This consumes a `SqlChainScraper` which has
|
||||
/// the needed connections and information to work and then adds additional
|
||||
/// running state that can be modified. This is a fn-like struct which allows us
|
||||
/// to pass a bunch of state around without having a lot of arguments to
|
||||
/// functions.
|
||||
///
|
||||
/// Conceptually this is *just* sync loop code with initial vars being
|
||||
/// configured but as a struct + multiple functions.
|
||||
pub(super) struct Syncer { |
||||
scraper: SqlChainScraper, |
||||
indexed_height: IntGauge, |
||||
stored_messages: IntCounter, |
||||
stored_deliveries: IntCounter, |
||||
stored_payments: IntCounter, |
||||
missed_messages: IntCounter, |
||||
message_nonce: IntGaugeVec, |
||||
sync_cursor: RateLimitedSyncBlockRangeCursor<Arc<dyn MailboxIndexer>>, |
||||
|
||||
last_valid_range_start_block: u32, |
||||
last_nonce: Option<u32>, |
||||
} |
||||
|
||||
impl Deref for Syncer { |
||||
type Target = SqlChainScraper; |
||||
|
||||
fn deref(&self) -> &Self::Target { |
||||
&self.scraper |
||||
} |
||||
} |
||||
|
||||
impl Syncer { |
||||
/// Create a new syncer from the `SqlChainScraper` which holds the needed
|
||||
/// information and connections to create the running state.
|
||||
///
|
||||
/// **Note:** Run must be called for syncing to commence.
|
||||
#[instrument(skip_all)] |
||||
pub async fn new(scraper: SqlChainScraper) -> Result<Self> { |
||||
let domain = scraper.domain(); |
||||
let chain_name = domain.name(); |
||||
|
||||
let indexed_height = scraper |
||||
.metrics |
||||
.indexed_height |
||||
.with_label_values(&["all", chain_name]); |
||||
let stored_deliveries = scraper |
||||
.metrics |
||||
.stored_events |
||||
.with_label_values(&["deliveries", chain_name]); |
||||
let stored_payments = scraper |
||||
.metrics |
||||
.stored_events |
||||
.with_label_values(&["gas_payments", chain_name]); |
||||
let stored_messages = scraper |
||||
.metrics |
||||
.stored_events |
||||
.with_label_values(&["messages", chain_name]); |
||||
let missed_messages = scraper |
||||
.metrics |
||||
.missed_events |
||||
.with_label_values(&["messages", chain_name]); |
||||
let message_nonce = scraper.metrics.message_nonce.clone(); |
||||
|
||||
let chunk_size = scraper.chunk_size; |
||||
let initial_height = scraper.cursor.height().await as u32; |
||||
let last_valid_range_start_block = initial_height; |
||||
let last_nonce = scraper.last_message_nonce().await?; |
||||
|
||||
let sync_cursor = RateLimitedSyncBlockRangeCursor::new( |
||||
scraper.contracts.mailbox_indexer.clone(), |
||||
chunk_size, |
||||
initial_height, |
||||
) |
||||
.await?; |
||||
|
||||
Ok(Self { |
||||
scraper, |
||||
indexed_height, |
||||
stored_messages, |
||||
stored_deliveries, |
||||
stored_payments, |
||||
missed_messages, |
||||
message_nonce, |
||||
sync_cursor, |
||||
last_valid_range_start_block, |
||||
last_nonce, |
||||
}) |
||||
} |
||||
|
||||
/// Sync contract and other blockchain data with the current chain state.
|
||||
#[instrument(skip(self), fields(domain = %self.domain(), chunk_size = self.chunk_size))] |
||||
pub async fn run(mut self) -> Result<()> { |
||||
let start_block = self.sync_cursor.current_position(); |
||||
info!(from = start_block, "Resuming chain sync"); |
||||
self.indexed_height.set(start_block as i64); |
||||
|
||||
let mut last_logged_time: Option<Instant> = None; |
||||
let mut should_log_checkpoint_info = || { |
||||
if last_logged_time.is_none() |
||||
|| last_logged_time.unwrap().elapsed() > Duration::from_secs(30) |
||||
{ |
||||
last_logged_time = Some(Instant::now()); |
||||
true |
||||
} else { |
||||
false |
||||
} |
||||
}; |
||||
|
||||
loop { |
||||
let start_block = self.sync_cursor.current_position(); |
||||
let Ok((from, to, eta)) = self.sync_cursor.next_range().await else { continue }; |
||||
if should_log_checkpoint_info() { |
||||
info!( |
||||
from, |
||||
to, |
||||
distance_from_tip = self.sync_cursor.distance_from_tip(), |
||||
estimated_time_to_sync = fmt_sync_time(eta), |
||||
"Syncing range" |
||||
); |
||||
} else { |
||||
debug!( |
||||
from, |
||||
to, |
||||
distance_from_tip = self.sync_cursor.distance_from_tip(), |
||||
estimated_time_to_sync = fmt_sync_time(eta), |
||||
"Syncing range" |
||||
); |
||||
} |
||||
|
||||
let extracted = self.scrape_range(from, to).await?; |
||||
|
||||
let validation = validate_message_continuity( |
||||
self.last_nonce, |
||||
&extracted |
||||
.sorted_messages |
||||
.iter() |
||||
.map(|r| &r.message) |
||||
.collect::<Vec<_>>(), |
||||
); |
||||
match validation { |
||||
ListValidity::Valid => { |
||||
let max_nonce_of_batch = self.record_data(extracted).await?; |
||||
|
||||
self.cursor.update(from as u64).await; |
||||
self.last_nonce = max_nonce_of_batch; |
||||
self.last_valid_range_start_block = from; |
||||
self.indexed_height.set(to as i64); |
||||
} |
||||
ListValidity::Empty => { |
||||
let _ = self.record_data(extracted).await?; |
||||
self.indexed_height.set(to as i64); |
||||
} |
||||
ListValidity::InvalidContinuation => { |
||||
self.missed_messages.inc(); |
||||
warn!( |
||||
last_nonce = self.last_nonce, |
||||
start_block = from, |
||||
end_block = to, |
||||
last_valid_range_start_block = self.last_valid_range_start_block, |
||||
"Found invalid continuation in range. Re-indexing from the start block of the last successful range." |
||||
); |
||||
self.sync_cursor |
||||
.backtrack(self.last_valid_range_start_block); |
||||
self.indexed_height |
||||
.set(self.last_valid_range_start_block as i64); |
||||
} |
||||
ListValidity::ContainsGaps => { |
||||
self.missed_messages.inc(); |
||||
self.sync_cursor.backtrack(start_block); |
||||
warn!( |
||||
last_nonce = self.last_nonce, |
||||
start_block = from, |
||||
end_block = to, |
||||
last_valid_range_start_block = self.last_valid_range_start_block, |
||||
"Found gaps in the message in range, re-indexing the same range." |
||||
); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
/// Fetch contract data from a given block range.
|
||||
#[instrument(skip(self))] |
||||
async fn scrape_range(&self, from: u32, to: u32) -> Result<ExtractedData> { |
||||
debug!(from, to, "Fetching messages for range"); |
||||
let sorted_messages = self |
||||
.contracts |
||||
.mailbox_indexer |
||||
.fetch_sorted_messages(from, to) |
||||
.await?; |
||||
trace!(?sorted_messages, "Fetched messages"); |
||||
|
||||
debug!("Fetching deliveries for range"); |
||||
let deliveries = self |
||||
.contracts |
||||
.mailbox_indexer |
||||
.fetch_delivered_messages(from, to) |
||||
.await? |
||||
.into_iter() |
||||
.map(|(message_id, meta)| Delivery { message_id, meta }) |
||||
.collect_vec(); |
||||
trace!(?deliveries, "Fetched deliveries"); |
||||
|
||||
debug!("Fetching payments for range"); |
||||
let payments = self |
||||
.contracts |
||||
.igp_indexer |
||||
.fetch_gas_payments(from, to) |
||||
.await? |
||||
.into_iter() |
||||
.map(|(payment, meta)| Payment { payment, meta }) |
||||
.collect_vec(); |
||||
trace!(?payments, "Fetched payments"); |
||||
|
||||
info!( |
||||
message_count = sorted_messages.len(), |
||||
delivery_count = deliveries.len(), |
||||
payment_count = payments.len(), |
||||
"Indexed block range for chain" |
||||
); |
||||
|
||||
let sorted_messages = sorted_messages |
||||
.into_iter() |
||||
.map(|(message, meta)| HyperlaneMessageWithMeta { message, meta }) |
||||
.filter(|m| { |
||||
self.last_nonce |
||||
.map_or(true, |last_nonce| m.message.nonce > last_nonce) |
||||
}) |
||||
.collect::<Vec<_>>(); |
||||
|
||||
debug!( |
||||
message_count = sorted_messages.len(), |
||||
"Filtered any messages already indexed for mailbox" |
||||
); |
||||
|
||||
Ok(ExtractedData { |
||||
sorted_messages, |
||||
deliveries, |
||||
payments, |
||||
}) |
||||
} |
||||
|
||||
/// Record messages and deliveries, will fetch any extra data needed to do
|
||||
/// so. Returns the max nonce or None if no messages were provided.
|
||||
#[instrument(
|
||||
skip_all, |
||||
fields( |
||||
sorted_messages = extracted.sorted_messages.len(), |
||||
deliveries = extracted.deliveries.len(), |
||||
payments = extracted.payments.len() |
||||
) |
||||
)] |
||||
async fn record_data(&self, extracted: ExtractedData) -> Result<Option<u32>> { |
||||
let ExtractedData { |
||||
sorted_messages, |
||||
deliveries, |
||||
payments, |
||||
} = extracted; |
||||
|
||||
let txns: HashMap<H256, TxnWithId> = self |
||||
.ensure_blocks_and_txns( |
||||
sorted_messages |
||||
.iter() |
||||
.map(|r| &r.meta) |
||||
.chain(deliveries.iter().map(|d| &d.meta)) |
||||
.chain(payments.iter().map(|p| &p.meta)), |
||||
) |
||||
.await? |
||||
.map(|t| (t.hash, t)) |
||||
.collect(); |
||||
|
||||
if !deliveries.is_empty() { |
||||
self.store_deliveries(&deliveries, &txns).await?; |
||||
self.stored_deliveries.inc_by(deliveries.len() as u64); |
||||
} |
||||
|
||||
if !payments.is_empty() { |
||||
self.store_payments(&payments, &txns).await?; |
||||
self.stored_payments.inc_by(payments.len() as u64); |
||||
} |
||||
|
||||
if !sorted_messages.is_empty() { |
||||
let max_nonce_of_batch = self.store_messages(&sorted_messages, &txns).await?; |
||||
self.stored_messages.inc_by(sorted_messages.len() as u64); |
||||
|
||||
for m in sorted_messages.iter() { |
||||
let nonce = m.message.nonce; |
||||
let dst = KnownHyperlaneDomain::try_from(m.message.destination) |
||||
.map(Into::into) |
||||
.unwrap_or("unknown"); |
||||
self.message_nonce |
||||
.with_label_values(&["dispatch", self.domain().name(), dst]) |
||||
.set(nonce as i64); |
||||
} |
||||
Ok(Some(max_nonce_of_batch)) |
||||
} else { |
||||
Ok(None) |
||||
} |
||||
} |
||||
} |
||||
|
||||
struct ExtractedData { |
||||
sorted_messages: Vec<HyperlaneMessageWithMeta>, |
||||
deliveries: Vec<Delivery>, |
||||
payments: Vec<Payment>, |
||||
} |
@ -1,78 +0,0 @@ |
||||
use tracing::{debug, info, instrument}; |
||||
|
||||
use hyperlane_core::{utils::fmt_sync_time, InterchainGasPaymasterIndexer, SyncBlockRangeCursor}; |
||||
|
||||
use crate::{ |
||||
contract_sync::{ |
||||
cursor::RateLimitedSyncBlockRangeCursor, schema::InterchainGasPaymasterContractSyncDB, |
||||
}, |
||||
ContractSync, |
||||
}; |
||||
|
||||
const GAS_PAYMENTS_LABEL: &str = "gas_payments"; |
||||
|
||||
impl<I> ContractSync<I> |
||||
where |
||||
I: InterchainGasPaymasterIndexer + Clone + 'static, |
||||
{ |
||||
/// Sync gas payments
|
||||
#[instrument(name = "GasPaymentContractSync", skip(self))] |
||||
pub(crate) async fn sync_gas_payments(&self) -> eyre::Result<()> { |
||||
let chain_name = self.domain.as_ref(); |
||||
let indexed_height = self |
||||
.metrics |
||||
.indexed_height |
||||
.with_label_values(&[GAS_PAYMENTS_LABEL, chain_name]); |
||||
let stored_messages = self |
||||
.metrics |
||||
.stored_events |
||||
.with_label_values(&[GAS_PAYMENTS_LABEL, chain_name]); |
||||
|
||||
let cursor = { |
||||
let config_initial_height = self.index_settings.from; |
||||
let initial_height = self |
||||
.db |
||||
.retrieve_latest_indexed_gas_payment_block() |
||||
.map_or(config_initial_height, |b| b + 1); |
||||
RateLimitedSyncBlockRangeCursor::new( |
||||
self.indexer.clone(), |
||||
self.index_settings.chunk_size, |
||||
initial_height, |
||||
) |
||||
}; |
||||
|
||||
let mut cursor = cursor.await?; |
||||
|
||||
let start_block = cursor.current_position(); |
||||
info!(from = start_block, "Resuming indexer"); |
||||
indexed_height.set(start_block as i64); |
||||
|
||||
loop { |
||||
let Ok((from, to, eta)) = cursor.next_range().await else { continue }; |
||||
let gas_payments = self.indexer.fetch_gas_payments(from, to).await?; |
||||
|
||||
debug!( |
||||
from, |
||||
to, |
||||
distance_from_tip = cursor.distance_from_tip(), |
||||
gas_payments_count = gas_payments.len(), |
||||
estimated_time_to_sync = fmt_sync_time(eta), |
||||
"Indexed block range" |
||||
); |
||||
|
||||
let mut new_payments_processed: u64 = 0; |
||||
for (payment, meta) in gas_payments.iter() { |
||||
// Attempt to process the gas payment, incrementing new_payments_processed
|
||||
// if it was processed for the first time.
|
||||
if self.db.process_gas_payment(*payment, meta)? { |
||||
new_payments_processed += 1; |
||||
} |
||||
} |
||||
|
||||
stored_messages.inc_by(new_payments_processed); |
||||
|
||||
self.db.store_latest_indexed_gas_payment_block(from)?; |
||||
indexed_height.set(to as i64); |
||||
} |
||||
} |
||||
} |
@ -1,40 +0,0 @@ |
||||
use hyperlane_core::{HyperlaneMessage, ListValidity}; |
||||
|
||||
/// Check if the list of sorted messages is a valid continuation of
|
||||
/// `latest_message_nonce`. If the latest index is Some, check the validity of
|
||||
/// the list in continuation of the latest. If the latest index is None, check
|
||||
/// the validity of just the list.
|
||||
///
|
||||
/// Optional latest nonce to account for possibility that ContractSync is
|
||||
/// still yet to see it's first message. We want to check for validity of new
|
||||
/// list of messages against a potential previous message (Some case) but also
|
||||
/// still validate the new messages in the case that we have not seen any
|
||||
/// previous messages (None case).
|
||||
pub fn validate_message_continuity( |
||||
latest_message_nonce: Option<u32>, |
||||
sorted_messages: &[&HyperlaneMessage], |
||||
) -> ListValidity { |
||||
if sorted_messages.is_empty() { |
||||
return ListValidity::Empty; |
||||
} |
||||
|
||||
// If we have seen another leaf in a previous block range, ensure
|
||||
// the batch contains the consecutive next leaf
|
||||
if let Some(last_seen) = latest_message_nonce { |
||||
let has_desired_message = sorted_messages |
||||
.iter() |
||||
.any(|&message| last_seen == message.nonce - 1); |
||||
if !has_desired_message { |
||||
return ListValidity::InvalidContinuation; |
||||
} |
||||
} |
||||
|
||||
// Ensure no gaps in new batch of leaves
|
||||
for pair in sorted_messages.windows(2) { |
||||
if pair[0].nonce != pair[1].nonce - 1 { |
||||
return ListValidity::ContainsGaps; |
||||
} |
||||
} |
||||
|
||||
ListValidity::Valid |
||||
} |
@ -1,486 +0,0 @@ |
||||
use std::time::{Duration, Instant}; |
||||
|
||||
use tracing::{debug, info, instrument, warn}; |
||||
|
||||
use hyperlane_core::{ |
||||
utils::fmt_sync_time, Indexer, KnownHyperlaneDomain, ListValidity, MailboxIndexer, |
||||
SyncBlockRangeCursor, |
||||
}; |
||||
|
||||
use crate::{ |
||||
contract_sync::{last_message::validate_message_continuity, schema::MailboxContractSyncDB}, |
||||
ContractSync, |
||||
}; |
||||
|
||||
const MESSAGES_LABEL: &str = "messages"; |
||||
|
||||
impl<I> ContractSync<I> |
||||
where |
||||
I: MailboxIndexer + Clone + 'static, |
||||
{ |
||||
/// Sync dispatched messages
|
||||
#[instrument(name = "MessageContractSync", skip(self))] |
||||
pub(crate) async fn sync_dispatched_messages(&self) -> eyre::Result<()> { |
||||
let chain_name = self.domain.as_ref(); |
||||
let indexed_height = self |
||||
.metrics |
||||
.indexed_height |
||||
.with_label_values(&[MESSAGES_LABEL, chain_name]); |
||||
let stored_messages = self |
||||
.metrics |
||||
.stored_events |
||||
.with_label_values(&[MESSAGES_LABEL, chain_name]); |
||||
let missed_messages = self |
||||
.metrics |
||||
.missed_events |
||||
.with_label_values(&[MESSAGES_LABEL, chain_name]); |
||||
|
||||
let message_nonce = self.metrics.message_nonce.clone(); |
||||
|
||||
let cursor = { |
||||
let config_initial_height = self.index_settings.from; |
||||
let initial_height = self |
||||
.db |
||||
.retrieve_latest_valid_message_range_start_block() |
||||
.map_or(config_initial_height, |b| b + 1); |
||||
create_cursor( |
||||
self.indexer.clone(), |
||||
self.index_settings.chunk_size, |
||||
initial_height, |
||||
) |
||||
}; |
||||
|
||||
// Indexes messages by fetching messages in ranges of blocks.
|
||||
// We've observed occasional flakiness with providers where some events in
|
||||
// a range will be missing. The leading theories are:
|
||||
//
|
||||
// 1. The provider is just flaky and sometimes misses events :(
|
||||
//
|
||||
// 2. For outbox chains with low finality times, it's possible that when
|
||||
// we query the RPC provider for the latest finalized block number,
|
||||
// we're returned a block number T. However when we attempt to index a range
|
||||
// where the `to` block is T, the `eth_getLogs` RPC is load balanced by the
|
||||
// provider to a different node whose latest known block is some block T' <T.
|
||||
//
|
||||
// The `eth_getLogs` RPC implementations seem to happily accept
|
||||
// `to` blocks that exceed the latest known block, so it's possible
|
||||
// that in our indexer we think that we've indexed up to block T but
|
||||
// we've only *actually* indexed up to block T'.
|
||||
//
|
||||
// It's easy to determine if a provider has skipped any message events by
|
||||
// looking at the indices of each message and ensuring that we've indexed a
|
||||
// valid continuation of messages.
|
||||
//
|
||||
// There are two classes of invalid continuations:
|
||||
//
|
||||
// 1. The latest previously indexed message index is M that was found in a
|
||||
// previously indexed block range. A new block range [A,B] is indexed, returning
|
||||
// a list of messages. The lowest message index in that list is `M + 1`,
|
||||
// but there are some missing messages indices in the list. This is
|
||||
// likely a flaky provider, and we can simply re-index the range [A,B]
|
||||
// hoping that the provider will soon return a correct list.
|
||||
//
|
||||
// 2. The latest previously indexed message index is M that was found in a
|
||||
// previously indexed block range, [A,B]. A new block range [C,D] is
|
||||
// indexed, returning a list of messages. However, the lowest message
|
||||
// index in that list is M' where M' > M + 1. This missing messages
|
||||
// could be anywhere in the range [A,D]:
|
||||
// * It's possible there was an issue when the prior block range [A,B] was
|
||||
// indexed, where the provider didn't provide some messages with indices >
|
||||
// M that it should have.
|
||||
// * It's possible that the range [B,C] that was presumed to be empty when it
|
||||
// was indexed actually wasn't.
|
||||
// * And it's possible that this was just a flaky gap, where there are
|
||||
// messages in the [C,D] range that weren't returned for some reason.
|
||||
//
|
||||
// We can handle this by re-indexing starting from block A.
|
||||
// Note this means we only handle this case upon observing messages in some
|
||||
// range [C,D] that indicate a previously indexed range may have
|
||||
// missed some messages.
|
||||
let mut cursor = cursor.await?; |
||||
let start_block = cursor.current_position(); |
||||
let mut last_valid_range_start_block = start_block; |
||||
info!( |
||||
from = start_block, |
||||
"Resuming indexer from latest valid message range start block" |
||||
); |
||||
indexed_height.set(start_block as i64); |
||||
|
||||
let mut last_logged_time: Option<Instant> = None; |
||||
let mut should_log_checkpoint_info = || { |
||||
if last_logged_time.is_none() |
||||
|| last_logged_time.unwrap().elapsed() > Duration::from_secs(30) |
||||
{ |
||||
last_logged_time = Some(Instant::now()); |
||||
true |
||||
} else { |
||||
false |
||||
} |
||||
}; |
||||
|
||||
loop { |
||||
let start_block = cursor.current_position(); |
||||
let Ok((from, to, eta)) = cursor.next_range().await else { continue }; |
||||
|
||||
let mut sorted_messages: Vec<_> = self |
||||
.indexer |
||||
.fetch_sorted_messages(from, to) |
||||
.await? |
||||
.into_iter() |
||||
.map(|(msg, _)| msg) |
||||
.collect(); |
||||
|
||||
if should_log_checkpoint_info() { |
||||
info!( |
||||
from, |
||||
to, |
||||
distance_from_tip = cursor.distance_from_tip(), |
||||
estimated_time_to_sync = fmt_sync_time(eta), |
||||
message_count = sorted_messages.len(), |
||||
"Indexed block range" |
||||
); |
||||
} else { |
||||
debug!( |
||||
from, |
||||
to, |
||||
distance_from_tip = cursor.distance_from_tip(), |
||||
estimated_time_to_sync = fmt_sync_time(eta), |
||||
message_count = sorted_messages.len(), |
||||
"Indexed block range" |
||||
); |
||||
} |
||||
|
||||
// Get the latest known nonce. All messages whose indices are <= this index
|
||||
// have been stored in the DB.
|
||||
let last_nonce = self.db.retrieve_latest_nonce()?; |
||||
|
||||
// Filter out any messages that have already been successfully indexed and
|
||||
// stored. This is necessary if we're re-indexing blocks in hope of
|
||||
// finding missing messages.
|
||||
if let Some(min_nonce) = last_nonce { |
||||
sorted_messages.retain(|m| m.nonce > min_nonce); |
||||
} |
||||
|
||||
debug!( |
||||
from, |
||||
to, |
||||
message_count = sorted_messages.len(), |
||||
"Filtered any messages already indexed" |
||||
); |
||||
|
||||
// Ensure the sorted messages are a valid continuation of last_nonce
|
||||
match validate_message_continuity( |
||||
last_nonce, |
||||
&sorted_messages.iter().collect::<Vec<_>>(), |
||||
) { |
||||
ListValidity::Valid => { |
||||
// Store messages
|
||||
let max_nonce_of_batch = self.db.store_messages(&sorted_messages)?; |
||||
|
||||
// Report amount of messages stored into db
|
||||
stored_messages.inc_by(sorted_messages.len() as u64); |
||||
|
||||
// Report latest nonce to gauge by dst
|
||||
for msg in sorted_messages.iter() { |
||||
let dst = KnownHyperlaneDomain::try_from(msg.destination) |
||||
.map(|d| d.as_str()) |
||||
.unwrap_or("unknown"); |
||||
message_nonce |
||||
.with_label_values(&["dispatch", chain_name, dst]) |
||||
.set(max_nonce_of_batch as i64); |
||||
} |
||||
|
||||
// Update the latest valid start block.
|
||||
self.db.store_latest_valid_message_range_start_block(from)?; |
||||
last_valid_range_start_block = from; |
||||
|
||||
// Move forward to the next height
|
||||
indexed_height.set(to as i64); |
||||
} |
||||
// The index of the first message in sorted_messages is not the
|
||||
// `last_nonce+1`.
|
||||
ListValidity::InvalidContinuation => { |
||||
missed_messages.inc(); |
||||
|
||||
warn!( |
||||
last_nonce = ?last_nonce, |
||||
start_block = from, |
||||
end_block = to, |
||||
last_valid_range_start_block, |
||||
"Found invalid continuation in range. Re-indexing from the start block of the last successful range.", |
||||
); |
||||
|
||||
cursor.backtrack(last_valid_range_start_block); |
||||
indexed_height.set(last_valid_range_start_block as i64); |
||||
} |
||||
ListValidity::ContainsGaps => { |
||||
missed_messages.inc(); |
||||
cursor.backtrack(start_block); |
||||
|
||||
warn!( |
||||
last_nonce = ?last_nonce, |
||||
start_block = from, |
||||
end_block = to, |
||||
"Found gaps in the messages in range, re-indexing the same range.", |
||||
); |
||||
} |
||||
ListValidity::Empty => { |
||||
// Continue if no messages found.
|
||||
// We don't update last_valid_range_start_block because we cannot extrapolate
|
||||
// if the range was correctly indexed if there are no messages to observe their
|
||||
// indices.
|
||||
indexed_height.set(to as i64); |
||||
} |
||||
}; |
||||
} |
||||
} |
||||
} |
||||
|
||||
#[cfg(test)] |
||||
static mut MOCK_CURSOR: Option<hyperlane_test::mocks::cursor::MockSyncBlockRangeCursor> = None; |
||||
|
||||
/// Create a new cursor. In test mode we should use the mock cursor created by
|
||||
/// the test.
|
||||
#[cfg_attr(test, allow(unused_variables))] |
||||
async fn create_cursor<I: Indexer>( |
||||
indexer: I, |
||||
chunk_size: u32, |
||||
initial_height: u32, |
||||
) -> eyre::Result<impl SyncBlockRangeCursor> { |
||||
#[cfg(not(test))] |
||||
{ |
||||
crate::RateLimitedSyncBlockRangeCursor::new(indexer, chunk_size, initial_height).await |
||||
} |
||||
#[cfg(test)] |
||||
{ |
||||
let cursor = unsafe { MOCK_CURSOR.take() }; |
||||
Ok(cursor.expect("Mock cursor was not set before it was used")) |
||||
} |
||||
} |
||||
|
||||
#[cfg(test)] |
||||
mod test { |
||||
use std::sync::Arc; |
||||
use std::time::Duration; |
||||
|
||||
use eyre::eyre; |
||||
use mockall::{predicate::eq, *}; |
||||
use tokio::{ |
||||
select, |
||||
sync::Mutex, |
||||
time::{interval, sleep, timeout}, |
||||
}; |
||||
|
||||
use hyperlane_core::{HyperlaneDomain, HyperlaneMessage, KnownHyperlaneDomain, LogMeta, H256}; |
||||
use hyperlane_test::mocks::{cursor::MockSyncBlockRangeCursor, indexer::MockHyperlaneIndexer}; |
||||
|
||||
use crate::{ |
||||
contract_sync::{mailbox::MOCK_CURSOR, schema::MailboxContractSyncDB, IndexSettings}, |
||||
db::test_utils, |
||||
db::HyperlaneDB, |
||||
ContractSync, ContractSyncMetrics, CoreMetrics, |
||||
}; |
||||
|
||||
// we need a mutex for our tests because of the static cursor object
|
||||
lazy_static! { |
||||
static ref TEST_MTX: Mutex<()> = Mutex::new(()); |
||||
} |
||||
|
||||
#[tokio::test] |
||||
async fn handles_missing_rpc_messages() { |
||||
test_utils::run_test_db(|db| async move { |
||||
let message_gen = |nonce: u32| -> HyperlaneMessage { |
||||
HyperlaneMessage { |
||||
version: 0, |
||||
nonce, |
||||
origin: 1000, |
||||
destination: 2000, |
||||
sender: H256::from([10; 32]), |
||||
recipient: H256::from([11; 32]), |
||||
body: [10u8; 5].to_vec(), |
||||
} |
||||
}; |
||||
|
||||
let messages = (0..10).map(message_gen).collect::<Vec<HyperlaneMessage>>(); |
||||
let m0 = messages[0].clone(); |
||||
let m1 = messages[1].clone(); |
||||
let m2 = messages[2].clone(); |
||||
let m3 = messages[3].clone(); |
||||
let m4 = messages[4].clone(); |
||||
let m5 = messages[5].clone(); |
||||
|
||||
let meta = || LogMeta { |
||||
address: Default::default(), |
||||
block_number: 0, |
||||
block_hash: Default::default(), |
||||
transaction_hash: Default::default(), |
||||
transaction_index: 0, |
||||
log_index: Default::default(), |
||||
}; |
||||
|
||||
let latest_valid_message_range_start_block = 100; |
||||
|
||||
let mut mock_indexer = MockHyperlaneIndexer::new(); |
||||
let mut mock_cursor = MockSyncBlockRangeCursor::new(); |
||||
{ |
||||
let mut seq = Sequence::new(); |
||||
|
||||
// Some local macros to reduce code-duplication.
|
||||
macro_rules! expect_current_position { |
||||
($return_position:literal) => { |
||||
mock_cursor |
||||
.expect__current_position() |
||||
.times(1) |
||||
.in_sequence(&mut seq) |
||||
.return_once(|| $return_position); |
||||
}; |
||||
} |
||||
macro_rules! expect_backtrack { |
||||
($expected_new_from:literal) => { |
||||
mock_cursor |
||||
.expect__backtrack() |
||||
.times(1) |
||||
.in_sequence(&mut seq) |
||||
.with(eq($expected_new_from)) |
||||
.return_once(|_| ()); |
||||
}; |
||||
} |
||||
macro_rules! expect_fetches_range { |
||||
($expected_from:literal, $expected_to:literal, $return_messages:expr) => { |
||||
let messages: &[&HyperlaneMessage] = $return_messages; |
||||
let messages = messages.iter().map(|&msg| (msg.clone(), meta())).collect(); |
||||
mock_cursor |
||||
.expect__next_range() |
||||
.times(1) |
||||
.in_sequence(&mut seq) |
||||
.return_once(|| { |
||||
Box::pin(async { |
||||
Ok(($expected_from, $expected_to, Duration::from_secs(0))) |
||||
}) |
||||
}); |
||||
mock_indexer |
||||
.expect__fetch_sorted_messages() |
||||
.times(1) |
||||
.with(eq($expected_from), eq($expected_to)) |
||||
.in_sequence(&mut seq) |
||||
.return_once(move |_, _| Ok(messages)); |
||||
}; |
||||
} |
||||
|
||||
expect_current_position!(91); |
||||
expect_current_position!(91); |
||||
|
||||
// Return m0.
|
||||
expect_fetches_range!(91, 110, &[&m0]); |
||||
|
||||
// Return m1, miss m2.
|
||||
expect_current_position!(111); |
||||
expect_fetches_range!(101, 120, &[&m1]); |
||||
|
||||
// Miss m3.
|
||||
expect_current_position!(121); |
||||
expect_fetches_range!(111, 130, &[]); |
||||
|
||||
// Empty range.
|
||||
expect_current_position!(131); |
||||
expect_fetches_range!(121, 140, &[]); |
||||
|
||||
// m1 --> m5 seen as an invalid continuation
|
||||
expect_current_position!(141); |
||||
expect_fetches_range!(131, 150, &[&m5]); |
||||
expect_backtrack!(101); |
||||
|
||||
// Indexer goes back to the last valid message range start block
|
||||
// and indexes the range
|
||||
// This time it gets m1 and m2 (which was previously skipped)
|
||||
expect_current_position!(101); |
||||
expect_fetches_range!(101, 120, &[&m1, &m2]); |
||||
|
||||
// Indexer continues, this time getting m3 and m5 message, but skipping m4,
|
||||
// which means this range contains gaps
|
||||
expect_current_position!(121); |
||||
expect_fetches_range!(118, 140, &[&m3, &m5]); |
||||
expect_backtrack!(121); |
||||
|
||||
// Indexer retries, the same range in hope of filling the gap,
|
||||
// which it now does successfully
|
||||
expect_current_position!(121); |
||||
expect_fetches_range!(121, 140, &[&m3, &m4, &m5]); |
||||
|
||||
// Indexer continues with the next block range, which happens to be empty
|
||||
expect_current_position!(141); |
||||
expect_fetches_range!(141, 160, &[]); |
||||
|
||||
// Stay at the same tip, so no other fetch_sorted_messages calls are made
|
||||
mock_cursor.expect__current_position().returning(|| 161); |
||||
mock_cursor.expect__next_range().returning(|| { |
||||
Box::pin(async move { |
||||
// this sleep should be longer than the test timeout since we don't actually
|
||||
// want to yield any more values at this point.
|
||||
sleep(Duration::from_secs(100)).await; |
||||
Ok((161, 161, Duration::from_secs(0))) |
||||
}) |
||||
}); |
||||
} |
||||
|
||||
let hyperlane_db = HyperlaneDB::new( |
||||
&HyperlaneDomain::new_test_domain("handles_missing_rpc_messages"), |
||||
db, |
||||
); |
||||
|
||||
// Set the latest valid message range start block
|
||||
hyperlane_db |
||||
.store_latest_valid_message_range_start_block( |
||||
latest_valid_message_range_start_block, |
||||
) |
||||
.unwrap(); |
||||
|
||||
let indexer = Arc::new(mock_indexer); |
||||
let metrics = Arc::new( |
||||
CoreMetrics::new("contract_sync_test", 9090, prometheus::Registry::new()) |
||||
.expect("could not make metrics"), |
||||
); |
||||
unsafe { MOCK_CURSOR = Some(mock_cursor) }; |
||||
|
||||
let sync_metrics = ContractSyncMetrics::new(metrics); |
||||
|
||||
let contract_sync = ContractSync::new( |
||||
HyperlaneDomain::Known(KnownHyperlaneDomain::Test1), |
||||
hyperlane_db.clone(), |
||||
indexer, |
||||
IndexSettings { |
||||
from: 0, |
||||
chunk_size: 19, |
||||
}, |
||||
sync_metrics, |
||||
); |
||||
|
||||
let sync_task = contract_sync.sync_dispatched_messages(); |
||||
let test_pass_fut = timeout(Duration::from_secs(5), async move { |
||||
let mut interval = interval(Duration::from_millis(20)); |
||||
loop { |
||||
if hyperlane_db.message_by_nonce(0).expect("!db").is_some() |
||||
&& hyperlane_db.message_by_nonce(1).expect("!db").is_some() |
||||
&& hyperlane_db.message_by_nonce(2).expect("!db").is_some() |
||||
&& hyperlane_db.message_by_nonce(3).expect("!db").is_some() |
||||
&& hyperlane_db.message_by_nonce(4).expect("!db").is_some() |
||||
&& hyperlane_db.message_by_nonce(5).expect("!db").is_some() |
||||
{ |
||||
break; |
||||
} |
||||
interval.tick().await; |
||||
} |
||||
}); |
||||
let test_result = select! { |
||||
err = sync_task => Err(eyre!( |
||||
"sync task unexpectedly done before test: {:?}", err.unwrap_err())), |
||||
tests_result = test_pass_fut => |
||||
if tests_result.is_ok() { Ok(()) } else { Err(eyre!("timed out")) } |
||||
}; |
||||
if let Err(err) = test_result { |
||||
panic!("Test failed: {err}") |
||||
} |
||||
}) |
||||
.await |
||||
} |
||||
} |
@ -1,31 +1,153 @@ |
||||
use std::{marker::PhantomData, sync::Arc}; |
||||
|
||||
use derive_new::new; |
||||
|
||||
pub use cursor::*; |
||||
use hyperlane_core::HyperlaneDomain; |
||||
pub use interchain_gas::*; |
||||
pub use mailbox::*; |
||||
use cursor::*; |
||||
use hyperlane_core::{ |
||||
utils::fmt_sync_time, ContractSyncCursor, HyperlaneDomain, HyperlaneLogStore, HyperlaneMessage, |
||||
HyperlaneMessageStore, HyperlaneWatermarkedLogStore, Indexer, MessageIndexer, |
||||
}; |
||||
pub use metrics::ContractSyncMetrics; |
||||
use std::fmt::Debug; |
||||
use tracing::{debug, info}; |
||||
|
||||
use crate::{chains::IndexSettings, db::HyperlaneDB}; |
||||
use crate::chains::IndexSettings; |
||||
|
||||
mod cursor; |
||||
mod eta_calculator; |
||||
mod interchain_gas; |
||||
/// Tools for working with message continuity.
|
||||
pub mod last_message; |
||||
mod mailbox; |
||||
mod metrics; |
||||
mod schema; |
||||
|
||||
/// Entity that drives the syncing of an agent's db with on-chain data.
|
||||
/// Extracts chain-specific data (emitted checkpoints, messages, etc) from an
|
||||
/// `indexer` and fills the agent's db with this data. A CachingMailbox
|
||||
/// will use a contract sync to spawn syncing tasks to keep the db up-to-date.
|
||||
#[derive(Debug, new)] |
||||
pub(crate) struct ContractSync<I> { |
||||
/// `indexer` and fills the agent's db with this data.
|
||||
#[derive(Debug, new, Clone)] |
||||
pub struct ContractSync<T, D: HyperlaneLogStore<T>, I: Indexer<T>> { |
||||
domain: HyperlaneDomain, |
||||
db: HyperlaneDB, |
||||
db: D, |
||||
indexer: I, |
||||
index_settings: IndexSettings, |
||||
metrics: ContractSyncMetrics, |
||||
_phantom: PhantomData<T>, |
||||
} |
||||
|
||||
impl<T, D, I> ContractSync<T, D, I> |
||||
where |
||||
T: Debug + Send + Sync + Clone + 'static, |
||||
D: HyperlaneLogStore<T> + 'static, |
||||
I: Indexer<T> + Clone + 'static, |
||||
{ |
||||
/// The domain that this ContractSync is running on
|
||||
pub fn domain(&self) -> &HyperlaneDomain { |
||||
&self.domain |
||||
} |
||||
|
||||
/// Sync logs and write them to the LogStore
|
||||
#[tracing::instrument(name = "ContractSync", fields(domain=self.domain().name()), skip(self, cursor))] |
||||
pub async fn sync( |
||||
&self, |
||||
label: &'static str, |
||||
mut cursor: Box<dyn ContractSyncCursor<T>>, |
||||
) -> eyre::Result<()> { |
||||
let chain_name = self.domain.as_ref(); |
||||
let indexed_height = self |
||||
.metrics |
||||
.indexed_height |
||||
.with_label_values(&[label, chain_name]); |
||||
let stored_logs = self |
||||
.metrics |
||||
.stored_events |
||||
.with_label_values(&[label, chain_name]); |
||||
|
||||
loop { |
||||
let Ok((from, to, eta)) = cursor.next_range().await else { continue }; |
||||
debug!(from, to, "Looking for for events in block range"); |
||||
|
||||
let logs = self.indexer.fetch_logs(from, to).await?; |
||||
|
||||
info!( |
||||
from, |
||||
to, |
||||
num_logs = logs.len(), |
||||
estimated_time_to_sync = fmt_sync_time(eta), |
||||
"Found log(s) in block range" |
||||
); |
||||
// Store deliveries
|
||||
let stored = self.db.store_logs(&logs).await?; |
||||
// Report amount of deliveries stored into db
|
||||
stored_logs.inc_by(stored as u64); |
||||
// We check the value of the current gauge to avoid overwriting a higher value
|
||||
// when using a ForwardBackwardMessageSyncCursor
|
||||
if to as i64 > indexed_height.get() { |
||||
indexed_height.set(to as i64); |
||||
} |
||||
// Update cursor
|
||||
cursor.update(logs).await?; |
||||
} |
||||
} |
||||
} |
||||
|
||||
/// A ContractSync for syncing events using a RateLimitedContractSyncCursor
|
||||
pub type WatermarkContractSync<T> = |
||||
ContractSync<T, Arc<dyn HyperlaneWatermarkedLogStore<T>>, Arc<dyn Indexer<T>>>; |
||||
impl<T> WatermarkContractSync<T> |
||||
where |
||||
T: Debug + Send + Sync + Clone + 'static, |
||||
{ |
||||
/// Returns a new cursor to be used for syncing events from the indexer based on time
|
||||
pub async fn rate_limited_cursor( |
||||
&self, |
||||
index_settings: IndexSettings, |
||||
) -> Box<dyn ContractSyncCursor<T>> { |
||||
let watermark = self.db.retrieve_high_watermark().await.unwrap(); |
||||
let index_settings = IndexSettings { |
||||
from: watermark.unwrap_or(index_settings.from), |
||||
chunk_size: index_settings.chunk_size, |
||||
}; |
||||
Box::new( |
||||
RateLimitedContractSyncCursor::new( |
||||
Arc::new(self.indexer.clone()), |
||||
self.db.clone(), |
||||
index_settings.chunk_size, |
||||
index_settings.from, |
||||
) |
||||
.await |
||||
.unwrap(), |
||||
) |
||||
} |
||||
} |
||||
|
||||
/// A ContractSync for syncing messages using a MessageSyncCursor
|
||||
pub type MessageContractSync = |
||||
ContractSync<HyperlaneMessage, Arc<dyn HyperlaneMessageStore>, Arc<dyn MessageIndexer>>; |
||||
impl MessageContractSync { |
||||
/// Returns a new cursor to be used for syncing dispatched messages from the indexer
|
||||
pub async fn forward_message_sync_cursor( |
||||
&self, |
||||
index_settings: IndexSettings, |
||||
) -> Box<dyn ContractSyncCursor<HyperlaneMessage>> { |
||||
let forward_data = MessageSyncCursor::new( |
||||
self.indexer.clone(), |
||||
self.db.clone(), |
||||
index_settings.chunk_size, |
||||
index_settings.from, |
||||
index_settings.from, |
||||
0, |
||||
); |
||||
Box::new(ForwardMessageSyncCursor::new(forward_data)) |
||||
} |
||||
|
||||
/// Returns a new cursor to be used for syncing dispatched messages from the indexer
|
||||
pub async fn forward_backward_message_sync_cursor( |
||||
&self, |
||||
index_settings: IndexSettings, |
||||
) -> Box<dyn ContractSyncCursor<HyperlaneMessage>> { |
||||
Box::new( |
||||
ForwardBackwardMessageSyncCursor::new( |
||||
self.indexer.clone(), |
||||
self.db.clone(), |
||||
index_settings.chunk_size, |
||||
) |
||||
.await |
||||
.unwrap(), |
||||
) |
||||
} |
||||
} |
||||
|
@ -1,43 +0,0 @@ |
||||
use eyre::Result; |
||||
|
||||
use crate::db::{DbError, HyperlaneDB}; |
||||
|
||||
/// The start block number of the latest "valid" message block range.
|
||||
/// This is an interval of block indexes where > 0 messages were indexed,
|
||||
/// all of which had a contiguous sequence of messages based off their indices,
|
||||
/// and the lowest index is the successor to the highest index of the prior
|
||||
/// valid range.
|
||||
static LATEST_VALID_MESSAGE_RANGE_START_BLOCK: &str = "latest_valid_message_range_start_block"; |
||||
static LATEST_INDEXED_GAS_PAYMENT_BLOCK: &str = "latest_indexed_gas_payment_block"; |
||||
|
||||
pub(crate) trait MailboxContractSyncDB { |
||||
fn store_latest_valid_message_range_start_block(&self, block_num: u32) -> Result<(), DbError>; |
||||
fn retrieve_latest_valid_message_range_start_block(&self) -> Option<u32>; |
||||
} |
||||
|
||||
impl MailboxContractSyncDB for HyperlaneDB { |
||||
fn store_latest_valid_message_range_start_block(&self, block_num: u32) -> Result<(), DbError> { |
||||
self.store_encodable("", LATEST_VALID_MESSAGE_RANGE_START_BLOCK, &block_num) |
||||
} |
||||
|
||||
fn retrieve_latest_valid_message_range_start_block(&self) -> Option<u32> { |
||||
self.retrieve_decodable("", LATEST_VALID_MESSAGE_RANGE_START_BLOCK) |
||||
.expect("db failure") |
||||
} |
||||
} |
||||
|
||||
pub(crate) trait InterchainGasPaymasterContractSyncDB { |
||||
fn store_latest_indexed_gas_payment_block(&self, latest_block: u32) -> Result<(), DbError>; |
||||
fn retrieve_latest_indexed_gas_payment_block(&self) -> Option<u32>; |
||||
} |
||||
|
||||
impl InterchainGasPaymasterContractSyncDB for HyperlaneDB { |
||||
fn store_latest_indexed_gas_payment_block(&self, latest_block: u32) -> Result<(), DbError> { |
||||
self.store_encodable("", LATEST_INDEXED_GAS_PAYMENT_BLOCK, &latest_block) |
||||
} |
||||
|
||||
fn retrieve_latest_indexed_gas_payment_block(&self) -> Option<u32> { |
||||
self.retrieve_decodable("", LATEST_INDEXED_GAS_PAYMENT_BLOCK) |
||||
.expect("db failure") |
||||
} |
||||
} |
@ -1,106 +1,2 @@ |
||||
use std::path::PathBuf; |
||||
use std::{io, path::Path, sync::Arc}; |
||||
|
||||
use hyperlane_core::HyperlaneProtocolError; |
||||
use rocksdb::{Options, DB as Rocks}; |
||||
use tracing::info; |
||||
|
||||
pub use hyperlane_db::*; |
||||
pub use typed_db::*; |
||||
|
||||
/// Shared functionality surrounding use of rocksdb
|
||||
pub mod iterator; |
||||
|
||||
/// DB operations tied to specific Mailbox
|
||||
mod hyperlane_db; |
||||
/// Type-specific db operations
|
||||
mod typed_db; |
||||
|
||||
/// Internal-use storage types.
|
||||
mod storage_types; |
||||
|
||||
/// Database test utilities.
|
||||
#[cfg(any(test, feature = "test-utils"))] |
||||
pub mod test_utils; |
||||
|
||||
#[derive(Debug, Clone)] |
||||
/// A KV Store
|
||||
pub struct DB(Arc<Rocks>); |
||||
|
||||
impl From<Rocks> for DB { |
||||
fn from(rocks: Rocks) -> Self { |
||||
Self(Arc::new(rocks)) |
||||
} |
||||
} |
||||
|
||||
/// DB Error type
|
||||
#[derive(thiserror::Error, Debug)] |
||||
pub enum DbError { |
||||
/// Rocks DB Error
|
||||
#[error("{0}")] |
||||
RockError(#[from] rocksdb::Error), |
||||
#[error("Failed to open {path}, canonicalized as {canonicalized}: {source}")] |
||||
/// Error opening the database
|
||||
OpeningError { |
||||
/// Rocksdb error during opening
|
||||
#[source] |
||||
source: rocksdb::Error, |
||||
/// Raw database path provided
|
||||
path: PathBuf, |
||||
/// Parsed path used
|
||||
canonicalized: PathBuf, |
||||
}, |
||||
/// Could not parse the provided database path string
|
||||
#[error("Invalid database path supplied {1:?}; {0}")] |
||||
InvalidDbPath(#[source] io::Error, String), |
||||
/// Hyperlane Error
|
||||
#[error("{0}")] |
||||
HyperlaneError(#[from] HyperlaneProtocolError), |
||||
} |
||||
|
||||
type Result<T> = std::result::Result<T, DbError>; |
||||
|
||||
impl DB { |
||||
/// Opens db at `db_path` and creates if missing
|
||||
#[tracing::instrument(err)] |
||||
pub fn from_path(db_path: &Path) -> Result<DB> { |
||||
let path = { |
||||
let mut path = db_path |
||||
.parent() |
||||
.unwrap_or(Path::new(".")) |
||||
.canonicalize() |
||||
.map_err(|e| DbError::InvalidDbPath(e, db_path.to_string_lossy().into()))?; |
||||
if let Some(file_name) = db_path.file_name() { |
||||
path.push(file_name); |
||||
} |
||||
path |
||||
}; |
||||
|
||||
if path.is_dir() { |
||||
info!(path=%path.to_string_lossy(), "Opening existing db") |
||||
} else { |
||||
info!(path=%path.to_string_lossy(), "Creating db") |
||||
} |
||||
|
||||
let mut opts = Options::default(); |
||||
opts.create_if_missing(true); |
||||
|
||||
Rocks::open(&opts, &path) |
||||
.map_err(|e| DbError::OpeningError { |
||||
source: e, |
||||
path: db_path.into(), |
||||
canonicalized: path, |
||||
}) |
||||
.map(Into::into) |
||||
} |
||||
|
||||
/// Store a value in the DB
|
||||
pub fn store(&self, key: &[u8], value: &[u8]) -> Result<()> { |
||||
Ok(self.0.put(key, value)?) |
||||
} |
||||
|
||||
/// Retrieve a value from the DB
|
||||
pub fn retrieve(&self, key: &[u8]) -> Result<Option<Vec<u8>>> { |
||||
Ok(self.0.get(key)?) |
||||
} |
||||
} |
||||
pub use rocks::*; |
||||
mod rocks; |
||||
|
@ -0,0 +1,106 @@ |
||||
use std::path::PathBuf; |
||||
use std::{io, path::Path, sync::Arc}; |
||||
|
||||
use hyperlane_core::HyperlaneProtocolError; |
||||
use rocksdb::{Options, DB as Rocks}; |
||||
use tracing::info; |
||||
|
||||
pub use hyperlane_db::*; |
||||
pub use typed_db::*; |
||||
|
||||
/// Shared functionality surrounding use of rocksdb
|
||||
pub mod iterator; |
||||
|
||||
/// DB operations tied to specific Mailbox
|
||||
mod hyperlane_db; |
||||
/// Type-specific db operations
|
||||
mod typed_db; |
||||
|
||||
/// Internal-use storage types.
|
||||
mod storage_types; |
||||
|
||||
/// Database test utilities.
|
||||
#[cfg(any(test, feature = "test-utils"))] |
||||
pub mod test_utils; |
||||
|
||||
#[derive(Debug, Clone)] |
||||
/// A KV Store
|
||||
pub struct DB(Arc<Rocks>); |
||||
|
||||
impl From<Rocks> for DB { |
||||
fn from(rocks: Rocks) -> Self { |
||||
Self(Arc::new(rocks)) |
||||
} |
||||
} |
||||
|
||||
/// DB Error type
|
||||
#[derive(thiserror::Error, Debug)] |
||||
pub enum DbError { |
||||
/// Rocks DB Error
|
||||
#[error("{0}")] |
||||
RockError(#[from] rocksdb::Error), |
||||
#[error("Failed to open {path}, canonicalized as {canonicalized}: {source}")] |
||||
/// Error opening the database
|
||||
OpeningError { |
||||
/// Rocksdb error during opening
|
||||
#[source] |
||||
source: rocksdb::Error, |
||||
/// Raw database path provided
|
||||
path: PathBuf, |
||||
/// Parsed path used
|
||||
canonicalized: PathBuf, |
||||
}, |
||||
/// Could not parse the provided database path string
|
||||
#[error("Invalid database path supplied {1:?}; {0}")] |
||||
InvalidDbPath(#[source] io::Error, String), |
||||
/// Hyperlane Error
|
||||
#[error("{0}")] |
||||
HyperlaneError(#[from] HyperlaneProtocolError), |
||||
} |
||||
|
||||
type Result<T> = std::result::Result<T, DbError>; |
||||
|
||||
impl DB { |
||||
/// Opens db at `db_path` and creates if missing
|
||||
#[tracing::instrument(err)] |
||||
pub fn from_path(db_path: &Path) -> Result<DB> { |
||||
let path = { |
||||
let mut path = db_path |
||||
.parent() |
||||
.unwrap_or(Path::new(".")) |
||||
.canonicalize() |
||||
.map_err(|e| DbError::InvalidDbPath(e, db_path.to_string_lossy().into()))?; |
||||
if let Some(file_name) = db_path.file_name() { |
||||
path.push(file_name); |
||||
} |
||||
path |
||||
}; |
||||
|
||||
if path.is_dir() { |
||||
info!(path=%path.to_string_lossy(), "Opening existing db") |
||||
} else { |
||||
info!(path=%path.to_string_lossy(), "Creating db") |
||||
} |
||||
|
||||
let mut opts = Options::default(); |
||||
opts.create_if_missing(true); |
||||
|
||||
Rocks::open(&opts, &path) |
||||
.map_err(|e| DbError::OpeningError { |
||||
source: e, |
||||
path: db_path.into(), |
||||
canonicalized: path, |
||||
}) |
||||
.map(Into::into) |
||||
} |
||||
|
||||
/// Store a value in the DB
|
||||
pub fn store(&self, key: &[u8], value: &[u8]) -> Result<()> { |
||||
Ok(self.0.put(key, value)?) |
||||
} |
||||
|
||||
/// Retrieve a value from the DB
|
||||
pub fn retrieve(&self, key: &[u8]) -> Result<Option<Vec<u8>>> { |
||||
Ok(self.0.get(key)?) |
||||
} |
||||
} |
@ -1,51 +0,0 @@ |
||||
use std::fmt::Debug; |
||||
use std::sync::Arc; |
||||
|
||||
use derive_new::new; |
||||
use eyre::Result; |
||||
use tokio::task::JoinHandle; |
||||
use tracing::{info_span, instrument::Instrumented, Instrument}; |
||||
|
||||
use hyperlane_core::{InterchainGasPaymaster, InterchainGasPaymasterIndexer}; |
||||
|
||||
use crate::{chains::IndexSettings, db::HyperlaneDB, ContractSync, ContractSyncMetrics}; |
||||
|
||||
/// Caching InterchainGasPaymaster type
|
||||
#[derive(Debug, Clone, new)] |
||||
pub struct CachingInterchainGasPaymaster { |
||||
paymaster: Arc<dyn InterchainGasPaymaster>, |
||||
db: HyperlaneDB, |
||||
indexer: Arc<dyn InterchainGasPaymasterIndexer>, |
||||
} |
||||
|
||||
impl CachingInterchainGasPaymaster { |
||||
/// Return handle on paymaster object
|
||||
pub fn paymaster(&self) -> &Arc<dyn InterchainGasPaymaster> { |
||||
&self.paymaster |
||||
} |
||||
|
||||
/// Return handle on HyperlaneDB
|
||||
pub fn db(&self) -> &HyperlaneDB { |
||||
&self.db |
||||
} |
||||
|
||||
/// Spawn a task that syncs the CachingInterchainGasPaymaster's db with the
|
||||
/// on-chain event data
|
||||
pub fn sync( |
||||
&self, |
||||
index_settings: IndexSettings, |
||||
metrics: ContractSyncMetrics, |
||||
) -> Instrumented<JoinHandle<Result<()>>> { |
||||
let sync = ContractSync::new( |
||||
self.paymaster.domain().clone(), |
||||
self.db.clone(), |
||||
self.indexer.clone(), |
||||
index_settings, |
||||
metrics, |
||||
); |
||||
|
||||
tokio::spawn(async move { sync.sync_gas_payments().await }).instrument( |
||||
info_span!("InterchainGasPaymasterContractSync", self=%self.paymaster.domain()), |
||||
) |
||||
} |
||||
} |
@ -1,120 +0,0 @@ |
||||
use std::fmt::Debug; |
||||
use std::num::NonZeroU64; |
||||
use std::sync::Arc; |
||||
|
||||
use async_trait::async_trait; |
||||
use derive_new::new; |
||||
use tokio::task::JoinHandle; |
||||
use tracing::{info_span, instrument::Instrumented, Instrument}; |
||||
|
||||
use hyperlane_core::{ |
||||
ChainResult, Checkpoint, HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneMessage, |
||||
HyperlaneProvider, Mailbox, MailboxIndexer, TxCostEstimate, TxOutcome, H256, U256, |
||||
}; |
||||
|
||||
use crate::{chains::IndexSettings, db::HyperlaneDB, ContractSync, ContractSyncMetrics}; |
||||
|
||||
/// Caching Mailbox type
|
||||
#[derive(Debug, Clone, new)] |
||||
pub struct CachingMailbox { |
||||
mailbox: Arc<dyn Mailbox>, |
||||
db: HyperlaneDB, |
||||
indexer: Arc<dyn MailboxIndexer>, |
||||
} |
||||
|
||||
impl CachingMailbox { |
||||
/// Return handle on mailbox object
|
||||
pub fn mailbox(&self) -> &Arc<dyn Mailbox> { |
||||
&self.mailbox |
||||
} |
||||
|
||||
/// Return handle on HyperlaneDB
|
||||
pub fn db(&self) -> &HyperlaneDB { |
||||
&self.db |
||||
} |
||||
|
||||
/// Spawn a task that syncs the CachingMailbox's db with the on-chain event
|
||||
/// data
|
||||
pub fn sync( |
||||
&self, |
||||
index_settings: IndexSettings, |
||||
metrics: ContractSyncMetrics, |
||||
) -> Instrumented<JoinHandle<eyre::Result<()>>> { |
||||
let sync = ContractSync::new( |
||||
self.mailbox.domain().clone(), |
||||
self.db.clone(), |
||||
self.indexer.clone(), |
||||
index_settings, |
||||
metrics, |
||||
); |
||||
|
||||
tokio::spawn(async move { sync.sync_dispatched_messages().await }) |
||||
.instrument(info_span!("MailboxContractSync", domain=%self.mailbox.domain())) |
||||
} |
||||
} |
||||
|
||||
#[async_trait] |
||||
impl Mailbox for CachingMailbox { |
||||
fn domain_hash(&self) -> H256 { |
||||
self.mailbox.domain_hash() |
||||
} |
||||
|
||||
async fn count(&self, maybe_lag: Option<NonZeroU64>) -> ChainResult<u32> { |
||||
self.mailbox.count(maybe_lag).await |
||||
} |
||||
|
||||
/// Fetch the status of a message
|
||||
async fn delivered(&self, id: H256) -> ChainResult<bool> { |
||||
self.mailbox.delivered(id).await |
||||
} |
||||
|
||||
async fn latest_checkpoint(&self, maybe_lag: Option<NonZeroU64>) -> ChainResult<Checkpoint> { |
||||
self.mailbox.latest_checkpoint(maybe_lag).await |
||||
} |
||||
|
||||
/// Fetch the current default interchain security module value
|
||||
async fn default_ism(&self) -> ChainResult<H256> { |
||||
self.mailbox.default_ism().await |
||||
} |
||||
|
||||
async fn recipient_ism(&self, recipient: H256) -> ChainResult<H256> { |
||||
self.mailbox.recipient_ism(recipient).await |
||||
} |
||||
|
||||
async fn process( |
||||
&self, |
||||
message: &HyperlaneMessage, |
||||
metadata: &[u8], |
||||
tx_gas_limit: Option<U256>, |
||||
) -> ChainResult<TxOutcome> { |
||||
self.mailbox.process(message, metadata, tx_gas_limit).await |
||||
} |
||||
|
||||
async fn process_estimate_costs( |
||||
&self, |
||||
message: &HyperlaneMessage, |
||||
metadata: &[u8], |
||||
) -> ChainResult<TxCostEstimate> { |
||||
self.mailbox.process_estimate_costs(message, metadata).await |
||||
} |
||||
|
||||
fn process_calldata(&self, message: &HyperlaneMessage, metadata: &[u8]) -> Vec<u8> { |
||||
self.mailbox.process_calldata(message, metadata) |
||||
} |
||||
} |
||||
|
||||
impl HyperlaneChain for CachingMailbox { |
||||
fn domain(&self) -> &HyperlaneDomain { |
||||
self.mailbox.domain() |
||||
} |
||||
|
||||
fn provider(&self) -> Box<dyn HyperlaneProvider> { |
||||
self.mailbox.provider() |
||||
} |
||||
} |
||||
|
||||
impl HyperlaneContract for CachingMailbox { |
||||
fn address(&self) -> H256 { |
||||
self.mailbox.address() |
||||
} |
||||
} |
@ -0,0 +1,36 @@ |
||||
use std::fmt::Debug; |
||||
|
||||
use async_trait::async_trait; |
||||
use auto_impl::auto_impl; |
||||
use eyre::Result; |
||||
|
||||
use crate::{HyperlaneMessage, LogMeta}; |
||||
|
||||
/// Interface for a HyperlaneLogStore that ingests logs.
|
||||
#[async_trait] |
||||
#[auto_impl(&, Box, Arc)] |
||||
pub trait HyperlaneLogStore<T>: Send + Sync + Debug { |
||||
/// Store a list of logs and their associated metadata
|
||||
/// Returns the number of elements that were stored.
|
||||
async fn store_logs(&self, logs: &[(T, LogMeta)]) -> Result<u32>; |
||||
} |
||||
|
||||
/// Extension of HyperlaneLogStore trait that supports getting the block number at which a known message was dispatched.
|
||||
#[async_trait] |
||||
#[auto_impl(&, Box, Arc)] |
||||
pub trait HyperlaneMessageStore: HyperlaneLogStore<HyperlaneMessage> { |
||||
/// Gets a message by nonce.
|
||||
async fn retrieve_message_by_nonce(&self, nonce: u32) -> Result<Option<HyperlaneMessage>>; |
||||
/// Gets the block number at which a message was dispatched.
|
||||
async fn retrieve_dispatched_block_number(&self, nonce: u32) -> Result<Option<u64>>; |
||||
} |
||||
|
||||
/// Extension of HyperlaneLogStore trait that supports a high watermark for the highest indexed block number.
|
||||
#[async_trait] |
||||
#[auto_impl(&, Box, Arc)] |
||||
pub trait HyperlaneWatermarkedLogStore<T>: HyperlaneLogStore<T> { |
||||
/// Gets the block number high watermark
|
||||
async fn retrieve_high_watermark(&self) -> Result<Option<u32>>; |
||||
/// Stores the block number high watermark
|
||||
async fn store_high_watermark(&self, block_number: u32) -> Result<()>; |
||||
} |
@ -1,40 +0,0 @@ |
||||
#![allow(non_snake_case)] |
||||
|
||||
use std::future::Future; |
||||
use std::time::Duration; |
||||
|
||||
use async_trait::async_trait; |
||||
use mockall::mock; |
||||
|
||||
use hyperlane_core::{ChainResult, SyncBlockRangeCursor}; |
||||
|
||||
mock! { |
||||
pub SyncBlockRangeCursor { |
||||
pub fn _next_range(&mut self) -> impl Future<Output=ChainResult<(u32, u32, Duration)>> + Send {} |
||||
|
||||
pub fn _current_position(&self) -> u32 {} |
||||
|
||||
pub fn _tip(&self) -> u32 {} |
||||
|
||||
pub fn _backtrack(&mut self, start_from: u32) {} |
||||
} |
||||
} |
||||
|
||||
#[async_trait] |
||||
impl SyncBlockRangeCursor for MockSyncBlockRangeCursor { |
||||
fn current_position(&self) -> u32 { |
||||
self._current_position() |
||||
} |
||||
|
||||
fn tip(&self) -> u32 { |
||||
self._tip() |
||||
} |
||||
|
||||
async fn next_range(&mut self) -> ChainResult<(u32, u32, Duration)> { |
||||
self._next_range().await |
||||
} |
||||
|
||||
fn backtrack(&mut self, start_from: u32) { |
||||
self._backtrack(start_from) |
||||
} |
||||
} |
@ -1,60 +0,0 @@ |
||||
#![allow(non_snake_case)] |
||||
|
||||
use async_trait::async_trait; |
||||
use mockall::*; |
||||
|
||||
use hyperlane_core::{ChainResult, HyperlaneMessage, Indexer, LogMeta, MailboxIndexer, H256}; |
||||
|
||||
mock! { |
||||
pub Indexer { |
||||
pub fn _get_finalized_block_number(&self) -> ChainResult<u32> {} |
||||
|
||||
pub fn _fetch_sorted_messages(&self, from: u32, to: u32) -> ChainResult<Vec<(HyperlaneMessage, LogMeta)>> {} |
||||
} |
||||
} |
||||
|
||||
impl std::fmt::Debug for MockIndexer { |
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
||||
write!(f, "MockIndexer") |
||||
} |
||||
} |
||||
|
||||
mock! { |
||||
pub HyperlaneIndexer { |
||||
pub fn _get_finalized_block_number(&self) -> ChainResult<u32> {} |
||||
pub fn _fetch_sorted_messages(&self, from: u32, to: u32) -> ChainResult<Vec<(HyperlaneMessage, LogMeta)>> {} |
||||
pub fn _fetch_delivered_messages(&self, from: u32, to: u32) -> ChainResult<Vec<(H256, LogMeta)>> {} |
||||
} |
||||
} |
||||
|
||||
impl std::fmt::Debug for MockHyperlaneIndexer { |
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
||||
write!(f, "MockHyperlaneIndexer") |
||||
} |
||||
} |
||||
|
||||
#[async_trait] |
||||
impl Indexer for MockHyperlaneIndexer { |
||||
async fn get_finalized_block_number(&self) -> ChainResult<u32> { |
||||
self._get_finalized_block_number() |
||||
} |
||||
} |
||||
|
||||
#[async_trait] |
||||
impl MailboxIndexer for MockHyperlaneIndexer { |
||||
async fn fetch_sorted_messages( |
||||
&self, |
||||
from: u32, |
||||
to: u32, |
||||
) -> ChainResult<Vec<(HyperlaneMessage, LogMeta)>> { |
||||
self._fetch_sorted_messages(from, to) |
||||
} |
||||
|
||||
async fn fetch_delivered_messages( |
||||
&self, |
||||
from: u32, |
||||
to: u32, |
||||
) -> ChainResult<Vec<(H256, LogMeta)>> { |
||||
self._fetch_delivered_messages(from, to) |
||||
} |
||||
} |
@ -1,11 +1,4 @@ |
||||
/// Mock mailbox contract
|
||||
pub mod mailbox; |
||||
|
||||
/// Mock indexer
|
||||
pub mod indexer; |
||||
|
||||
/// Mock SyncBlockRangeCursor
|
||||
pub mod cursor; |
||||
|
||||
pub use indexer::MockIndexer; |
||||
pub use mailbox::MockMailboxContract; |
||||
|
Loading…
Reference in new issue