Better Indexing (#2036)

### Description

* Logs the distance from the tip
* Calculates and logs the time to fully sync
* Changes indexing timeout when catching up from 1s to 100ms

### Drive-by changes

* Cleans up some log messages at the `info` level.
* Refactors the indexing logic to be a bit cleaner/simpler
* Updated all indexers to use the eta calculations

### Related issues

- Fixes #1872
- Fixes #1851 

### Backward compatibility

_Are these changes backward compatible?_

Yes

_Are there any infrastructure implications, e.g. changes that would
prohibit deploying older commits using this infra tooling?_

None


### Testing

_What kind of testing have these changes undergone?_

Manual
pull/2094/head
Mattie Conover 2 years ago committed by GitHub
parent 6d40e193b1
commit e91c3100c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 63
      rust/agents/scraper/src/chain_scraper/sync.rs
  2. 2
      rust/chains/hyperlane-ethereum/src/mailbox.rs
  3. 4
      rust/chains/hyperlane-fuel/src/mailbox.rs
  4. 72
      rust/hyperlane-base/src/contract_sync/cursor.rs
  5. 65
      rust/hyperlane-base/src/contract_sync/eta_calculator.rs
  6. 80
      rust/hyperlane-base/src/contract_sync/interchain_gas.rs
  7. 266
      rust/hyperlane-base/src/contract_sync/mailbox.rs
  8. 6
      rust/hyperlane-base/src/contract_sync/mod.rs
  9. 16
      rust/hyperlane-base/src/interchain_gas.rs
  10. 16
      rust/hyperlane-base/src/mailbox.rs
  11. 8
      rust/hyperlane-core/src/db/hyperlane_db.rs
  12. 18
      rust/hyperlane-core/src/traits/cursor.rs
  13. 32
      rust/hyperlane-core/src/utils.rs
  14. 11
      rust/hyperlane-test/src/mocks/cursor.rs

@ -1,16 +1,18 @@
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use eyre::Result;
use itertools::Itertools;
use prometheus::{IntCounter, IntGauge, IntGaugeVec};
use time::Instant;
use tracing::{debug, info, instrument, trace, warn};
use hyperlane_base::last_message::validate_message_continuity;
use hyperlane_base::RateLimitedSyncBlockRangeCursor;
use hyperlane_base::{last_message::validate_message_continuity, RateLimitedSyncBlockRangeCursor};
use hyperlane_core::{
KnownHyperlaneDomain, ListValidity, MailboxIndexer, SyncBlockRangeCursor, H256,
utils::fmt_sync_time, KnownHyperlaneDomain, ListValidity, MailboxIndexer, SyncBlockRangeCursor,
H256,
};
use crate::chain_scraper::{
@ -112,15 +114,38 @@ impl Syncer {
info!(from = start_block, "Resuming chain sync");
self.indexed_height.set(start_block as i64);
let mut last_logged_time: Option<Instant> = None;
let mut should_log_checkpoint_info = || {
if last_logged_time.is_none()
|| last_logged_time.unwrap().elapsed() > Duration::from_secs(30)
{
last_logged_time = Some(Instant::now());
true
} else {
false
}
};
loop {
let start_block = self.sync_cursor.current_position();
let (from, to) = match self.sync_cursor.next_range().await {
Ok(range) => range,
Err(err) => {
warn!(error = %err, "failed to get next block range");
continue;
}
};
let Ok((from, to, eta)) = self.sync_cursor.next_range().await else { continue };
if should_log_checkpoint_info() {
info!(
from,
to,
distance_from_tip = self.sync_cursor.distance_from_tip(),
estimated_time_to_sync = fmt_sync_time(eta),
"Syncing range"
);
} else {
debug!(
from,
to,
distance_from_tip = self.sync_cursor.distance_from_tip(),
estimated_time_to_sync = fmt_sync_time(eta),
"Syncing range"
);
}
let extracted = self.scrape_range(from, to).await?;
@ -183,9 +208,9 @@ impl Syncer {
.mailbox_indexer
.fetch_sorted_messages(from, to)
.await?;
trace!(from, to, ?sorted_messages, "Fetched messages");
trace!(?sorted_messages, "Fetched messages");
debug!(from, to, "Fetching deliveries for range");
debug!("Fetching deliveries for range");
let deliveries = self
.contracts
.mailbox_indexer
@ -194,9 +219,9 @@ impl Syncer {
.into_iter()
.map(|(message_id, meta)| Delivery { message_id, meta })
.collect_vec();
trace!(from, to, ?deliveries, "Fetched deliveries");
trace!(?deliveries, "Fetched deliveries");
debug!(from, to, "Fetching payments for range");
debug!("Fetching payments for range");
let payments = self
.contracts
.igp_indexer
@ -205,14 +230,12 @@ impl Syncer {
.into_iter()
.map(|(payment, meta)| Payment { payment, meta })
.collect_vec();
trace!(from, to, ?payments, "Fetched payments");
trace!(?payments, "Fetched payments");
info!(
from,
to,
message_count = sorted_messages.len(),
deliveries_count = deliveries.len(),
payments = payments.len(),
delivery_count = deliveries.len(),
payment_count = payments.len(),
"Indexed block range for chain"
);
@ -226,8 +249,6 @@ impl Syncer {
.collect::<Vec<_>>();
debug!(
from,
to,
message_count = sorted_messages.len(),
"Filtered any messages already indexed for mailbox"
);

@ -296,7 +296,7 @@ where
Ok(self.contract.count().call().await?)
}
#[instrument(err, ret)]
#[instrument(level = "debug", err, ret, skip(self))]
async fn delivered(&self, id: H256) -> ChainResult<bool> {
Ok(self.contract.delivered(id.into()).call().await?)
}

@ -76,12 +76,12 @@ impl Mailbox for FuelMailbox {
.map_err(ChainCommunicationError::from_other)
}
#[instrument(err, ret, skip(self))]
#[instrument(level = "debug", err, ret, skip(self))]
async fn delivered(&self, id: H256) -> ChainResult<bool> {
todo!()
}
#[instrument(err, ret, skip(self))]
#[instrument(level = "debug", err, ret, skip(self))]
async fn latest_checkpoint(&self, lag: Option<NonZeroU64>) -> ChainResult<Checkpoint> {
assert!(
lag.is_none(),

@ -3,9 +3,15 @@ use std::time::{Duration, Instant};
use async_trait::async_trait;
use eyre::Result;
use tokio::time::sleep;
use tracing::warn;
use hyperlane_core::{ChainResult, Indexer, SyncBlockRangeCursor};
use crate::contract_sync::eta_calculator::SyncerEtaCalculator;
/// Time window for the moving average used in the eta calculator in seconds.
const ETA_TIME_WINDOW: f64 = 2. * 60.;
/// Tool for handling the logic of what the next block range that should be
/// queried is and also handling rate limiting. Rate limiting is automatically
/// performed by `next_range`.
@ -15,6 +21,7 @@ pub struct RateLimitedSyncBlockRangeCursor<I> {
last_tip_update: Instant,
chunk_size: u32,
from: u32,
eta_calculator: SyncerEtaCalculator,
}
impl<I> RateLimitedSyncBlockRangeCursor<I>
@ -30,43 +37,39 @@ where
chunk_size,
last_tip_update: Instant::now(),
from: initial_height,
eta_calculator: SyncerEtaCalculator::new(initial_height, tip, ETA_TIME_WINDOW),
})
}
/// Wait based on how close we are to the tip and update the tip,
/// i.e. the highest block we may scrape.
async fn rate_limit(&mut self) -> ChainResult<()> {
let update_tip = self.last_tip_update.elapsed() >= Duration::from_secs(30);
if self.from + self.chunk_size < self.tip {
// If doing the full chunk wouldn't exceed the already known tip,
// we don't necessarily need to fetch the new tip. Sleep a tiny bit
// so that we can catch up to the tip relatively quickly.
sleep(Duration::from_secs(1)).await;
Ok(())
} else {
// If doing the full chunk wouldn't exceed the already known tip sleep a tiny
// bit so that we can catch up relatively quickly.
sleep(Duration::from_millis(100)).await;
} else if !update_tip {
// We are close to the tip.
if self.last_tip_update.elapsed() < Duration::from_secs(30) {
// Sleep a little longer because we have caught up.
sleep(Duration::from_secs(10)).await;
} else {
// We are probably not caught up yet. This would happen if we
// started really far behind so now it is very likely the tip
// has moved a significant distance. We don't want to wait in
// this case any more than we normally would.
sleep(Duration::from_secs(1)).await;
}
// Sleep a little longer because we have caught up.
sleep(Duration::from_secs(10)).await;
}
match self.indexer.get_finalized_block_number().await {
Ok(tip) => {
// we retrieved a new tip value, go ahead and update.
self.last_tip_update = Instant::now();
self.tip = tip;
Ok(())
}
Err(e) => {
// we are failing to make a basic query, we should wait before retrying.
sleep(Duration::from_secs(10)).await;
Err(e)
}
if !update_tip {
return Ok(());
}
match self.indexer.get_finalized_block_number().await {
Ok(tip) => {
// we retrieved a new tip value, go ahead and update.
self.last_tip_update = Instant::now();
self.tip = tip;
Ok(())
}
Err(e) => {
warn!(error = %e, "Failed to get next block range because we could not get the current tip");
// we are failing to make a basic query, we should wait before retrying.
sleep(Duration::from_secs(10)).await;
Err(e)
}
}
}
@ -78,12 +81,21 @@ impl<I: Indexer> SyncBlockRangeCursor for RateLimitedSyncBlockRangeCursor<I> {
self.from
}
async fn next_range(&mut self) -> ChainResult<(u32, u32)> {
fn tip(&self) -> u32 {
self.tip
}
async fn next_range(&mut self) -> ChainResult<(u32, u32, Duration)> {
self.rate_limit().await?;
let to = u32::min(self.tip, self.from + self.chunk_size);
let from = to.saturating_sub(self.chunk_size);
self.from = to + 1;
Ok((from, to))
let eta = if to < self.tip {
self.eta_calculator.calculate(from, self.tip)
} else {
Duration::from_secs(0)
};
Ok((from, to, eta))
}
fn backtrack(&mut self, start_from: u32) {

@ -0,0 +1,65 @@
use std::time::{Duration, Instant};
use derive_new::new;
/// Calculates the expected time to catch up to the tip of the blockchain.
#[derive(new)]
pub(crate) struct SyncerEtaCalculator {
#[new(value = "Instant::now()")]
last_time: Instant,
last_block: u32,
last_tip: u32,
#[new(default)]
last_eta: Duration,
/// Block processing rate less the tip progression rate. It works
/// mathematically to have both rates merged as we are using a moving
/// average so partial updates will not overwrite
#[new(default)]
effective_rate: Option<f64>,
/// How long we want the data to "survive" for in the moving average.
time_window: f64,
}
impl SyncerEtaCalculator {
/// Calculate the expected time to catch up to the tip of the blockchain.
pub fn calculate(&mut self, current_block: u32, current_tip: u32) -> Duration {
let now = Instant::now();
let elapsed = now.duration_since(self.last_time).as_secs_f64();
self.last_time = now;
let blocks_processed = (current_block - self.last_block) as f64;
let tip_progression = (current_tip - self.last_tip) as f64;
self.last_block = current_block;
self.last_tip = current_tip;
let new_rate = (blocks_processed - tip_progression) / elapsed;
// Calculate the effective rate using a moving average. Only set the past
// effective rate once we have seen a move to prevent it taking a long
// time to normalize.
let effective_rate = if let Some(old_rate) = self.effective_rate {
let new_coeff = f64::min(elapsed / self.time_window, 0.9);
let old_coeff = 1. - new_coeff;
let er = (new_rate * new_coeff) + (old_rate * old_coeff);
self.effective_rate = Some(er);
er
} else {
if new_rate != 0. {
self.effective_rate = Some(new_rate);
}
new_rate
};
self.last_eta = if effective_rate <= 0. {
// max out at 1yr if we are behind
Duration::from_secs_f64(60. * 60. * 24. * 365.25)
} else {
Duration::from_secs_f64((current_tip - current_block) as f64 / effective_rate)
};
self.last_eta
}
}

@ -1,7 +1,6 @@
use tokio::task::JoinHandle;
use tracing::{debug, info, info_span, instrument::Instrumented, warn, Instrument};
use tracing::{debug, info, instrument};
use hyperlane_core::{InterchainGasPaymasterIndexer, SyncBlockRangeCursor};
use hyperlane_core::{utils::fmt_sync_time, InterchainGasPaymasterIndexer, SyncBlockRangeCursor};
use crate::{
contract_sync::{
@ -17,13 +16,9 @@ where
I: InterchainGasPaymasterIndexer + Clone + 'static,
{
/// Sync gas payments
pub fn sync_gas_payments(&self) -> Instrumented<JoinHandle<eyre::Result<()>>> {
let span = info_span!("GasPaymentContractSync");
let db = self.db.clone();
let indexer = self.indexer.clone();
let chain_name = self.domain.name();
#[instrument(name = "GasPaymentContractSync", skip(self))]
pub(crate) async fn sync_gas_payments(&self) -> eyre::Result<()> {
let chain_name = self.domain.as_ref();
let indexed_height = self
.metrics
.indexed_height
@ -35,56 +30,49 @@ where
let cursor = {
let config_initial_height = self.index_settings.from();
let initial_height = db
let initial_height = self
.db
.retrieve_latest_indexed_gas_payment_block()
.map_or(config_initial_height, |b| b + 1);
RateLimitedSyncBlockRangeCursor::new(
indexer.clone(),
self.indexer.clone(),
self.index_settings.chunk_size(),
initial_height,
)
};
tokio::spawn(async move {
let mut cursor = cursor.await?;
let start_block = cursor.current_position();
info!(from = start_block, "Resuming indexer");
indexed_height.set(start_block as i64);
let mut cursor = cursor.await?;
loop {
let (from, to) = match cursor.next_range().await {
Ok(range) => range,
Err(err) => {
warn!(error = %err, "Failed to get next block range");
continue;
}
};
let start_block = cursor.current_position();
info!(from = start_block, "Resuming indexer");
indexed_height.set(start_block as i64);
let gas_payments = indexer.fetch_gas_payments(from, to).await?;
loop {
let Ok((from, to, eta)) = cursor.next_range().await else { continue };
let gas_payments = self.indexer.fetch_gas_payments(from, to).await?;
debug!(
from,
to,
gas_payments_count = gas_payments.len(),
"Indexed block range"
);
debug!(
from,
to,
distance_from_tip = cursor.distance_from_tip(),
gas_payments_count = gas_payments.len(),
estimated_time_to_sync = fmt_sync_time(eta),
"Indexed block range"
);
let mut new_payments_processed: u64 = 0;
for (payment, meta) in gas_payments.iter() {
// Attempt to process the gas payment, incrementing new_payments_processed
// if it was processed for the first time.
if db.process_gas_payment(*payment, meta)? {
new_payments_processed += 1;
}
let mut new_payments_processed: u64 = 0;
for (payment, meta) in gas_payments.iter() {
// Attempt to process the gas payment, incrementing new_payments_processed
// if it was processed for the first time.
if self.db.process_gas_payment(*payment, meta)? {
new_payments_processed += 1;
}
}
stored_messages.inc_by(new_payments_processed);
stored_messages.inc_by(new_payments_processed);
db.store_latest_indexed_gas_payment_block(from)?;
indexed_height.set(to as i64);
}
})
.instrument(span)
self.db.store_latest_indexed_gas_payment_block(from)?;
indexed_height.set(to as i64);
}
}
}

@ -1,12 +1,16 @@
use tracing::{debug, info, info_span, warn};
use tracing::{instrument::Instrumented, Instrument};
use std::time::{Duration, Instant};
use tracing::{debug, info, instrument, warn};
use hyperlane_core::{
Indexer, KnownHyperlaneDomain, ListValidity, MailboxIndexer, SyncBlockRangeCursor,
utils::fmt_sync_time, Indexer, KnownHyperlaneDomain, ListValidity, MailboxIndexer,
SyncBlockRangeCursor,
};
use crate::contract_sync::last_message::validate_message_continuity;
use crate::{contract_sync::schema::MailboxContractSyncDB, ContractSync};
use crate::{
contract_sync::{last_message::validate_message_continuity, schema::MailboxContractSyncDB},
ContractSync,
};
const MESSAGES_LABEL: &str = "messages";
@ -15,37 +19,32 @@ where
I: MailboxIndexer + Clone + 'static,
{
/// Sync dispatched messages
pub fn sync_dispatched_messages(
&self,
) -> Instrumented<tokio::task::JoinHandle<eyre::Result<()>>> {
let span = info_span!("MessageContractSync");
let db = self.db.clone();
let indexer = self.indexer.clone();
let chain_name = self.domain.to_string();
#[instrument(name = "MessageContractSync", skip(self))]
pub(crate) async fn sync_dispatched_messages(&self) -> eyre::Result<()> {
let chain_name = self.domain.as_ref();
let indexed_height = self
.metrics
.indexed_height
.with_label_values(&[MESSAGES_LABEL, &chain_name]);
.with_label_values(&[MESSAGES_LABEL, chain_name]);
let stored_messages = self
.metrics
.stored_events
.with_label_values(&[MESSAGES_LABEL, &chain_name]);
.with_label_values(&[MESSAGES_LABEL, chain_name]);
let missed_messages = self
.metrics
.missed_events
.with_label_values(&[MESSAGES_LABEL, &chain_name]);
.with_label_values(&[MESSAGES_LABEL, chain_name]);
let message_nonce = self.metrics.message_nonce.clone();
let cursor = {
let config_initial_height = self.index_settings.from();
let initial_height = db
let initial_height = self
.db
.retrieve_latest_valid_message_range_start_block()
.map_or(config_initial_height, |b| b + 1);
create_cursor(
indexer.clone(),
self.indexer.clone(),
self.index_settings.chunk_size(),
initial_height,
)
@ -98,107 +97,142 @@ where
// Note this means we only handle this case upon observing messages in some
// range [C,D] that indicate a previously indexed range may have
// missed some messages.
tokio::spawn(async move {
let mut cursor = cursor.await?;
let mut cursor = cursor.await?;
let start_block = cursor.current_position();
let mut last_valid_range_start_block = start_block;
info!(
from = start_block,
"Resuming indexer from latest valid message range start block"
);
indexed_height.set(start_block as i64);
let mut last_logged_time: Option<Instant> = None;
let mut should_log_checkpoint_info = || {
if last_logged_time.is_none()
|| last_logged_time.unwrap().elapsed() > Duration::from_secs(30)
{
last_logged_time = Some(Instant::now());
true
} else {
false
}
};
loop {
let start_block = cursor.current_position();
let mut last_valid_range_start_block = start_block;
info!(from = start_block, "Resuming indexer from latest valid message range start block");
indexed_height.set(start_block as i64);
loop {
let start_block = cursor.current_position();
let (from, to) = match cursor.next_range().await {
Ok(range) => range,
Err(err) => {
warn!(error = %err, "Failed to get next block range");
continue;
}
};
let mut sorted_messages: Vec<_> = indexer
.fetch_sorted_messages(from, to)
.await?
.into_iter()
.map(|(msg, _)| msg)
.collect();
debug!(from, to, message_count = sorted_messages.len(), "Indexed block range");
// Get the latest known nonce. All messages whose indices are <= this index
// have been stored in the DB.
let last_nonce = db.retrieve_latest_nonce()?;
// Filter out any messages that have already been successfully indexed and stored.
// This is necessary if we're re-indexing blocks in hope of finding missing messages.
if let Some(min_nonce) = last_nonce {
sorted_messages.retain(|m| m.nonce > min_nonce);
}
let Ok((from, to, eta)) = cursor.next_range().await else { continue };
let mut sorted_messages: Vec<_> = self
.indexer
.fetch_sorted_messages(from, to)
.await?
.into_iter()
.map(|(msg, _)| msg)
.collect();
if should_log_checkpoint_info() {
info!(
from,
to,
distance_from_tip = cursor.distance_from_tip(),
estimated_time_to_sync = fmt_sync_time(eta),
message_count = sorted_messages.len(),
"Indexed block range"
);
} else {
debug!(
from,
to,
distance_from_tip = cursor.distance_from_tip(),
estimated_time_to_sync = fmt_sync_time(eta),
message_count = sorted_messages.len(),
"Indexed block range"
);
}
debug!(from, to, message_count = sorted_messages.len(), "Filtered any messages already indexed");
// Get the latest known nonce. All messages whose indices are <= this index
// have been stored in the DB.
let last_nonce = self.db.retrieve_latest_nonce()?;
// Ensure the sorted messages are a valid continuation of last_nonce
match validate_message_continuity(last_nonce, &sorted_messages.iter().collect::<Vec<_>>()) {
ListValidity::Valid => {
// Store messages
let max_nonce_of_batch = db.store_messages(&sorted_messages)?;
// Filter out any messages that have already been successfully indexed and
// stored. This is necessary if we're re-indexing blocks in hope of
// finding missing messages.
if let Some(min_nonce) = last_nonce {
sorted_messages.retain(|m| m.nonce > min_nonce);
}
// Report amount of messages stored into db
stored_messages.inc_by(sorted_messages.len() as u64);
debug!(
from,
to,
message_count = sorted_messages.len(),
"Filtered any messages already indexed"
);
// Report latest nonce to gauge by dst
for msg in sorted_messages.iter() {
let dst = KnownHyperlaneDomain::try_from(msg.destination).map(|d| d.as_str()).unwrap_or("unknown");
message_nonce
.with_label_values(&["dispatch", &chain_name, dst])
.set(max_nonce_of_batch as i64);
}
// Ensure the sorted messages are a valid continuation of last_nonce
match validate_message_continuity(
last_nonce,
&sorted_messages.iter().collect::<Vec<_>>(),
) {
ListValidity::Valid => {
// Store messages
let max_nonce_of_batch = self.db.store_messages(&sorted_messages)?;
// Report amount of messages stored into db
stored_messages.inc_by(sorted_messages.len() as u64);
// Report latest nonce to gauge by dst
for msg in sorted_messages.iter() {
let dst = KnownHyperlaneDomain::try_from(msg.destination)
.map(|d| d.as_str())
.unwrap_or("unknown");
message_nonce
.with_label_values(&["dispatch", chain_name, dst])
.set(max_nonce_of_batch as i64);
}
// Update the latest valid start block.
db.store_latest_valid_message_range_start_block(from)?;
last_valid_range_start_block = from;
// Update the latest valid start block.
self.db.store_latest_valid_message_range_start_block(from)?;
last_valid_range_start_block = from;
// Move forward to the next height
indexed_height.set(to as i64);
}
// The index of the first message in sorted_messages is not the
// `last_nonce+1`.
ListValidity::InvalidContinuation => {
missed_messages.inc();
warn!(
last_nonce = ?last_nonce,
start_block = from,
end_block = to,
last_valid_range_start_block,
"Found invalid continuation in range. Re-indexing from the start block of the last successful range.",
);
cursor.backtrack(last_valid_range_start_block);
indexed_height.set(last_valid_range_start_block as i64);
}
ListValidity::ContainsGaps => {
missed_messages.inc();
cursor.backtrack(start_block);
warn!(
last_nonce = ?last_nonce,
start_block = from,
end_block = to,
"Found gaps in the messages in range, re-indexing the same range.",
);
}
ListValidity::Empty => {
// Continue if no messages found.
// We don't update last_valid_range_start_block because we cannot extrapolate
// if the range was correctly indexed if there are no messages to observe their
// indices.
indexed_height.set(to as i64);
}
};
}
})
.instrument(span)
// Move forward to the next height
indexed_height.set(to as i64);
}
// The index of the first message in sorted_messages is not the
// `last_nonce+1`.
ListValidity::InvalidContinuation => {
missed_messages.inc();
warn!(
last_nonce = ?last_nonce,
start_block = from,
end_block = to,
last_valid_range_start_block,
"Found invalid continuation in range. Re-indexing from the start block of the last successful range.",
);
cursor.backtrack(last_valid_range_start_block);
indexed_height.set(last_valid_range_start_block as i64);
}
ListValidity::ContainsGaps => {
missed_messages.inc();
cursor.backtrack(start_block);
warn!(
last_nonce = ?last_nonce,
start_block = from,
end_block = to,
"Found gaps in the messages in range, re-indexing the same range.",
);
}
ListValidity::Empty => {
// Continue if no messages found.
// We don't update last_valid_range_start_block because we cannot extrapolate
// if the range was correctly indexed if there are no messages to observe their
// indices.
indexed_height.set(to as i64);
}
};
}
}
}
@ -321,7 +355,11 @@ mod test {
.expect__next_range()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Box::pin(async { Ok(($expected_from, $expected_to)) }));
.return_once(|| {
Box::pin(async {
Ok(($expected_from, $expected_to, Duration::from_secs(0)))
})
});
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
@ -382,7 +420,7 @@ mod test {
// this sleep should be longer than the test timeout since we don't actually
// want to yield any more values at this point.
sleep(Duration::from_secs(100)).await;
Ok((161, 161))
Ok((161, 161, Duration::from_secs(0)))
})
});
}

@ -1,6 +1,3 @@
// TODO: Reapply tip buffer
// TODO: Reapply metrics
use derive_new::new;
pub use cursor::*;
@ -12,6 +9,7 @@ pub use metrics::ContractSyncMetrics;
use crate::chains::IndexSettings;
mod cursor;
mod eta_calculator;
mod interchain_gas;
/// Tools for working with message continuity.
pub mod last_message;
@ -24,7 +22,7 @@ mod schema;
/// `indexer` and fills the agent's db with this data. A CachingMailbox
/// will use a contract sync to spawn syncing tasks to keep the db up-to-date.
#[derive(Debug, new)]
pub struct ContractSync<I> {
pub(crate) struct ContractSync<I> {
domain: HyperlaneDomain,
db: HyperlaneDB,
indexer: I,

@ -3,7 +3,6 @@ use std::sync::Arc;
use derive_new::new;
use eyre::Result;
use futures_util::future::select_all;
use tokio::task::JoinHandle;
use tracing::{info_span, instrument::Instrumented, Instrument};
@ -43,8 +42,6 @@ impl CachingInterchainGasPaymaster {
index_settings: IndexSettings,
metrics: ContractSyncMetrics,
) -> Instrumented<JoinHandle<Result<()>>> {
let span = info_span!("InterchainGasPaymasterContractSync", self = %self);
let sync = ContractSync::new(
self.paymaster.domain().clone(),
self.db.clone(),
@ -53,16 +50,7 @@ impl CachingInterchainGasPaymaster {
metrics,
);
tokio::spawn(async move {
let tasks = vec![sync.sync_gas_payments()];
let (_, _, remaining) = select_all(tasks).await;
for task in remaining.into_iter() {
cancel_task!(task);
}
Ok(())
})
.instrument(span)
tokio::spawn(async move { sync.sync_gas_payments().await })
.instrument(info_span!("InterchainGasPaymasterContractSync", self = %self))
}
}

@ -4,7 +4,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use derive_new::new;
use futures_util::future::select_all;
use tokio::task::JoinHandle;
use tracing::instrument::Instrumented;
use tracing::{info_span, Instrument};
@ -50,8 +49,6 @@ impl CachingMailbox {
index_settings: IndexSettings,
metrics: ContractSyncMetrics,
) -> Instrumented<JoinHandle<eyre::Result<()>>> {
let span = info_span!("MailboxContractSync", self = %self);
let sync = ContractSync::new(
self.mailbox.domain().clone(),
self.db.clone(),
@ -60,17 +57,8 @@ impl CachingMailbox {
metrics,
);
tokio::spawn(async move {
let tasks = vec![sync.sync_dispatched_messages()];
let (_, _, remaining) = select_all(tasks).await;
for task in remaining.into_iter() {
cancel_task!(task);
}
Ok(())
})
.instrument(span)
tokio::spawn(async move { sync.sync_dispatched_messages().await })
.instrument(info_span!("MailboxContractSync", self = %self))
}
}

@ -2,7 +2,7 @@ use std::future::Future;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{debug, info, trace};
use tracing::{debug, trace};
use crate::{
db::{
@ -88,7 +88,7 @@ impl HyperlaneDB {
pub fn store_message(&self, message: &HyperlaneMessage) -> Result<()> {
let id = message.id();
info!(msg=?message, "Storing new message in db",);
debug!(msg=?message, "Storing new message in db",);
self.store_message_id(message.nonce, message.destination, id)?;
self.store_keyed_encodable(MESSAGE, &id, message)?;
Ok(())
@ -235,7 +235,7 @@ impl HyperlaneDB {
let existing_payment = self.retrieve_gas_payment_for_message_id(event.message_id)?;
let total = existing_payment + event;
info!(?event, new_total_gas_payment=?total, "Storing gas payment");
debug!(?event, new_total_gas_payment=?total, "Storing gas payment");
self.store_keyed_encodable::<_, InterchainGasPaymentData>(
GAS_PAYMENT_FOR_MESSAGE_ID,
&total.message_id,
@ -250,7 +250,7 @@ impl HyperlaneDB {
let existing_payment = self.retrieve_gas_expenditure_for_message_id(event.message_id)?;
let total = existing_payment + event;
info!(?event, new_total_gas_payment=?total, "Storing gas payment");
debug!(?event, new_total_gas_payment=?total, "Storing gas payment");
self.store_keyed_encodable::<_, U256>(
GAS_EXPENDITURE_FOR_MESSAGE_ID,
&total.message_id,

@ -1,17 +1,29 @@
use crate::ChainResult;
use std::time::Duration;
use async_trait::async_trait;
use auto_impl::auto_impl;
use crate::ChainResult;
/// Tool for handling the logic of what the next block range that should be
/// queried and may perform rate limiting on `next_range` queries.
#[async_trait]
#[auto_impl(Box)]
pub trait SyncBlockRangeCursor {
/// Returns the current `from` position of the scraper. Note that
/// Returns the current `from` position of the indexer. Note that
/// `next_range` may return a `from` value that is lower than this in order
/// to have some overlap.
fn current_position(&self) -> u32;
/// Returns the current `tip` of the blockchain. This is the highest block
/// we know of.
fn tip(&self) -> u32;
/// Returns the current distance from the tip of the blockchain.
fn distance_from_tip(&self) -> u32 {
self.tip().saturating_sub(self.current_position())
}
/// Get the next block range `(from, to)` which should be fetched (this
/// returns an inclusive range such as (0,50), (51,100), ...). This
/// will automatically rate limit based on how far we are from the
@ -25,7 +37,7 @@ pub trait SyncBlockRangeCursor {
/// This assumes the caller will call next_range again automatically on Err,
/// but it returns the error to allow for tailored logging or different end
/// cases.
async fn next_range(&mut self) -> ChainResult<(u32, u32)>;
async fn next_range(&mut self) -> ChainResult<(u32, u32, Duration)>;
/// If there was an issue when a range of data was fetched, this rolls back
/// so the next range fetched will be from `start_from`. Note that it is a

@ -1,6 +1,7 @@
use std::fmt::{Debug, Formatter};
use std::num::{ParseIntError, TryFromIntError};
use std::str::FromStr;
use std::time::Duration;
use serde::Deserialize;
use sha3::{digest::Update, Digest, Keccak256};
@ -131,6 +132,37 @@ pub fn fmt_domain(domain: u32) -> String {
.unwrap_or_else(|_| domain.to_string())
}
/// Formats the duration in the most appropriate time units.
pub fn fmt_duration(dur: Duration) -> String {
const MIN: f64 = 60.;
const HOUR: f64 = MIN * 60.;
const DAY: f64 = HOUR * 24.;
const YEAR: f64 = DAY * 365.25;
let sec = dur.as_secs_f64();
if sec < 60. {
format!("{:.0}s", sec)
} else if sec < HOUR {
format!("{:.1}m", sec / MIN)
} else if sec < DAY {
format!("{:.2}h", sec / HOUR)
} else if sec < YEAR {
format!("{:.2}d", sec / DAY)
} else {
format!("{:.2}y", sec / YEAR)
}
}
/// Formats the duration in the most appropriate time units and says "synced" if
/// the duration is 0.
pub fn fmt_sync_time(dur: Duration) -> String {
if dur.as_secs() == 0 {
"synced".into()
} else {
fmt_duration(dur)
}
}
/// An error when parsing a StrOrInt type as an integer value.
#[derive(Error, Debug)]
pub enum StrOrIntParseError {

@ -1,6 +1,7 @@
#![allow(non_snake_case)]
use std::future::Future;
use std::time::Duration;
use async_trait::async_trait;
use mockall::mock;
@ -9,10 +10,12 @@ use hyperlane_core::{ChainResult, SyncBlockRangeCursor};
mock! {
pub SyncBlockRangeCursor {
pub fn _next_range(&mut self) -> impl Future<Output=ChainResult<(u32, u32)>> + Send {}
pub fn _next_range(&mut self) -> impl Future<Output=ChainResult<(u32, u32, Duration)>> + Send {}
pub fn _current_position(&self) -> u32 {}
pub fn _tip(&self) -> u32 {}
pub fn _backtrack(&mut self, start_from: u32) {}
}
}
@ -23,7 +26,11 @@ impl SyncBlockRangeCursor for MockSyncBlockRangeCursor {
self._current_position()
}
async fn next_range(&mut self) -> ChainResult<(u32, u32)> {
fn tip(&self) -> u32 {
self._tip()
}
async fn next_range(&mut self) -> ChainResult<(u32, u32, Duration)> {
self._next_range().await
}

Loading…
Cancel
Save