Tests for new message indexing (#813)

* tests

* Clean up

* fmt

* smaller diff

* Fix comment
pull/822/head
Trevor Porter 2 years ago committed by GitHub
parent 889c16a479
commit cd3e1e7aab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 163
      rust/abacus-base/src/contract_sync/outbox.rs

@ -217,7 +217,9 @@ mod test {
use abacus_core::{db::AbacusDB, AbacusMessage, Encode, RawCommittedMessage};
use abacus_test::mocks::indexer::MockAbacusIndexer;
use abacus_test::test_utils;
use mockall::predicate::eq;
use crate::contract_sync::schema::OutboxContractSyncDB;
use crate::ContractSync;
use crate::{settings::IndexSettings, ContractSyncMetrics, CoreMetrics};
@ -235,70 +237,80 @@ mod test {
.write_to(&mut message_vec)
.expect("!write_to");
let first_message = RawCommittedMessage {
let m0 = RawCommittedMessage {
leaf_index: 0,
message: message_vec.clone(),
};
let second_message = RawCommittedMessage {
let m1 = RawCommittedMessage {
leaf_index: 1,
message: message_vec.clone(),
};
let third_message = RawCommittedMessage {
let m2 = RawCommittedMessage {
leaf_index: 2,
message: message_vec.clone(),
};
let fourth_message = RawCommittedMessage {
let m3 = RawCommittedMessage {
leaf_index: 3,
message: message_vec.clone(),
};
let fifth_message = RawCommittedMessage {
let m4 = RawCommittedMessage {
leaf_index: 4,
message: message_vec.clone(),
};
let m5 = RawCommittedMessage {
leaf_index: 5,
message: message_vec.clone(),
};
let latest_valid_message_range_start_block = 100;
let mut mock_indexer = MockAbacusIndexer::new();
{
let mut seq = Sequence::new();
// Return first message.
// Return m0.
let m0_clone = m0.clone();
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m1 = first_message.clone();
.return_once(|| Ok(110));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(100), eq(110))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![m1]));
.return_once(move |_, _| Ok(vec![m0_clone]));
// Return second message, misses third message.
// Return m1, miss m2.
let m1_clone = m1.clone();
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m2 = second_message.clone();
.return_once(|| Ok(120));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(111), eq(120))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![m2]));
.return_once(move |_, _| Ok(vec![m1_clone]));
// Misses the fourth.
// Miss m3.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
.return_once(|| Ok(130));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(121), eq(130))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
@ -307,150 +319,116 @@ mod test {
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
.return_once(|| Ok(140));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(131), eq(140))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
// Second --> fifth message seen as invalid.
// m1 --> m5 seen as an invalid continuation
let m5_clone = m5.clone();
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m5 = fifth_message.clone();
.return_once(|| Ok(150));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(141), eq(150))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![m5]));
.return_once(move |_, _| Ok(vec![m5_clone]));
// Indexer goes back and tries empty block range
// Indexer goes back to the last valid message range start block
// and indexes the range based off the chunk size of 19.
// This time it gets m1 and m2 (which was previously skipped)
let m1_clone = m1.clone();
let m2_clone = m2.clone();
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
.return_once(|| Ok(160));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(111), eq(130))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
.return_once(move |_, _| Ok(vec![m1_clone, m2_clone]));
// Indexer tries to move on to realized missing block range but can't.
// Indexer continues, this time getting m3 and m5 message, but skipping m4,
// which means this range contains gaps
let m3_clone = m3.clone();
let m5_clone = m5.clone();
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m5 = fifth_message.clone();
.return_once(|| Ok(170));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(131), eq(150))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![m5]));
.return_once(move |_, _| Ok(vec![m3_clone, m5_clone]));
// Indexer goes back further and gets to fourth message.
// Indexer retries, the same range in hope of filling the gap,
// which it now does successfully
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m4 = fourth_message.clone();
.return_once(|| Ok(170));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(131), eq(150))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![m4]));
.return_once(move |_, _| Ok(vec![m3, m4, m5]));
// Indexer gets empty range again.
// Indexer continues with the next block range, which happens to be empty
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
.return_once(|| Ok(180));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(151), eq(170))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
// Indexer gets fifth message again.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m5 = fifth_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![m5]));
// Indexer goes back even further and gets to message 2 and 3.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m2 = second_message.clone();
let m3 = third_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![m2, m3]));
// Return fourth message.
// Indexer catches up with the tip
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m4 = fourth_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![m4]));
// Re-indexes empty block range.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
.return_once(|| Ok(190));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(171), eq(190))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
// Return fifth message.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![fifth_message]));
// Stay at the same tip, so no other fetch_sorted_messages calls are made
mock_indexer
.expect__get_finalized_block_number()
.returning(|| Ok(100));
mock_indexer
.expect__fetch_sorted_messages()
.returning(|_, _| Ok(vec![]));
.returning(|| Ok(190));
}
let abacus_db = AbacusDB::new("outbox_1", db);
// Set the latest valid message range start block
abacus_db
.store_latest_valid_message_range_start_block(
latest_valid_message_range_start_block,
)
.unwrap();
let indexer = Arc::new(mock_indexer);
let metrics = Arc::new(
CoreMetrics::new("contract_sync_test", None, prometheus::Registry::new())
@ -465,7 +443,7 @@ mod test {
indexer.clone(),
IndexSettings {
from: Some("0".to_string()),
chunk: Some("10".to_string()),
chunk: Some("19".to_string()),
},
sync_metrics,
);
@ -479,6 +457,7 @@ mod test {
&& abacus_db.message_by_leaf_index(2).expect("!db").is_some()
&& abacus_db.message_by_leaf_index(3).expect("!db").is_some()
&& abacus_db.message_by_leaf_index(4).expect("!db").is_some()
&& abacus_db.message_by_leaf_index(5).expect("!db").is_some()
{
break;
}

Loading…
Cancel
Save