diff --git a/rust/abacus-base/src/contract_sync/outbox.rs b/rust/abacus-base/src/contract_sync/outbox.rs index 67262fd2d..473ff8d8e 100644 --- a/rust/abacus-base/src/contract_sync/outbox.rs +++ b/rust/abacus-base/src/contract_sync/outbox.rs @@ -217,7 +217,9 @@ mod test { use abacus_core::{db::AbacusDB, AbacusMessage, Encode, RawCommittedMessage}; use abacus_test::mocks::indexer::MockAbacusIndexer; use abacus_test::test_utils; + use mockall::predicate::eq; + use crate::contract_sync::schema::OutboxContractSyncDB; use crate::ContractSync; use crate::{settings::IndexSettings, ContractSyncMetrics, CoreMetrics}; @@ -235,70 +237,80 @@ mod test { .write_to(&mut message_vec) .expect("!write_to"); - let first_message = RawCommittedMessage { + let m0 = RawCommittedMessage { leaf_index: 0, message: message_vec.clone(), }; - let second_message = RawCommittedMessage { + let m1 = RawCommittedMessage { leaf_index: 1, message: message_vec.clone(), }; - let third_message = RawCommittedMessage { + let m2 = RawCommittedMessage { leaf_index: 2, message: message_vec.clone(), }; - let fourth_message = RawCommittedMessage { + let m3 = RawCommittedMessage { leaf_index: 3, message: message_vec.clone(), }; - let fifth_message = RawCommittedMessage { + let m4 = RawCommittedMessage { leaf_index: 4, message: message_vec.clone(), }; + let m5 = RawCommittedMessage { + leaf_index: 5, + message: message_vec.clone(), + }; + + let latest_valid_message_range_start_block = 100; + let mut mock_indexer = MockAbacusIndexer::new(); { let mut seq = Sequence::new(); - // Return first message. + // Return m0. + let m0_clone = m0.clone(); mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) - .return_once(|| Ok(100)); - let m1 = first_message.clone(); + .return_once(|| Ok(110)); mock_indexer .expect__fetch_sorted_messages() .times(1) + .with(eq(100), eq(110)) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![m1])); + .return_once(move |_, _| Ok(vec![m0_clone])); - // Return second message, misses third message. + // Return m1, miss m2. + let m1_clone = m1.clone(); mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) - .return_once(|| Ok(100)); - let m2 = second_message.clone(); + .return_once(|| Ok(120)); mock_indexer .expect__fetch_sorted_messages() .times(1) + .with(eq(111), eq(120)) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![m2])); + .return_once(move |_, _| Ok(vec![m1_clone])); - // Misses the fourth. + // Miss m3. mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) - .return_once(|| Ok(100)); + .return_once(|| Ok(130)); mock_indexer .expect__fetch_sorted_messages() .times(1) + .with(eq(121), eq(130)) .in_sequence(&mut seq) .return_once(move |_, _| Ok(vec![])); @@ -307,150 +319,116 @@ mod test { .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) - .return_once(|| Ok(100)); + .return_once(|| Ok(140)); mock_indexer .expect__fetch_sorted_messages() .times(1) + .with(eq(131), eq(140)) .in_sequence(&mut seq) .return_once(move |_, _| Ok(vec![])); - // Second --> fifth message seen as invalid. + // m1 --> m5 seen as an invalid continuation + let m5_clone = m5.clone(); mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) - .return_once(|| Ok(100)); - let m5 = fifth_message.clone(); + .return_once(|| Ok(150)); mock_indexer .expect__fetch_sorted_messages() .times(1) + .with(eq(141), eq(150)) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![m5])); + .return_once(move |_, _| Ok(vec![m5_clone])); - // Indexer goes back and tries empty block range + // Indexer goes back to the last valid message range start block + // and indexes the range based off the chunk size of 19. + // This time it gets m1 and m2 (which was previously skipped) + let m1_clone = m1.clone(); + let m2_clone = m2.clone(); mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) - .return_once(|| Ok(100)); + .return_once(|| Ok(160)); mock_indexer .expect__fetch_sorted_messages() .times(1) + .with(eq(111), eq(130)) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![])); + .return_once(move |_, _| Ok(vec![m1_clone, m2_clone])); - // Indexer tries to move on to realized missing block range but can't. + // Indexer continues, this time getting m3 and m5 message, but skipping m4, + // which means this range contains gaps + let m3_clone = m3.clone(); + let m5_clone = m5.clone(); mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) - .return_once(|| Ok(100)); - let m5 = fifth_message.clone(); + .return_once(|| Ok(170)); mock_indexer .expect__fetch_sorted_messages() .times(1) + .with(eq(131), eq(150)) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![m5])); + .return_once(move |_, _| Ok(vec![m3_clone, m5_clone])); - // Indexer goes back further and gets to fourth message. + // Indexer retries, the same range in hope of filling the gap, + // which it now does successfully mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) - .return_once(|| Ok(100)); - let m4 = fourth_message.clone(); + .return_once(|| Ok(170)); mock_indexer .expect__fetch_sorted_messages() .times(1) + .with(eq(131), eq(150)) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![m4])); + .return_once(move |_, _| Ok(vec![m3, m4, m5])); - // Indexer gets empty range again. + // Indexer continues with the next block range, which happens to be empty mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) - .return_once(|| Ok(100)); + .return_once(|| Ok(180)); mock_indexer .expect__fetch_sorted_messages() .times(1) + .with(eq(151), eq(170)) .in_sequence(&mut seq) .return_once(move |_, _| Ok(vec![])); - // Indexer gets fifth message again. - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(100)); - let m5 = fifth_message.clone(); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![m5])); - - // Indexer goes back even further and gets to message 2 and 3. - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(100)); - let m2 = second_message.clone(); - let m3 = third_message.clone(); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![m2, m3])); - - // Return fourth message. + // Indexer catches up with the tip mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) - .return_once(|| Ok(100)); - let m4 = fourth_message.clone(); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![m4])); - - // Re-indexes empty block range. - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(100)); + .return_once(|| Ok(190)); mock_indexer .expect__fetch_sorted_messages() .times(1) + .with(eq(171), eq(190)) .in_sequence(&mut seq) .return_once(move |_, _| Ok(vec![])); - // Return fifth message. - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(100)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![fifth_message])); - + // Stay at the same tip, so no other fetch_sorted_messages calls are made mock_indexer .expect__get_finalized_block_number() - .returning(|| Ok(100)); - mock_indexer - .expect__fetch_sorted_messages() - .returning(|_, _| Ok(vec![])); + .returning(|| Ok(190)); } let abacus_db = AbacusDB::new("outbox_1", db); + // Set the latest valid message range start block + abacus_db + .store_latest_valid_message_range_start_block( + latest_valid_message_range_start_block, + ) + .unwrap(); + let indexer = Arc::new(mock_indexer); let metrics = Arc::new( CoreMetrics::new("contract_sync_test", None, prometheus::Registry::new()) @@ -465,7 +443,7 @@ mod test { indexer.clone(), IndexSettings { from: Some("0".to_string()), - chunk: Some("10".to_string()), + chunk: Some("19".to_string()), }, sync_metrics, ); @@ -479,6 +457,7 @@ mod test { && abacus_db.message_by_leaf_index(2).expect("!db").is_some() && abacus_db.message_by_leaf_index(3).expect("!db").is_some() && abacus_db.message_by_leaf_index(4).expect("!db").is_some() + && abacus_db.message_by_leaf_index(5).expect("!db").is_some() { break; }