Refactor message indexing to fix finding_message mode (#797)

* Refactor message indexing to not have bugs

* Nits

* cargo fmt

* Address comments
pull/808/head
Trevor Porter 2 years ago committed by GitHub
parent aa73b93f38
commit 90287d8738
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      rust/abacus-base/src/contract_sync/interchain_gas.rs
  2. 4
      rust/abacus-base/src/contract_sync/last_message.rs
  3. 147
      rust/abacus-base/src/contract_sync/outbox.rs
  4. 19
      rust/abacus-base/src/contract_sync/schema.rs
  5. 8
      rust/abacus-core/src/lib.rs

@ -44,11 +44,15 @@ where
loop { loop {
indexed_height.set(from.into()); indexed_height.set(from.into());
// Only index blocks considered final // Only index blocks considered final.
let tip = indexer.get_finalized_block_number().await?; // If there's an error getting the block number, just start the loop over
let tip = if let Ok(num) = indexer.get_finalized_block_number().await {
num
} else {
continue;
};
if tip <= from { if tip <= from {
debug!(tip=?tip, from=?from, "[GasPayments]: caught up to tip, waiting for new block"); debug!(tip=?tip, from=?from, "[GasPayments]: caught up to tip, waiting for new block");
// TODO: Make this configurable
// Sleep if caught up to tip // Sleep if caught up to tip
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
continue; continue;
@ -59,7 +63,7 @@ where
let gas_payments = indexer.fetch_gas_payments(from, to).await?; let gas_payments = indexer.fetch_gas_payments(from, to).await?;
debug!( info!(
from = from, from = from,
to = to, to = to,
gas_payments_count = gas_payments.len(), gas_payments_count = gas_payments.len(),

@ -40,14 +40,14 @@ impl OptLatestLeafIndex {
.iter() .iter()
.any(|message| *last_seen == message.leaf_index - 1); .any(|message| *last_seen == message.leaf_index - 1);
if !has_desired_message { if !has_desired_message {
return ListValidity::Invalid; return ListValidity::InvalidContinuation;
} }
} }
// Ensure no gaps in new batch of leaves // Ensure no gaps in new batch of leaves
for pair in sorted_messages.windows(2) { for pair in sorted_messages.windows(2) {
if pair[0].leaf_index != pair[1].leaf_index - 1 { if pair[0].leaf_index != pair[1].leaf_index - 1 {
return ListValidity::Invalid; return ListValidity::ContainsGaps;
} }
} }

@ -45,70 +45,105 @@ where
let config_from = self.index_settings.from(); let config_from = self.index_settings.from();
let chunk_size = self.index_settings.chunk_size(); let chunk_size = self.index_settings.chunk_size();
// Indexes messages by fetching messages in ranges of blocks.
// We've observed occasional flakiness with providers where some events in
// a range will be missing. The leading theories are:
// 1. The provider is just flaky and sometimes misses events :(
// 2. For outbox chains with low finality times, it's possible that when
// we query the RPC provider for the latest finalized block number,
// we're returned a block number T. However when we attempt to index a range
// where the `to` block is T, the `eth_getLogs` RPC is load balanced by the
// provider to a different node whose latest known block is some block T' < T.
// The `eth_getLogs` RPC implementations seem to happily accept `to` blocks that
// exceed the latest known block, so it's possible that in our indexer we think
// that we've indexed up to block T but we've only *actually* indexed up to block T'.
// It's easy to determine if a provider has skipped any message events by
// looking at the indices of each message and ensuring that we've indexed a valid
// continuation of messages.
// There are two classes of invalid continuations:
// 1. The latest previously indexed message index is M that was found in a previously
// indexed block range. A new block range [A,B] is indexed, returning a list of messages.
// The lowest message index in that list is `M + 1`, but there are some missing messages
// indices in the list. This is likely a flaky provider, and we can simply re-index the
// range [A,B] hoping that the provider will soon return a correct list.
// 2. The latest previously indexed message index is M that was found in a previously
// indexed block range, [A,B]. A new block range [C,D] is indexed, returning a list of
// messages. However, the lowest message index in that list is M' where M' > M + 1.
// This missing messages could be anywhere in the range [A,D]:
// * It's possible there was an issue when the prior block range [A,B] was indexed, where
// the provider didn't provide some messages with indices > M that it should have.
// * It's possible that the range [B,C] that was presumed to be empty when it was indexed
// actually wasn't.
// * And it's possible that this was just a flaky gap, where there are messages in the [C,D]
// range that weren't returned for some reason.
// We can handle this by re-indexing starting from block A.
// Note this means we only handle this case upon observing messages in some range [C,D]
// that indicate a previously indexed range may have missed some messages.
tokio::spawn(async move { tokio::spawn(async move {
let mut from = db let mut from = db
.retrieve_message_latest_block_end() .retrieve_latest_valid_message_range_start_block()
.map_or_else(|| config_from, |h| h + 1); .unwrap_or(config_from);
let mut finding_missing = false; let mut last_valid_range_start_block = from;
let mut realized_missing_start_block = 0;
let mut realized_missing_end_block = 0;
let mut exponential = 0;
info!(from = from, "[Messages]: resuming indexer from {from}"); info!(from = from, "[Messages]: resuming indexer from latest valid message range start block");
loop { loop {
indexed_height.set(from as i64); indexed_height.set(from as i64);
// If we were searching for missing message and have reached // Only index blocks considered final.
// original missing start block, turn off finding_missing and // If there's an error getting the block number, just start the loop over
// TRY to resume normal indexing let tip = if let Ok(num) = indexer.get_finalized_block_number().await {
if finding_missing && from >= realized_missing_start_block { num
info!("Turning off finding_missing mode"); } else {
finding_missing = false; continue;
} };
// If we have passed the end block of the missing message, we
// have found the message and can reset variables
if from > realized_missing_end_block && realized_missing_end_block != 0 {
missed_messages.inc();
exponential = 0;
realized_missing_start_block = 0;
realized_missing_end_block = 0;
}
// Only index blocks considered final
let tip = indexer.get_finalized_block_number().await?;
if tip <= from { if tip <= from {
// TODO: Make this configurable
// Sleep if caught up to tip // Sleep if caught up to tip
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
continue; continue;
} }
let candidate = from + chunk_size; // Index the chunk_size, capping at the tip.
let to = min(tip, candidate); let to = min(tip, from + chunk_size);
let sorted_messages = indexer.fetch_sorted_messages(from, to).await?; let mut sorted_messages = indexer.fetch_sorted_messages(from, to).await?;
debug!( info!(
from = from, from = from,
to = to, to = to,
message_count = sorted_messages.len(), message_count = sorted_messages.len(),
"[Messages]: indexed block heights {from}...{to}" "[Messages]: indexed block heights {from}...{to}"
); );
// If no messages found, update last seen block and next height // Get the latest known leaf index. All messages whose indices are <= this index
// and continue // have been stored in the DB.
let last_leaf_index: OptLatestLeafIndex = db.retrieve_latest_leaf_index()?.into();
// Filter out any messages that have already been successfully indexed and stored.
// This is necessary if we're re-indexing blocks in hope of finding missing messages.
if let Some(min_index) = last_leaf_index.as_ref() {
sorted_messages = sorted_messages.into_iter().filter(|m| m.leaf_index > *min_index).collect();
}
debug!(
from = from,
to = to,
message_count = sorted_messages.len(),
"[Messages]: filtered any messages already indexed"
);
// Continue if no messages found.
// We don't update last_valid_range_start_block because we cannot extrapolate
// if the range was correctly indexed if there are no messages to observe their
// indices.
if sorted_messages.is_empty() { if sorted_messages.is_empty() {
db.store_message_latest_block_end(to)?;
from = to + 1; from = to + 1;
continue; continue;
} }
// If messages found, check that list is valid // Ensure the sorted messages are a valid continution of last_leaf_index
let last_leaf_index: OptLatestLeafIndex = db.retrieve_latest_leaf_index()?.into();
match &last_leaf_index.valid_continuation(&sorted_messages) { match &last_leaf_index.valid_continuation(&sorted_messages) {
ListValidity::Valid => { ListValidity::Valid => {
// Store messages // Store messages
@ -128,30 +163,36 @@ where
.set(max_leaf_index_of_batch as i64); .set(max_leaf_index_of_batch as i64);
} }
// Move forward next height // Update the latest valid start block.
db.store_message_latest_block_end(to)?; db.store_latest_valid_message_range_start_block(from)?;
from = to + 1; last_valid_range_start_block = from;
}
ListValidity::Invalid => { // Move forward to the next height
if finding_missing {
from = to + 1; from = to + 1;
} else { },
// The index of the first message in sorted_messages is not the last_leaf_index + 1.
ListValidity::InvalidContinuation => {
missed_messages.inc();
warn!( warn!(
last_leaf_index = ?last_leaf_index, last_leaf_index = ?last_leaf_index,
start_block = from, start_block = from,
end_block = to, end_block = to,
"[Messages]: RPC failed to find message(s) between blocks {from}...{to}. Last seen leaf index: {:?}. Activating finding_missing mode.", last_valid_range_start_block,
last_leaf_index "[Messages]: Found invalid continuation in range. Re-indexing from the start block of the last successful range.",
); );
// Turn on finding_missing mode from = last_valid_range_start_block;
finding_missing = true; },
realized_missing_start_block = from; ListValidity::ContainsGaps => {
realized_missing_end_block = to; missed_messages.inc();
from = realized_missing_start_block - (chunk_size * 2u32.pow(exponential as u32)); warn!(
exponential += 1; last_leaf_index = ?last_leaf_index,
} start_block = from,
end_block = to,
"[Messages]: Found gaps in the messages in range, re-indexing the same range.",
);
} }
ListValidity::Empty => unreachable!("Tried to validate empty list of messages"), ListValidity::Empty => unreachable!("Tried to validate empty list of messages"),
}; };

@ -2,21 +2,26 @@ use abacus_core::db::AbacusDB;
use abacus_core::db::DbError; use abacus_core::db::DbError;
use eyre::Result; use eyre::Result;
static MESSAGES_LAST_BLOCK_END: &str = "messages_last_inspected"; /// The start block number of the latest "valid" message block range.
/// This is an interval of block indexes where > 0 messages were indexed,
/// all of which had a contiguous sequence of messages based off their indices,
/// and the lowest index is the successor to the highest index of the prior
/// valid range.
static LATEST_VALID_MESSAGE_RANGE_START_BLOCK: &str = "latest_valid_message_range_start_block";
static LATEST_INDEXED_GAS_PAYMENT_BLOCK: &str = "latest_indexed_gas_payment_block"; static LATEST_INDEXED_GAS_PAYMENT_BLOCK: &str = "latest_indexed_gas_payment_block";
pub(crate) trait OutboxContractSyncDB { pub(crate) trait OutboxContractSyncDB {
fn store_message_latest_block_end(&self, latest_block: u32) -> Result<(), DbError>; fn store_latest_valid_message_range_start_block(&self, block_num: u32) -> Result<(), DbError>;
fn retrieve_message_latest_block_end(&self) -> Option<u32>; fn retrieve_latest_valid_message_range_start_block(&self) -> Option<u32>;
} }
impl OutboxContractSyncDB for AbacusDB { impl OutboxContractSyncDB for AbacusDB {
fn store_message_latest_block_end(&self, latest_block: u32) -> Result<(), DbError> { fn store_latest_valid_message_range_start_block(&self, block_num: u32) -> Result<(), DbError> {
self.store_encodable("", MESSAGES_LAST_BLOCK_END, &latest_block) self.store_encodable("", LATEST_VALID_MESSAGE_RANGE_START_BLOCK, &block_num)
} }
fn retrieve_message_latest_block_end(&self) -> Option<u32> { fn retrieve_latest_valid_message_range_start_block(&self) -> Option<u32> {
self.retrieve_decodable("", MESSAGES_LAST_BLOCK_END) self.retrieve_decodable("", LATEST_VALID_MESSAGE_RANGE_START_BLOCK)
.expect("db failure") .expect("db failure")
} }
} }

@ -48,15 +48,17 @@ use ethers::{
signers::{AwsSignerError, LocalWallet, Signer}, signers::{AwsSignerError, LocalWallet, Signer},
}; };
/// Enum for validity of a list (of checkpoints or messages) /// Enum for validity of a list of messages
#[derive(Debug)] #[derive(Debug)]
pub enum ListValidity { pub enum ListValidity {
/// Empty list /// Empty list
Empty, Empty,
/// Valid list /// Valid list
Valid, Valid,
/// Invalid list /// Invalid list. Does not build upon the correct prior element.
Invalid, InvalidContinuation,
/// Invalid list. Contains gaps, but builds upon the correct prior element.
ContainsGaps,
} }
/// Error types for Abacus /// Error types for Abacus

Loading…
Cancel
Save