diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8cd065e69..655eeeaf1 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -113,6 +113,7 @@ dependencies = [ "rocksdb", "serde", "serde_json", + "tempfile", "thiserror", "tokio", "tracing", diff --git a/rust/abacus-base/Cargo.toml b/rust/abacus-base/Cargo.toml index 9f4492f76..5f232da8b 100644 --- a/rust/abacus-base/Cargo.toml +++ b/rust/abacus-base/Cargo.toml @@ -57,5 +57,6 @@ once_cell = "1.12" [dev-dependencies] color-eyre = "0.6" + [features] oneline-eyre = ["backtrace-oneline", "backtrace"] diff --git a/rust/abacus-base/src/contract_sync/outbox.rs b/rust/abacus-base/src/contract_sync/outbox.rs index 6cfe5bf90..67262fd2d 100644 --- a/rust/abacus-base/src/contract_sync/outbox.rs +++ b/rust/abacus-base/src/contract_sync/outbox.rs @@ -143,7 +143,7 @@ where continue; } - // Ensure the sorted messages are a valid continution of last_leaf_index + // Ensure the sorted messages are a valid continuation of last_leaf_index match &last_leaf_index.valid_continuation(&sorted_messages) { ListValidity::Valid => { // Store messages @@ -169,8 +169,9 @@ where // Move forward to the next height from = to + 1; - }, - // The index of the first message in sorted_messages is not the last_leaf_index + 1. + } + // The index of the first message in sorted_messages is not the + // `last_leaf_index+1`. ListValidity::InvalidContinuation => { missed_messages.inc(); @@ -183,7 +184,7 @@ where ); from = last_valid_range_start_block; - }, + } ListValidity::ContainsGaps => { missed_messages.inc(); @@ -198,16 +199,20 @@ where }; } }) - .instrument(span) + .instrument(span) } } #[cfg(test)] mod test { use std::sync::Arc; + use std::time::Duration; use ethers::core::types::H256; + use eyre::eyre; use mockall::*; + use tokio::select; + use tokio::time::{interval, timeout}; use abacus_core::{db::AbacusDB, AbacusMessage, Encode, RawCommittedMessage}; use abacus_test::mocks::indexer::MockAbacusIndexer; @@ -216,16 +221,9 @@ mod test { use crate::ContractSync; use crate::{settings::IndexSettings, ContractSyncMetrics, CoreMetrics}; - use super::*; - #[tokio::test] async fn handles_missing_rpc_messages() { test_utils::run_test_db(|db| async move { - // let first_root = H256::from([0; 32]); - // let second_root = H256::from([1; 32]); - // let third_root = H256::from([2; 32]); - // let fourth_root = H256::from([2; 32]); - let mut message_vec = vec![]; AbacusMessage { origin: 1000, @@ -246,7 +244,6 @@ mod test { leaf_index: 1, message: message_vec.clone(), }; - let second_message_clone = second_message.clone(); let third_message = RawCommittedMessage { leaf_index: 2, @@ -257,46 +254,43 @@ mod test { leaf_index: 3, message: message_vec.clone(), }; - let fourth_message_clone_1 = fourth_message.clone(); - let fourth_message_clone_2 = fourth_message.clone(); let fifth_message = RawCommittedMessage { leaf_index: 4, message: message_vec.clone(), }; - let fifth_message_clone_1 = fifth_message.clone(); - let fifth_message_clone_2 = fifth_message.clone(); - let fifth_message_clone_3 = fifth_message.clone(); let mut mock_indexer = MockAbacusIndexer::new(); { let mut seq = Sequence::new(); - // Return first message + // Return first message. mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) .return_once(|| Ok(100)); + let m1 = first_message.clone(); mock_indexer .expect__fetch_sorted_messages() .times(1) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![first_message.clone()])); + .return_once(move |_, _| Ok(vec![m1])); - // Return second message, misses third message + // Return second message, misses third message. mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) .return_once(|| Ok(100)); + let m2 = second_message.clone(); mock_indexer .expect__fetch_sorted_messages() .times(1) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![second_message])); + .return_once(move |_, _| Ok(vec![m2])); - // misses the fourth + // Misses the fourth. mock_indexer .expect__get_finalized_block_number() .times(1) @@ -308,7 +302,7 @@ mod test { .in_sequence(&mut seq) .return_once(move |_, _| Ok(vec![])); - // empty range + // Empty range. mock_indexer .expect__get_finalized_block_number() .times(1) @@ -320,17 +314,18 @@ mod test { .in_sequence(&mut seq) .return_once(move |_, _| Ok(vec![])); - // second --> fifth message seen as invalid + // Second --> fifth message seen as invalid. mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) .return_once(|| Ok(100)); + let m5 = fifth_message.clone(); mock_indexer .expect__fetch_sorted_messages() .times(1) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![fifth_message])); + .return_once(move |_, _| Ok(vec![m5])); // Indexer goes back and tries empty block range mock_indexer @@ -344,32 +339,33 @@ mod test { .in_sequence(&mut seq) .return_once(move |_, _| Ok(vec![])); - // Indexer tries to move on to realized missing block range but - // can't + // Indexer tries to move on to realized missing block range but can't. mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) .return_once(|| Ok(100)); + let m5 = fifth_message.clone(); mock_indexer .expect__fetch_sorted_messages() .times(1) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![fifth_message_clone_1])); + .return_once(move |_, _| Ok(vec![m5])); - // Indexer goes back further and gets to fourth message + // Indexer goes back further and gets to fourth message. mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) .return_once(|| Ok(100)); + let m4 = fourth_message.clone(); mock_indexer .expect__fetch_sorted_messages() .times(1) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![fourth_message_clone_1])); + .return_once(move |_, _| Ok(vec![m4])); - // Indexer gets empty range again + // Indexer gets empty range again. mock_indexer .expect__get_finalized_block_number() .times(1) @@ -381,43 +377,47 @@ mod test { .in_sequence(&mut seq) .return_once(move |_, _| Ok(vec![])); - // Indexer gets fifth message again + // Indexer gets fifth message again. mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) .return_once(|| Ok(100)); + let m5 = fifth_message.clone(); mock_indexer .expect__fetch_sorted_messages() .times(1) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![fifth_message_clone_2])); + .return_once(move |_, _| Ok(vec![m5])); - // Indexer goes back even further and gets to message 2 and 3 + // Indexer goes back even further and gets to message 2 and 3. mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) .return_once(|| Ok(100)); + let m2 = second_message.clone(); + let m3 = third_message.clone(); mock_indexer .expect__fetch_sorted_messages() .times(1) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![second_message_clone, third_message])); + .return_once(move |_, _| Ok(vec![m2, m3])); - // Return fourth message + // Return fourth message. mock_indexer .expect__get_finalized_block_number() .times(1) .in_sequence(&mut seq) .return_once(|| Ok(100)); + let m4 = fourth_message.clone(); mock_indexer .expect__fetch_sorted_messages() .times(1) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![fourth_message_clone_2])); + .return_once(move |_, _| Ok(vec![m4])); - // Reindexes empty block range + // Re-indexes empty block range. mock_indexer .expect__get_finalized_block_number() .times(1) @@ -429,7 +429,7 @@ mod test { .in_sequence(&mut seq) .return_once(move |_, _| Ok(vec![])); - // Return fifth message + // Return fifth message. mock_indexer .expect__get_finalized_block_number() .times(1) @@ -439,17 +439,14 @@ mod test { .expect__fetch_sorted_messages() .times(1) .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![fifth_message_clone_3])); + .return_once(move |_, _| Ok(vec![fifth_message])); - // Return empty vec for remaining calls mock_indexer .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(100)); + .returning(|| Ok(100)); mock_indexer .expect__fetch_sorted_messages() - .return_once(move |_, _| Ok(vec![])); + .returning(|_, _| Ok(vec![])); } let abacus_db = AbacusDB::new("outbox_1", db); @@ -474,14 +471,27 @@ mod test { ); let sync_task = contract_sync.sync_outbox_messages(); - sleep(Duration::from_secs(3)).await; - cancel_task!(sync_task); - - assert!(abacus_db.message_by_leaf_index(0).expect("!db").is_some()); - assert!(abacus_db.message_by_leaf_index(1).expect("!db").is_some()); - assert!(abacus_db.message_by_leaf_index(2).expect("!db").is_some()); - assert!(abacus_db.message_by_leaf_index(3).expect("!db").is_some()); - assert!(abacus_db.message_by_leaf_index(4).expect("!db").is_some()); + let test_pass_fut = timeout(Duration::from_secs(30), async move { + let mut interval = interval(Duration::from_millis(20)); + loop { + if abacus_db.message_by_leaf_index(0).expect("!db").is_some() + && abacus_db.message_by_leaf_index(1).expect("!db").is_some() + && abacus_db.message_by_leaf_index(2).expect("!db").is_some() + && abacus_db.message_by_leaf_index(3).expect("!db").is_some() + && abacus_db.message_by_leaf_index(4).expect("!db").is_some() + { + break; + } + interval.tick().await; + } + }); + let test_result = select! { + err = sync_task => Err(eyre!( + "sync task unexpectedly done before test: {:?}", err.unwrap_err())), + tests_result = test_pass_fut => + if tests_result.is_ok() { Ok(()) } else { Err(eyre!("timed out")) } + }; + assert!(test_result.is_ok()); }) .await } diff --git a/rust/abacus-test/Cargo.toml b/rust/abacus-test/Cargo.toml index 324b9048f..6650f7557 100644 --- a/rust/abacus-test/Cargo.toml +++ b/rust/abacus-test/Cargo.toml @@ -15,6 +15,7 @@ eyre = "0.6" mockall = "0.10.2" rand = "0.8.3" rocksdb = "0.18" +tempfile = "3.3" abacus-core = { path = "../abacus-core" } tracing = "0.1" diff --git a/rust/abacus-test/src/test_utils.rs b/rust/abacus-test/src/test_utils.rs index 9e81a2eff..4415ddbf6 100644 --- a/rust/abacus-test/src/test_utils.rs +++ b/rust/abacus-test/src/test_utils.rs @@ -1,10 +1,8 @@ -use abacus_core::db::DB; -use futures_util::FutureExt; -use rand::distributions::Alphanumeric; -use rand::{thread_rng, Rng}; -use std::{future::Future, panic}; - +use futures_util::Future; use rocksdb::Options; +use tempfile::TempDir; + +use abacus_core::db::DB; pub fn setup_db(db_path: String) -> DB { let mut opts = Options::default(); @@ -16,34 +14,27 @@ pub fn setup_db(db_path: String) -> DB { pub async fn run_test_db(test: T) where - T: FnOnce(DB) -> Fut + panic::UnwindSafe, + T: FnOnce(DB) -> Fut, Fut: Future, { - // RocksDB only allows one unique db handle to be open at a time. Because - // `cargo test` is multithreaded by default, we use random db pathnames to - // avoid collisions between 2+ threads - let rand_path: String = thread_rng() - .sample_iter(&Alphanumeric) - .take(8) - .map(char::from) - .collect(); - let result = { - let db = setup_db(rand_path.clone()); - - let func = panic::AssertUnwindSafe(async { test(db).await }); - func.catch_unwind().await - }; - let _ = rocksdb::DB::destroy(&Options::default(), rand_path); - assert!(result.is_ok()) + // Use `/tmp`-equivalent so that any resource leak of the db files will + // eventually be cleaned up, even if e.g. TempDir's drop handler never runs + // due to a segfault etc encountered during the test. + let db_tmp_dir = TempDir::new().unwrap(); + let db = setup_db(db_tmp_dir.path().to_str().unwrap().into()); + let _test_result = test(db).await; + let _ = rocksdb::DB::destroy(&Options::default(), db_tmp_dir); } #[cfg(test)] mod test { - use super::*; + use ethers::types::H256; + use abacus_core::{ accumulator::merkle::Proof, db::AbacusDB, AbacusMessage, Encode, RawCommittedMessage, }; - use ethers::types::H256; + + use super::*; #[tokio::test] async fn db_stores_and_retrieves_messages() {