Eta calculator fixes (#2735)

## Bug 1

Closes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/2723

The relayer panic is caused by an overflow, bc of dividing by
~`6.540888459481895e-211`. On my local, the effective rate of indexing
starts at `0.61`.
```
{"timestamp":"2023-09-15T09:57:10.746276Z","level":"INFO","fields":{"message":"~~~ blocks_processed: 2508, tip_progression: 2042, elapsed: 757.10340475, old_rate: Some(0.6155037701275111), effective_rate: 0.6155037701275111"},"target":"hyperlane_base::contract_sync::eta_calculator","span":{"domain":"solanadevnet","label":"gas_payments","name":"ContractSync"},"spans":[{"domain":"solanadevnet","label":"gas_payments","name":"ContractSync"}]}
```

But then both the `blocks_processed` and the `tip_progression` are
consistently zero, which makes the `new_rate` be zero
(eea423ad04/rust/hyperlane-base/src/contract_sync/eta_calculator.rs (L41)),
and over time it takes over the entire moving average window to make it
almost zero, leading to an overflow. 15 mins after that initial log, the
effective rate already became `0.00038`.

The reason for blocks_processed and tip_progression consistently being
zero after the first log is that `eta_calculator.calculate(from, tip)`
is always called with the same from and tip although it expects to get
the latest values.

### The fix

the tip wasn't being set after the sequence_and_tip query here:
eea423ad04/rust/hyperlane-base/src/contract_sync/cursor.rs (L565)

And then the to and from are calculated based on it:
eea423ad04/rust/hyperlane-base/src/contract_sync/cursor.rs (L550)

So even though the sync_state internal variables were kept up-to-date,
the min(...) would cause the issue.

The first PR commit fixes this.

## Bug 2

There was another bug in the eta calculator, caused by it only using
`next_block` to approximate synced state, which doesn't apply to
sequence indexing. The way the eta calculator is called had to be
changed to use abstract measures of sync progression (which could be
blocks or sequences).

The second PR commit fixes this, afaict.
pull/2499/merge
Daniel Savu 1 year ago committed by GitHub
parent 7ab7cf5447
commit d65c4e7e0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 57
      rust/hyperlane-base/src/contract_sync/cursor.rs
  2. 5
      rust/hyperlane-base/src/contract_sync/eta_calculator.rs
  3. 2
      rust/hyperlane-core/src/traits/indexer.rs

@ -67,7 +67,7 @@ impl SyncState {
"Sequence indexing requires a max sequence", "Sequence indexing requires a max sequence",
) )
})?; })?;
if let Some(range) = self.sequence_range(tip, max_sequence)? { if let Some(range) = self.sequence_range(max_sequence)? {
range range
} else { } else {
return Ok(None); return Ok(None);
@ -107,11 +107,7 @@ impl SyncState {
/// * `max_sequence` - The maximum sequence that should be indexed. /// * `max_sequence` - The maximum sequence that should be indexed.
/// `max_sequence` is the exclusive upper bound of the range to be indexed. /// `max_sequence` is the exclusive upper bound of the range to be indexed.
/// (e.g. `0..max_sequence`) /// (e.g. `0..max_sequence`)
fn sequence_range( fn sequence_range(&mut self, max_sequence: u32) -> ChainResult<Option<RangeInclusive<u32>>> {
&mut self,
tip: u32,
max_sequence: u32,
) -> ChainResult<Option<RangeInclusive<u32>>> {
let (from, to) = match self.direction { let (from, to) = match self.direction {
SyncDirection::Forward => { SyncDirection::Forward => {
let sequence_start = self.next_sequence; let sequence_start = self.next_sequence;
@ -120,7 +116,6 @@ impl SyncState {
return Ok(None); return Ok(None);
} }
sequence_end = u32::min(sequence_end, max_sequence.saturating_sub(1)); sequence_end = u32::min(sequence_end, max_sequence.saturating_sub(1));
self.next_block = tip;
self.next_sequence = sequence_end + 1; self.next_sequence = sequence_end + 1;
(sequence_start, sequence_end) (sequence_start, sequence_end)
} }
@ -476,6 +471,7 @@ pub(crate) struct RateLimitedContractSyncCursor<T> {
indexer: Arc<dyn SequenceIndexer<T>>, indexer: Arc<dyn SequenceIndexer<T>>,
db: Arc<dyn HyperlaneWatermarkedLogStore<T>>, db: Arc<dyn HyperlaneWatermarkedLogStore<T>>,
tip: u32, tip: u32,
max_sequence: Option<u32>,
last_tip_update: Instant, last_tip_update: Instant,
eta_calculator: SyncerEtaCalculator, eta_calculator: SyncerEtaCalculator,
sync_state: SyncState, sync_state: SyncState,
@ -490,11 +486,12 @@ impl<T> RateLimitedContractSyncCursor<T> {
initial_height: u32, initial_height: u32,
mode: IndexMode, mode: IndexMode,
) -> Result<Self> { ) -> Result<Self> {
let tip = indexer.get_finalized_block_number().await?; let (max_sequence, tip) = indexer.sequence_and_tip().await?;
Ok(Self { Ok(Self {
indexer, indexer,
db, db,
tip, tip,
max_sequence,
last_tip_update: Instant::now(), last_tip_update: Instant::now(),
eta_calculator: SyncerEtaCalculator::new(initial_height, tip, ETA_TIME_WINDOW), eta_calculator: SyncerEtaCalculator::new(initial_height, tip, ETA_TIME_WINDOW),
sync_state: SyncState::new( sync_state: SyncState::new(
@ -539,6 +536,32 @@ impl<T> RateLimitedContractSyncCursor<T> {
} }
} }
} }
fn sync_end(&self) -> ChainResult<u32> {
match self.sync_state.mode {
IndexMode::Block => Ok(self.tip),
IndexMode::Sequence => {
self.max_sequence
.ok_or(ChainCommunicationError::from_other_str(
"Sequence indexing requires a max sequence",
))
}
}
}
fn sync_position(&self) -> u32 {
match self.sync_state.mode {
IndexMode::Block => self.sync_state.next_block,
IndexMode::Sequence => self.sync_state.next_sequence,
}
}
fn sync_step(&self) -> u32 {
match self.sync_state.mode {
IndexMode::Block => self.sync_state.chunk_size,
IndexMode::Sequence => MAX_SEQUENCE_RANGE,
}
}
} }
#[async_trait] #[async_trait]
@ -547,13 +570,11 @@ where
T: Send + Debug + 'static, T: Send + Debug + 'static,
{ {
async fn next_action(&mut self) -> ChainResult<(CursorAction, Duration)> { async fn next_action(&mut self) -> ChainResult<(CursorAction, Duration)> {
let to = u32::min( let sync_end = self.sync_end()?;
self.tip, let to = u32::min(sync_end, self.sync_position() + self.sync_step());
self.sync_state.next_block + self.sync_state.chunk_size, let from = self.sync_position();
); let eta = if to < sync_end {
let from = to.saturating_sub(self.sync_state.chunk_size); self.eta_calculator.calculate(from, sync_end)
let eta = if to < self.tip {
self.eta_calculator.calculate(from, self.tip)
} else { } else {
Duration::from_secs(0) Duration::from_secs(0)
}; };
@ -562,8 +583,10 @@ where
if let Some(rate_limit) = rate_limit { if let Some(rate_limit) = rate_limit {
return Ok((CursorAction::Sleep(rate_limit), eta)); return Ok((CursorAction::Sleep(rate_limit), eta));
} }
let (count, tip) = self.indexer.sequence_and_tip().await?; let (max_sequence, tip) = self.indexer.sequence_and_tip().await?;
if let Some(range) = self.sync_state.get_next_range(count, tip).await? { self.tip = tip;
self.max_sequence = max_sequence;
if let Some(range) = self.sync_state.get_next_range(max_sequence, tip).await? {
return Ok((CursorAction::Query(range), eta)); return Ok((CursorAction::Query(range), eta));
} }

@ -38,10 +38,13 @@ impl SyncerEtaCalculator {
self.last_block = current_block; self.last_block = current_block;
self.last_tip = current_tip; self.last_tip = current_tip;
// The block-processing rate, minus the tip-progression rate, measured in
// blocks per second.
let new_rate = (blocks_processed - tip_progression) / elapsed; let new_rate = (blocks_processed - tip_progression) / elapsed;
// Calculate the effective rate using a moving average. Only set the past // Calculate the effective rate using a moving average. Only set the past
// effective rate once we have seen a move to prevent it taking a long // effective rate once we have seen a move, to prevent it taking a long
// time to normalize. // time to normalize.
let effective_rate = if let Some(old_rate) = self.effective_rate { let effective_rate = if let Some(old_rate) = self.effective_rate {
let new_coeff = f64::min(elapsed / self.time_window, 0.9); let new_coeff = f64::min(elapsed / self.time_window, 0.9);

@ -38,7 +38,7 @@ pub trait Indexer<T: Sized>: Send + Sync + Debug {
/// Interface for indexing data in sequence. /// Interface for indexing data in sequence.
#[async_trait] #[async_trait]
#[auto_impl(&, Box, Arc)] #[auto_impl(&, Box, Arc)]
pub trait SequenceIndexer<T>: Indexer<T> + 'static { pub trait SequenceIndexer<T>: Indexer<T> {
/// Return the latest finalized sequence (if any) and block number /// Return the latest finalized sequence (if any) and block number
async fn sequence_and_tip(&self) -> ChainResult<(Option<u32>, u32)>; async fn sequence_and_tip(&self) -> ChainResult<(Option<u32>, u32)>;
} }

Loading…
Cancel
Save