|
|
|
@ -1,12 +1,9 @@ |
|
|
|
|
use std::cmp::min; |
|
|
|
|
use std::time::Duration; |
|
|
|
|
|
|
|
|
|
use eyre::Result; |
|
|
|
|
use tokio::time::sleep; |
|
|
|
|
use tracing::{debug, info, info_span, warn}; |
|
|
|
|
use tracing::{instrument::Instrumented, Instrument}; |
|
|
|
|
|
|
|
|
|
use abacus_core::{name_from_domain_id, ListValidity, MailboxIndexer}; |
|
|
|
|
use abacus_core::{ |
|
|
|
|
name_from_domain_id, Indexer, ListValidity, MailboxIndexer, SyncBlockRangeCursor, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
use crate::contract_sync::last_message::validate_message_continuity; |
|
|
|
|
use crate::{contract_sync::schema::OutboxContractSyncDB, ContractSync}; |
|
|
|
@ -18,7 +15,9 @@ where |
|
|
|
|
I: MailboxIndexer + Clone + 'static, |
|
|
|
|
{ |
|
|
|
|
/// Sync dispatched messages
|
|
|
|
|
pub fn sync_dispatched_messages(&self) -> Instrumented<tokio::task::JoinHandle<Result<()>>> { |
|
|
|
|
pub fn sync_dispatched_messages( |
|
|
|
|
&self, |
|
|
|
|
) -> Instrumented<tokio::task::JoinHandle<eyre::Result<()>>> { |
|
|
|
|
let span = info_span!("MessageContractSync"); |
|
|
|
|
|
|
|
|
|
let db = self.db.clone(); |
|
|
|
@ -41,82 +40,91 @@ where |
|
|
|
|
let message_nonce = self.metrics.message_nonce.clone(); |
|
|
|
|
let chain_name = self.chain_name.clone(); |
|
|
|
|
|
|
|
|
|
let config_from = self.index_settings.from(); |
|
|
|
|
let chunk_size = self.index_settings.chunk_size(); |
|
|
|
|
let cursor = { |
|
|
|
|
let config_initial_height = self.index_settings.from(); |
|
|
|
|
let initial_height = db |
|
|
|
|
.retrieve_latest_valid_message_range_start_block() |
|
|
|
|
.map_or(config_initial_height, |b| b + 1); |
|
|
|
|
create_cursor( |
|
|
|
|
indexer.clone(), |
|
|
|
|
self.index_settings.chunk_size(), |
|
|
|
|
initial_height, |
|
|
|
|
) |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Indexes messages by fetching messages in ranges of blocks.
|
|
|
|
|
// We've observed occasional flakiness with providers where some events in
|
|
|
|
|
// a range will be missing. The leading theories are:
|
|
|
|
|
//
|
|
|
|
|
// 1. The provider is just flaky and sometimes misses events :(
|
|
|
|
|
//
|
|
|
|
|
// 2. For outbox chains with low finality times, it's possible that when
|
|
|
|
|
// we query the RPC provider for the latest finalized block number,
|
|
|
|
|
// we're returned a block number T. However when we attempt to index a range
|
|
|
|
|
// where the `to` block is T, the `eth_getLogs` RPC is load balanced by the
|
|
|
|
|
// provider to a different node whose latest known block is some block T' < T.
|
|
|
|
|
// The `eth_getLogs` RPC implementations seem to happily accept `to` blocks that
|
|
|
|
|
// exceed the latest known block, so it's possible that in our indexer we think
|
|
|
|
|
// that we've indexed up to block T but we've only *actually* indexed up to block T'.
|
|
|
|
|
|
|
|
|
|
// provider to a different node whose latest known block is some block T' <T.
|
|
|
|
|
//
|
|
|
|
|
// The `eth_getLogs` RPC implementations seem to happily accept
|
|
|
|
|
// `to` blocks that exceed the latest known block, so it's possible
|
|
|
|
|
// that in our indexer we think that we've indexed up to block T but
|
|
|
|
|
// we've only *actually* indexed up to block T'.
|
|
|
|
|
//
|
|
|
|
|
// It's easy to determine if a provider has skipped any message events by
|
|
|
|
|
// looking at the indices of each message and ensuring that we've indexed a valid
|
|
|
|
|
// continuation of messages.
|
|
|
|
|
// looking at the indices of each message and ensuring that we've indexed a
|
|
|
|
|
// valid continuation of messages.
|
|
|
|
|
//
|
|
|
|
|
// There are two classes of invalid continuations:
|
|
|
|
|
// 1. The latest previously indexed message index is M that was found in a previously
|
|
|
|
|
// indexed block range. A new block range [A,B] is indexed, returning a list of messages.
|
|
|
|
|
// The lowest message index in that list is `M + 1`, but there are some missing messages
|
|
|
|
|
// indices in the list. This is likely a flaky provider, and we can simply re-index the
|
|
|
|
|
// range [A,B] hoping that the provider will soon return a correct list.
|
|
|
|
|
// 2. The latest previously indexed message index is M that was found in a previously
|
|
|
|
|
// indexed block range, [A,B]. A new block range [C,D] is indexed, returning a list of
|
|
|
|
|
// messages. However, the lowest message index in that list is M' where M' > M + 1.
|
|
|
|
|
// This missing messages could be anywhere in the range [A,D]:
|
|
|
|
|
// * It's possible there was an issue when the prior block range [A,B] was indexed, where
|
|
|
|
|
// the provider didn't provide some messages with indices > M that it should have.
|
|
|
|
|
// * It's possible that the range [B,C] that was presumed to be empty when it was indexed
|
|
|
|
|
// actually wasn't.
|
|
|
|
|
// * And it's possible that this was just a flaky gap, where there are messages in the [C,D]
|
|
|
|
|
// range that weren't returned for some reason.
|
|
|
|
|
//
|
|
|
|
|
// 1. The latest previously indexed message index is M that was found in a
|
|
|
|
|
// previously indexed block range. A new block range [A,B] is indexed, returning
|
|
|
|
|
// a list of messages. The lowest message index in that list is `M + 1`,
|
|
|
|
|
// but there are some missing messages indices in the list. This is
|
|
|
|
|
// likely a flaky provider, and we can simply re-index the range [A,B]
|
|
|
|
|
// hoping that the provider will soon return a correct list.
|
|
|
|
|
//
|
|
|
|
|
// 2. The latest previously indexed message index is M that was found in a
|
|
|
|
|
// previously indexed block range, [A,B]. A new block range [C,D] is
|
|
|
|
|
// indexed, returning a list of messages. However, the lowest message
|
|
|
|
|
// index in that list is M' where M' > M + 1. This missing messages
|
|
|
|
|
// could be anywhere in the range [A,D]:
|
|
|
|
|
// * It's possible there was an issue when the prior block range [A,B] was
|
|
|
|
|
// indexed, where the provider didn't provide some messages with indices >
|
|
|
|
|
// M that it should have.
|
|
|
|
|
// * It's possible that the range [B,C] that was presumed to be empty when it
|
|
|
|
|
// was indexed actually wasn't.
|
|
|
|
|
// * And it's possible that this was just a flaky gap, where there are
|
|
|
|
|
// messages in the [C,D] range that weren't returned for some reason.
|
|
|
|
|
//
|
|
|
|
|
// We can handle this by re-indexing starting from block A.
|
|
|
|
|
// Note this means we only handle this case upon observing messages in some range [C,D]
|
|
|
|
|
// that indicate a previously indexed range may have missed some messages.
|
|
|
|
|
// Note this means we only handle this case upon observing messages in some
|
|
|
|
|
// range [C,D] that indicate a previously indexed range may have
|
|
|
|
|
// missed some messages.
|
|
|
|
|
tokio::spawn(async move { |
|
|
|
|
let mut from = db |
|
|
|
|
.retrieve_latest_valid_message_range_start_block() |
|
|
|
|
.unwrap_or(config_from); |
|
|
|
|
|
|
|
|
|
let mut last_valid_range_start_block = from; |
|
|
|
|
let mut cursor = cursor.await?; |
|
|
|
|
|
|
|
|
|
info!(from = from, "[Messages]: resuming indexer from latest valid message range start block"); |
|
|
|
|
let start_block = cursor.current_position(); |
|
|
|
|
let mut last_valid_range_start_block = start_block; |
|
|
|
|
info!(from = start_block, "[Messages]: resuming indexer from latest valid message range start block"); |
|
|
|
|
indexed_height.set(start_block as i64); |
|
|
|
|
|
|
|
|
|
indexed_height.set(from as i64); |
|
|
|
|
loop { |
|
|
|
|
sleep(Duration::from_secs(5)).await; |
|
|
|
|
|
|
|
|
|
// Only index blocks considered final.
|
|
|
|
|
// If there's an error getting the block number, just start the loop over
|
|
|
|
|
let Ok(tip) = indexer.get_finalized_block_number().await else { |
|
|
|
|
continue; |
|
|
|
|
}; |
|
|
|
|
if tip <= from { |
|
|
|
|
// Sleep if caught up to tip
|
|
|
|
|
sleep(Duration::from_secs(10)).await; |
|
|
|
|
let start_block = cursor.current_position(); |
|
|
|
|
let (from, to) = match cursor.next_range().await { |
|
|
|
|
Ok(range) => range, |
|
|
|
|
Err(err) => { |
|
|
|
|
warn!(error = %err, "[Messages]: failed to get next block range"); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
// Index the chunk_size, capping at the tip.
|
|
|
|
|
let to = min(tip, from + chunk_size); |
|
|
|
|
|
|
|
|
|
// Still search the full-size chunk size to possibly catch events that nodes have dropped "close to the tip"
|
|
|
|
|
let full_chunk_from = to.checked_sub(chunk_size).unwrap_or_default(); |
|
|
|
|
|
|
|
|
|
let mut sorted_messages: Vec<_> = indexer.fetch_sorted_messages(full_chunk_from, to).await?.into_iter().map(|(msg, _)| msg).collect(); |
|
|
|
|
let mut sorted_messages: Vec<_> = indexer |
|
|
|
|
.fetch_sorted_messages(from, to) |
|
|
|
|
.await? |
|
|
|
|
.into_iter() |
|
|
|
|
.map(|(msg, _)| msg) |
|
|
|
|
.collect(); |
|
|
|
|
|
|
|
|
|
info!( |
|
|
|
|
from = full_chunk_from, |
|
|
|
|
to = to, |
|
|
|
|
message_count = sorted_messages.len(), |
|
|
|
|
"[Messages]: indexed block range" |
|
|
|
|
); |
|
|
|
|
info!(from, to, message_count = sorted_messages.len(), "[Messages]: indexed block range"); |
|
|
|
|
|
|
|
|
|
// Get the latest known nonce. All messages whose indices are <= this index
|
|
|
|
|
// have been stored in the DB.
|
|
|
|
@ -128,12 +136,7 @@ where |
|
|
|
|
sorted_messages.retain(|m| m.nonce > min_nonce); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
debug!( |
|
|
|
|
from = full_chunk_from, |
|
|
|
|
to = to, |
|
|
|
|
message_count = sorted_messages.len(), |
|
|
|
|
"[Messages]: filtered any messages already indexed" |
|
|
|
|
); |
|
|
|
|
debug!(from, to, message_count = sorted_messages.len(), "[Messages]: filtered any messages already indexed"); |
|
|
|
|
|
|
|
|
|
// Ensure the sorted messages are a valid continuation of last_nonce
|
|
|
|
|
match validate_message_continuity(last_nonce, &sorted_messages.iter().collect::<Vec<_>>()) { |
|
|
|
@ -153,11 +156,10 @@ where |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Update the latest valid start block.
|
|
|
|
|
db.store_latest_valid_message_range_start_block(full_chunk_from)?; |
|
|
|
|
last_valid_range_start_block = full_chunk_from; |
|
|
|
|
db.store_latest_valid_message_range_start_block(from)?; |
|
|
|
|
last_valid_range_start_block = from; |
|
|
|
|
|
|
|
|
|
// Move forward to the next height
|
|
|
|
|
from = to + 1; |
|
|
|
|
indexed_height.set(to as i64); |
|
|
|
|
} |
|
|
|
|
// The index of the first message in sorted_messages is not the
|
|
|
|
@ -173,11 +175,12 @@ where |
|
|
|
|
"[Messages]: Found invalid continuation in range. Re-indexing from the start block of the last successful range.", |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
from = last_valid_range_start_block; |
|
|
|
|
indexed_height.set(from as i64); |
|
|
|
|
cursor.backtrack(last_valid_range_start_block); |
|
|
|
|
indexed_height.set(last_valid_range_start_block as i64); |
|
|
|
|
} |
|
|
|
|
ListValidity::ContainsGaps => { |
|
|
|
|
missed_messages.inc(); |
|
|
|
|
cursor.backtrack(start_block); |
|
|
|
|
|
|
|
|
|
warn!( |
|
|
|
|
last_nonce = ?last_nonce, |
|
|
|
@ -191,7 +194,6 @@ where |
|
|
|
|
// We don't update last_valid_range_start_block because we cannot extrapolate
|
|
|
|
|
// if the range was correctly indexed if there are no messages to observe their
|
|
|
|
|
// indices.
|
|
|
|
|
from = to + 1; |
|
|
|
|
indexed_height.set(to as i64); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
@ -201,6 +203,28 @@ where |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[cfg(test)] |
|
|
|
|
static mut MOCK_CURSOR: Option<abacus_test::mocks::cursor::MockSyncBlockRangeCursor> = None; |
|
|
|
|
|
|
|
|
|
/// Create a new cursor. In test mode we should use the mock cursor created by
|
|
|
|
|
/// the test.
|
|
|
|
|
#[cfg_attr(test, allow(unused_variables))] |
|
|
|
|
async fn create_cursor<I: Indexer>( |
|
|
|
|
indexer: I, |
|
|
|
|
chunk_size: u32, |
|
|
|
|
initial_height: u32, |
|
|
|
|
) -> eyre::Result<impl SyncBlockRangeCursor> { |
|
|
|
|
#[cfg(not(test))] |
|
|
|
|
{ |
|
|
|
|
crate::RateLimitedSyncBlockRangeCursor::new(indexer, chunk_size, initial_height).await |
|
|
|
|
} |
|
|
|
|
#[cfg(test)] |
|
|
|
|
{ |
|
|
|
|
let cursor = unsafe { MOCK_CURSOR.take() }; |
|
|
|
|
Ok(cursor.expect("Mock cursor was not set before it was used")) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[cfg(test)] |
|
|
|
|
mod test { |
|
|
|
|
use std::sync::Arc; |
|
|
|
@ -208,20 +232,28 @@ mod test { |
|
|
|
|
|
|
|
|
|
use ethers::core::types::H256; |
|
|
|
|
use eyre::eyre; |
|
|
|
|
use mockall::predicate::eq; |
|
|
|
|
use mockall::*; |
|
|
|
|
use tokio::select; |
|
|
|
|
use tokio::time::{interval, timeout}; |
|
|
|
|
use tokio::sync::Mutex; |
|
|
|
|
use tokio::time::{interval, sleep, timeout}; |
|
|
|
|
|
|
|
|
|
use abacus_core::{db::AbacusDB, AbacusMessage, LogMeta}; |
|
|
|
|
use abacus_test::mocks::cursor::MockSyncBlockRangeCursor; |
|
|
|
|
use abacus_test::mocks::indexer::MockAbacusIndexer; |
|
|
|
|
use abacus_test::test_utils; |
|
|
|
|
use mockall::predicate::eq; |
|
|
|
|
|
|
|
|
|
use crate::chains::IndexSettings; |
|
|
|
|
use crate::contract_sync::mailbox::MOCK_CURSOR; |
|
|
|
|
use crate::contract_sync::schema::OutboxContractSyncDB; |
|
|
|
|
use crate::contract_sync::IndexSettings; |
|
|
|
|
use crate::ContractSync; |
|
|
|
|
use crate::{ContractSyncMetrics, CoreMetrics}; |
|
|
|
|
|
|
|
|
|
// we need a mutex for our tests because of the static cursor object
|
|
|
|
|
lazy_static! { |
|
|
|
|
static ref TEST_MTX: Mutex<()> = Mutex::new(()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[tokio::test] |
|
|
|
|
async fn handles_missing_rpc_messages() { |
|
|
|
|
test_utils::run_test_db(|db| async move { |
|
|
|
@ -238,6 +270,12 @@ mod test { |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
let messages = (0..10).map(message_gen).collect::<Vec<AbacusMessage>>(); |
|
|
|
|
let m0 = messages[0].clone(); |
|
|
|
|
let m1 = messages[1].clone(); |
|
|
|
|
let m2 = messages[2].clone(); |
|
|
|
|
let m3 = messages[3].clone(); |
|
|
|
|
let m4 = messages[4].clone(); |
|
|
|
|
let m5 = messages[5].clone(); |
|
|
|
|
|
|
|
|
|
let meta = || LogMeta { |
|
|
|
|
address: Default::default(), |
|
|
|
@ -251,163 +289,102 @@ mod test { |
|
|
|
|
let latest_valid_message_range_start_block = 100; |
|
|
|
|
|
|
|
|
|
let mut mock_indexer = MockAbacusIndexer::new(); |
|
|
|
|
let mut mock_cursor = MockSyncBlockRangeCursor::new(); |
|
|
|
|
{ |
|
|
|
|
let mut seq = Sequence::new(); |
|
|
|
|
|
|
|
|
|
// Return m0.
|
|
|
|
|
let m0 = messages[0].clone(); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
// Some local macros to reduce code-duplication.
|
|
|
|
|
macro_rules! expect_current_position { |
|
|
|
|
($return_position:literal) => { |
|
|
|
|
mock_cursor |
|
|
|
|
.expect__current_position() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(110)); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__fetch_sorted_messages() |
|
|
|
|
.return_once(|| $return_position); |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
macro_rules! expect_backtrack { |
|
|
|
|
($expected_new_from:literal) => { |
|
|
|
|
mock_cursor |
|
|
|
|
.expect__backtrack() |
|
|
|
|
.times(1) |
|
|
|
|
.with(eq(91), eq(110)) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(move |_, _| Ok(vec![(m0, meta())])); |
|
|
|
|
|
|
|
|
|
// Return m1, miss m2.
|
|
|
|
|
let m1 = messages[1].clone(); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.with(eq($expected_new_from)) |
|
|
|
|
.return_once(|_| ()); |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
macro_rules! expect_fetches_range { |
|
|
|
|
($expected_from:literal, $expected_to:literal, $return_messages:expr) => { |
|
|
|
|
let messages: &[&AbacusMessage] = $return_messages; |
|
|
|
|
let messages = messages.iter().map(|&msg| (msg.clone(), meta())).collect(); |
|
|
|
|
mock_cursor |
|
|
|
|
.expect__next_range() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(120)); |
|
|
|
|
.return_once(|| Box::pin(async { Ok(($expected_from, $expected_to)) })); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__fetch_sorted_messages() |
|
|
|
|
.times(1) |
|
|
|
|
.with(eq(101), eq(120)) |
|
|
|
|
.with(eq($expected_from), eq($expected_to)) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(move |_, _| Ok(vec![(m1, meta())])); |
|
|
|
|
.return_once(move |_, _| Ok(messages)); |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
expect_current_position!(91); |
|
|
|
|
expect_current_position!(91); |
|
|
|
|
|
|
|
|
|
// Return m0.
|
|
|
|
|
expect_fetches_range!(91, 110, &[&m0]); |
|
|
|
|
|
|
|
|
|
// Return m1, miss m2.
|
|
|
|
|
expect_current_position!(111); |
|
|
|
|
expect_fetches_range!(101, 120, &[&m1]); |
|
|
|
|
|
|
|
|
|
// Miss m3.
|
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(130)); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__fetch_sorted_messages() |
|
|
|
|
.times(1) |
|
|
|
|
.with(eq(111), eq(130)) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(move |_, _| Ok(vec![])); |
|
|
|
|
expect_current_position!(121); |
|
|
|
|
expect_fetches_range!(111, 130, &[]); |
|
|
|
|
|
|
|
|
|
// Empty range.
|
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(140)); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__fetch_sorted_messages() |
|
|
|
|
.times(1) |
|
|
|
|
.with(eq(121), eq(140)) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(move |_, _| Ok(vec![])); |
|
|
|
|
|
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(140)); |
|
|
|
|
expect_current_position!(131); |
|
|
|
|
expect_fetches_range!(121, 140, &[]); |
|
|
|
|
|
|
|
|
|
// m1 --> m5 seen as an invalid continuation
|
|
|
|
|
let m5 = messages[5].clone(); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(150)); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__fetch_sorted_messages() |
|
|
|
|
.times(1) |
|
|
|
|
.with(eq(131), eq(150)) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(move |_, _| Ok(vec![(m5, meta())])); |
|
|
|
|
expect_current_position!(141); |
|
|
|
|
expect_fetches_range!(131, 150, &[&m5]); |
|
|
|
|
expect_backtrack!(101); |
|
|
|
|
|
|
|
|
|
// Indexer goes back to the last valid message range start block
|
|
|
|
|
// and indexes the range based off the chunk size of 19.
|
|
|
|
|
// and indexes the range
|
|
|
|
|
// This time it gets m1 and m2 (which was previously skipped)
|
|
|
|
|
let m1 = messages[1].clone(); |
|
|
|
|
let m2 = messages[2].clone(); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(160)); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__fetch_sorted_messages() |
|
|
|
|
.times(1) |
|
|
|
|
.with(eq(101), eq(120)) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(move |_, _| Ok(vec![(m1, meta()), (m2, meta())])); |
|
|
|
|
expect_current_position!(101); |
|
|
|
|
expect_fetches_range!(101, 120, &[&m1, &m2]); |
|
|
|
|
|
|
|
|
|
// Indexer continues, this time getting m3 and m5 message, but skipping m4,
|
|
|
|
|
// which means this range contains gaps
|
|
|
|
|
let m3 = messages[3].clone(); |
|
|
|
|
let m5 = messages[5].clone(); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(170)); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__fetch_sorted_messages() |
|
|
|
|
.times(1) |
|
|
|
|
.with(eq(121), eq(140)) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(move |_, _| Ok(vec![(m3, meta()), (m5, meta())])); |
|
|
|
|
expect_current_position!(121); |
|
|
|
|
expect_fetches_range!(118, 140, &[&m3, &m5]); |
|
|
|
|
expect_backtrack!(121); |
|
|
|
|
|
|
|
|
|
// Indexer retries, the same range in hope of filling the gap,
|
|
|
|
|
// which it now does successfully
|
|
|
|
|
let m3 = messages[3].clone(); |
|
|
|
|
let m4 = messages[4].clone(); |
|
|
|
|
let m5 = messages[5].clone(); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(170)); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__fetch_sorted_messages() |
|
|
|
|
.times(1) |
|
|
|
|
.with(eq(121), eq(140)) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(move |_, _| Ok(vec![(m3, meta()), (m4, meta()), (m5, meta())])); |
|
|
|
|
expect_current_position!(121); |
|
|
|
|
expect_fetches_range!(121, 140, &[&m3, &m4, &m5]); |
|
|
|
|
|
|
|
|
|
// Indexer continues with the next block range, which happens to be empty
|
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(180)); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__fetch_sorted_messages() |
|
|
|
|
.times(1) |
|
|
|
|
.with(eq(141), eq(160)) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(move |_, _| Ok(vec![])); |
|
|
|
|
|
|
|
|
|
// Indexer catches up with the tip
|
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.times(1) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(|| Ok(180)); |
|
|
|
|
mock_indexer |
|
|
|
|
.expect__fetch_sorted_messages() |
|
|
|
|
.times(1) |
|
|
|
|
.with(eq(161), eq(180)) |
|
|
|
|
.in_sequence(&mut seq) |
|
|
|
|
.return_once(move |_, _| Ok(vec![])); |
|
|
|
|
expect_current_position!(141); |
|
|
|
|
expect_fetches_range!(141, 160, &[]); |
|
|
|
|
|
|
|
|
|
// Stay at the same tip, so no other fetch_sorted_messages calls are made
|
|
|
|
|
mock_indexer |
|
|
|
|
.expect__get_finalized_block_number() |
|
|
|
|
.returning(|| Ok(180)); |
|
|
|
|
mock_cursor.expect__current_position().returning(|| 161); |
|
|
|
|
mock_cursor.expect__next_range().returning(|| { |
|
|
|
|
Box::pin(async move { |
|
|
|
|
// this sleep should be longer than the test timeout since we don't actually
|
|
|
|
|
// want to yield any more values at this point.
|
|
|
|
|
sleep(Duration::from_secs(100)).await; |
|
|
|
|
Ok((161, 161)) |
|
|
|
|
}) |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let abacus_db = AbacusDB::new("outbox_1", db); |
|
|
|
@ -424,13 +401,14 @@ mod test { |
|
|
|
|
CoreMetrics::new("contract_sync_test", None, prometheus::Registry::new()) |
|
|
|
|
.expect("could not make metrics"), |
|
|
|
|
); |
|
|
|
|
unsafe { MOCK_CURSOR = Some(mock_cursor) }; |
|
|
|
|
|
|
|
|
|
let sync_metrics = ContractSyncMetrics::new(metrics); |
|
|
|
|
|
|
|
|
|
let contract_sync = ContractSync::new( |
|
|
|
|
"outbox_1".into(), |
|
|
|
|
abacus_db.clone(), |
|
|
|
|
indexer.clone(), |
|
|
|
|
indexer, |
|
|
|
|
IndexSettings { |
|
|
|
|
from: Some("0".to_string()), |
|
|
|
|
chunk: Some("19".to_string()), |
|
|
|
@ -439,7 +417,7 @@ mod test { |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
let sync_task = contract_sync.sync_dispatched_messages(); |
|
|
|
|
let test_pass_fut = timeout(Duration::from_secs(90), async move { |
|
|
|
|
let test_pass_fut = timeout(Duration::from_secs(5), async move { |
|
|
|
|
let mut interval = interval(Duration::from_millis(20)); |
|
|
|
|
loop { |
|
|
|
|
if abacus_db.message_by_nonce(0).expect("!db").is_some() |
|
|
|
@ -460,7 +438,9 @@ mod test { |
|
|
|
|
tests_result = test_pass_fut => |
|
|
|
|
if tests_result.is_ok() { Ok(()) } else { Err(eyre!("timed out")) } |
|
|
|
|
}; |
|
|
|
|
assert!(test_result.is_ok()); |
|
|
|
|
if let Err(err) = test_result { |
|
|
|
|
panic!("Test failed: {err}") |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
.await |
|
|
|
|
} |
|
|
|
|