From 90287d8738ead261d6cb6b317a22c3978f9dd21b Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Wed, 20 Jul 2022 22:36:18 +0100 Subject: [PATCH] Refactor message indexing to fix finding_message mode (#797) * Refactor message indexing to not have bugs * Nits * cargo fmt * Address comments --- .../src/contract_sync/interchain_gas.rs | 12 +- .../src/contract_sync/last_message.rs | 4 +- rust/abacus-base/src/contract_sync/outbox.rs | 161 +++++++++++------- rust/abacus-base/src/contract_sync/schema.rs | 19 ++- rust/abacus-core/src/lib.rs | 8 +- 5 files changed, 128 insertions(+), 76 deletions(-) diff --git a/rust/abacus-base/src/contract_sync/interchain_gas.rs b/rust/abacus-base/src/contract_sync/interchain_gas.rs index 70190ffa3..59e311a6f 100644 --- a/rust/abacus-base/src/contract_sync/interchain_gas.rs +++ b/rust/abacus-base/src/contract_sync/interchain_gas.rs @@ -44,11 +44,15 @@ where loop { indexed_height.set(from.into()); - // Only index blocks considered final - let tip = indexer.get_finalized_block_number().await?; + // Only index blocks considered final. 
+ // If there's an error getting the block number, just start the loop over + let tip = if let Ok(num) = indexer.get_finalized_block_number().await { + num + } else { + continue; + }; if tip <= from { debug!(tip=?tip, from=?from, "[GasPayments]: caught up to tip, waiting for new block"); - // TODO: Make this configurable // Sleep if caught up to tip sleep(Duration::from_secs(1)).await; continue; @@ -59,7 +63,7 @@ where let gas_payments = indexer.fetch_gas_payments(from, to).await?; - debug!( + info!( from = from, to = to, gas_payments_count = gas_payments.len(), diff --git a/rust/abacus-base/src/contract_sync/last_message.rs b/rust/abacus-base/src/contract_sync/last_message.rs index 2a923f062..f94e267b3 100644 --- a/rust/abacus-base/src/contract_sync/last_message.rs +++ b/rust/abacus-base/src/contract_sync/last_message.rs @@ -40,14 +40,14 @@ impl OptLatestLeafIndex { .iter() .any(|message| *last_seen == message.leaf_index - 1); if !has_desired_message { - return ListValidity::Invalid; + return ListValidity::InvalidContinuation; } } // Ensure no gaps in new batch of leaves for pair in sorted_messages.windows(2) { if pair[0].leaf_index != pair[1].leaf_index - 1 { - return ListValidity::Invalid; + return ListValidity::ContainsGaps; } } diff --git a/rust/abacus-base/src/contract_sync/outbox.rs b/rust/abacus-base/src/contract_sync/outbox.rs index 989e68b7e..6cfe5bf90 100644 --- a/rust/abacus-base/src/contract_sync/outbox.rs +++ b/rust/abacus-base/src/contract_sync/outbox.rs @@ -45,70 +45,105 @@ where let config_from = self.index_settings.from(); let chunk_size = self.index_settings.chunk_size(); + // Indexes messages by fetching messages in ranges of blocks. + // We've observed occasional flakiness with providers where some events in + // a range will be missing. The leading theories are: + // 1. The provider is just flaky and sometimes misses events :( + // 2. 
For outbox chains with low finality times, it's possible that when + // we query the RPC provider for the latest finalized block number, + // we're returned a block number T. However when we attempt to index a range + // where the `to` block is T, the `eth_getLogs` RPC is load balanced by the + // provider to a different node whose latest known block is some block T' < T. + // The `eth_getLogs` RPC implementations seem to happily accept `to` blocks that + // exceed the latest known block, so it's possible that in our indexer we think + // that we've indexed up to block T but we've only *actually* indexed up to block T'. + + // It's easy to determine if a provider has skipped any message events by + // looking at the indices of each message and ensuring that we've indexed a valid + // continuation of messages. + // There are two classes of invalid continuations: + // 1. The latest previously indexed message index is M that was found in a previously + // indexed block range. A new block range [A,B] is indexed, returning a list of messages. + // The lowest message index in that list is `M + 1`, but there are some missing message + // indices in the list. This is likely a flaky provider, and we can simply re-index the + // range [A,B] hoping that the provider will soon return a correct list. + // 2. The latest previously indexed message index is M that was found in a previously + // indexed block range, [A,B]. A new block range [C,D] is indexed, returning a list of + // messages. However, the lowest message index in that list is M' where M' > M + 1. + // These missing messages could be anywhere in the range [A,D]: + // * It's possible there was an issue when the prior block range [A,B] was indexed, where + // the provider didn't provide some messages with indices > M that it should have. + // * It's possible that the range [B,C] that was presumed to be empty when it was indexed + // actually wasn't. 
+ // * And it's possible that this was just a flaky gap, where there are messages in the [C,D] + // range that weren't returned for some reason. + // We can handle this by re-indexing starting from block A. + // Note this means we only handle this case upon observing messages in some range [C,D] + // that indicate a previously indexed range may have missed some messages. tokio::spawn(async move { let mut from = db - .retrieve_message_latest_block_end() - .map_or_else(|| config_from, |h| h + 1); + .retrieve_latest_valid_message_range_start_block() + .unwrap_or(config_from); - let mut finding_missing = false; - let mut realized_missing_start_block = 0; - let mut realized_missing_end_block = 0; - let mut exponential = 0; + let mut last_valid_range_start_block = from; - info!(from = from, "[Messages]: resuming indexer from {from}"); + info!(from = from, "[Messages]: resuming indexer from latest valid message range start block"); loop { indexed_height.set(from as i64); - // If we were searching for missing message and have reached - // original missing start block, turn off finding_missing and - // TRY to resume normal indexing - if finding_missing && from >= realized_missing_start_block { - info!("Turning off finding_missing mode"); - finding_missing = false; - } - - // If we have passed the end block of the missing message, we - // have found the message and can reset variables - if from > realized_missing_end_block && realized_missing_end_block != 0 { - missed_messages.inc(); - - exponential = 0; - realized_missing_start_block = 0; - realized_missing_end_block = 0; - } - - // Only index blocks considered final - let tip = indexer.get_finalized_block_number().await?; + // Only index blocks considered final. 
+ // If there's an error getting the block number, just start the loop over + let tip = if let Ok(num) = indexer.get_finalized_block_number().await { + num + } else { + continue; + }; if tip <= from { - // TODO: Make this configurable // Sleep if caught up to tip sleep(Duration::from_secs(1)).await; continue; } - let candidate = from + chunk_size; - let to = min(tip, candidate); + // Index the chunk_size, capping at the tip. + let to = min(tip, from + chunk_size); - let sorted_messages = indexer.fetch_sorted_messages(from, to).await?; + let mut sorted_messages = indexer.fetch_sorted_messages(from, to).await?; - debug!( + info!( from = from, to = to, message_count = sorted_messages.len(), "[Messages]: indexed block heights {from}...{to}" ); - // If no messages found, update last seen block and next height - // and continue + // Get the latest known leaf index. All messages whose indices are <= this index + // have been stored in the DB. + let last_leaf_index: OptLatestLeafIndex = db.retrieve_latest_leaf_index()?.into(); + + // Filter out any messages that have already been successfully indexed and stored. + // This is necessary if we're re-indexing blocks in hope of finding missing messages. + if let Some(min_index) = last_leaf_index.as_ref() { + sorted_messages = sorted_messages.into_iter().filter(|m| m.leaf_index > *min_index).collect(); + } + + debug!( + from = from, + to = to, + message_count = sorted_messages.len(), + "[Messages]: filtered any messages already indexed" + ); + + // Continue if no messages found. + // We don't update last_valid_range_start_block because we cannot extrapolate + // if the range was correctly indexed if there are no messages to observe their + // indices. 
if sorted_messages.is_empty() { - db.store_message_latest_block_end(to)?; from = to + 1; continue; } - // If messages found, check that list is valid - let last_leaf_index: OptLatestLeafIndex = db.retrieve_latest_leaf_index()?.into(); + // Ensure the sorted messages are a valid continuation of last_leaf_index match &last_leaf_index.valid_continuation(&sorted_messages) { ListValidity::Valid => { // Store messages @@ -128,30 +163,36 @@ where .set(max_leaf_index_of_batch as i64); } - // Move forward next height - db.store_message_latest_block_end(to)?; + // Update the latest valid start block. + db.store_latest_valid_message_range_start_block(from)?; + last_valid_range_start_block = from; + + // Move forward to the next height from = to + 1; - } - ListValidity::Invalid => { - if finding_missing { - from = to + 1; - } else { - warn!( - last_leaf_index = ?last_leaf_index, - start_block = from, - end_block = to, - "[Messages]: RPC failed to find message(s) between blocks {from}...{to}. Last seen leaf index: {:?}. Activating finding_missing mode.", - last_leaf_index - ); - - // Turn on finding_missing mode - finding_missing = true; - realized_missing_start_block = from; - realized_missing_end_block = to; - - from = realized_missing_start_block - (chunk_size * 2u32.pow(exponential as u32)); - exponential += 1; - } + }, + // The index of the first message in sorted_messages is not the last_leaf_index + 1. + ListValidity::InvalidContinuation => { + missed_messages.inc(); + + warn!( + last_leaf_index = ?last_leaf_index, + start_block = from, + end_block = to, + last_valid_range_start_block, + "[Messages]: Found invalid continuation in range. 
Re-indexing from the start block of the last successful range.", + ); + + from = last_valid_range_start_block; + }, + ListValidity::ContainsGaps => { + missed_messages.inc(); + + warn!( + last_leaf_index = ?last_leaf_index, + start_block = from, + end_block = to, + "[Messages]: Found gaps in the messages in range, re-indexing the same range.", + ); } ListValidity::Empty => unreachable!("Tried to validate empty list of messages"), }; diff --git a/rust/abacus-base/src/contract_sync/schema.rs b/rust/abacus-base/src/contract_sync/schema.rs index b8daed9f3..62d9abf86 100644 --- a/rust/abacus-base/src/contract_sync/schema.rs +++ b/rust/abacus-base/src/contract_sync/schema.rs @@ -2,21 +2,26 @@ use abacus_core::db::AbacusDB; use abacus_core::db::DbError; use eyre::Result; -static MESSAGES_LAST_BLOCK_END: &str = "messages_last_inspected"; +/// The start block number of the latest "valid" message block range. +/// This is an interval of block indexes where > 0 messages were indexed, +/// all of which had a contiguous sequence of messages based off their indices, +/// and the lowest index is the successor to the highest index of the prior +/// valid range. 
+static LATEST_VALID_MESSAGE_RANGE_START_BLOCK: &str = "latest_valid_message_range_start_block"; static LATEST_INDEXED_GAS_PAYMENT_BLOCK: &str = "latest_indexed_gas_payment_block"; pub(crate) trait OutboxContractSyncDB { - fn store_message_latest_block_end(&self, latest_block: u32) -> Result<(), DbError>; - fn retrieve_message_latest_block_end(&self) -> Option; + fn store_latest_valid_message_range_start_block(&self, block_num: u32) -> Result<(), DbError>; + fn retrieve_latest_valid_message_range_start_block(&self) -> Option; } impl OutboxContractSyncDB for AbacusDB { - fn store_message_latest_block_end(&self, latest_block: u32) -> Result<(), DbError> { - self.store_encodable("", MESSAGES_LAST_BLOCK_END, &latest_block) + fn store_latest_valid_message_range_start_block(&self, block_num: u32) -> Result<(), DbError> { + self.store_encodable("", LATEST_VALID_MESSAGE_RANGE_START_BLOCK, &block_num) } - fn retrieve_message_latest_block_end(&self) -> Option { - self.retrieve_decodable("", MESSAGES_LAST_BLOCK_END) + fn retrieve_latest_valid_message_range_start_block(&self) -> Option { + self.retrieve_decodable("", LATEST_VALID_MESSAGE_RANGE_START_BLOCK) .expect("db failure") } } diff --git a/rust/abacus-core/src/lib.rs b/rust/abacus-core/src/lib.rs index 8d9850f9d..9f4b13152 100644 --- a/rust/abacus-core/src/lib.rs +++ b/rust/abacus-core/src/lib.rs @@ -48,15 +48,17 @@ use ethers::{ signers::{AwsSignerError, LocalWallet, Signer}, }; -/// Enum for validity of a list (of checkpoints or messages) +/// Enum for validity of a list of messages #[derive(Debug)] pub enum ListValidity { /// Empty list Empty, /// Valid list Valid, - /// Invalid list - Invalid, + /// Invalid list. Does not build upon the correct prior element. + InvalidContinuation, + /// Invalid list. Contains gaps, but builds upon the correct prior element. + ContainsGaps, } /// Error types for Abacus