Fix broken test `contract_sync::outbox::test::handles_missing_rpc_messages`. (#811)

Before this PR, the test will panic due to outbox syncer running in a concurrent
tokio task after the test completes and invoking mocked functions without expectations set.
The panic is also swallowed by a separate bug and is only visible if tests are run under
`cargo test -- --show-output`.

After this PR, we no longer see a backtrace / panic when running with `cargo test -- --show-output`.
We also correctly mark the test as failed in `cargo test` if we introduce a new panic in the test or DB harness code.
Some additional changes to try and improve things adjacent to this test:

    *  use tmpfs instead of disk + panic-unwind-detection for the RocksDB contents to avoid the panic detection complexity

    *  avoid the current correctness dependency on sleep(from_secs(5)) in the test. This is inherently racy and it is not much harder or more complex to just select() with a timeout
pull/818/head
webbhorn 2 years ago committed by GitHub
parent 1b90cd534c
commit ae03a7343b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      rust/Cargo.lock
  2. 1
      rust/abacus-base/Cargo.toml
  3. 118
      rust/abacus-base/src/contract_sync/outbox.rs
  4. 1
      rust/abacus-test/Cargo.toml
  5. 41
      rust/abacus-test/src/test_utils.rs

1
rust/Cargo.lock generated

@ -113,6 +113,7 @@ dependencies = [
"rocksdb",
"serde",
"serde_json",
"tempfile",
"thiserror",
"tokio",
"tracing",

@ -57,5 +57,6 @@ once_cell = "1.12"
[dev-dependencies]
color-eyre = "0.6"
[features]
oneline-eyre = ["backtrace-oneline", "backtrace"]

@ -143,7 +143,7 @@ where
continue;
}
// Ensure the sorted messages are a valid continution of last_leaf_index
// Ensure the sorted messages are a valid continuation of last_leaf_index
match &last_leaf_index.valid_continuation(&sorted_messages) {
ListValidity::Valid => {
// Store messages
@ -169,8 +169,9 @@ where
// Move forward to the next height
from = to + 1;
},
// The index of the first message in sorted_messages is not the last_leaf_index + 1.
}
// The index of the first message in sorted_messages is not the
// `last_leaf_index+1`.
ListValidity::InvalidContinuation => {
missed_messages.inc();
@ -183,7 +184,7 @@ where
);
from = last_valid_range_start_block;
},
}
ListValidity::ContainsGaps => {
missed_messages.inc();
@ -198,16 +199,20 @@ where
};
}
})
.instrument(span)
.instrument(span)
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use std::time::Duration;
use ethers::core::types::H256;
use eyre::eyre;
use mockall::*;
use tokio::select;
use tokio::time::{interval, timeout};
use abacus_core::{db::AbacusDB, AbacusMessage, Encode, RawCommittedMessage};
use abacus_test::mocks::indexer::MockAbacusIndexer;
@ -216,16 +221,9 @@ mod test {
use crate::ContractSync;
use crate::{settings::IndexSettings, ContractSyncMetrics, CoreMetrics};
use super::*;
#[tokio::test]
async fn handles_missing_rpc_messages() {
test_utils::run_test_db(|db| async move {
// let first_root = H256::from([0; 32]);
// let second_root = H256::from([1; 32]);
// let third_root = H256::from([2; 32]);
// let fourth_root = H256::from([2; 32]);
let mut message_vec = vec![];
AbacusMessage {
origin: 1000,
@ -246,7 +244,6 @@ mod test {
leaf_index: 1,
message: message_vec.clone(),
};
let second_message_clone = second_message.clone();
let third_message = RawCommittedMessage {
leaf_index: 2,
@ -257,46 +254,43 @@ mod test {
leaf_index: 3,
message: message_vec.clone(),
};
let fourth_message_clone_1 = fourth_message.clone();
let fourth_message_clone_2 = fourth_message.clone();
let fifth_message = RawCommittedMessage {
leaf_index: 4,
message: message_vec.clone(),
};
let fifth_message_clone_1 = fifth_message.clone();
let fifth_message_clone_2 = fifth_message.clone();
let fifth_message_clone_3 = fifth_message.clone();
let mut mock_indexer = MockAbacusIndexer::new();
{
let mut seq = Sequence::new();
// Return first message
// Return first message.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m1 = first_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![first_message.clone()]));
.return_once(move |_, _| Ok(vec![m1]));
// Return second message, misses third message
// Return second message, misses third message.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m2 = second_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![second_message]));
.return_once(move |_, _| Ok(vec![m2]));
// misses the fourth
// Misses the fourth.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
@ -308,7 +302,7 @@ mod test {
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
// empty range
// Empty range.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
@ -320,17 +314,18 @@ mod test {
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
// second --> fifth message seen as invalid
// Second --> fifth message seen as invalid.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m5 = fifth_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![fifth_message]));
.return_once(move |_, _| Ok(vec![m5]));
// Indexer goes back and tries empty block range
mock_indexer
@ -344,32 +339,33 @@ mod test {
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
// Indexer tries to move on to realized missing block range but
// can't
// Indexer tries to move on to realized missing block range but can't.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m5 = fifth_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![fifth_message_clone_1]));
.return_once(move |_, _| Ok(vec![m5]));
// Indexer goes back further and gets to fourth message
// Indexer goes back further and gets to fourth message.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m4 = fourth_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![fourth_message_clone_1]));
.return_once(move |_, _| Ok(vec![m4]));
// Indexer gets empty range again
// Indexer gets empty range again.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
@ -381,43 +377,47 @@ mod test {
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
// Indexer gets fifth message again
// Indexer gets fifth message again.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m5 = fifth_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![fifth_message_clone_2]));
.return_once(move |_, _| Ok(vec![m5]));
// Indexer goes back even further and gets to message 2 and 3
// Indexer goes back even further and gets to message 2 and 3.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m2 = second_message.clone();
let m3 = third_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![second_message_clone, third_message]));
.return_once(move |_, _| Ok(vec![m2, m3]));
// Return fourth message
// Return fourth message.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
let m4 = fourth_message.clone();
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![fourth_message_clone_2]));
.return_once(move |_, _| Ok(vec![m4]));
// Reindexes empty block range
// Re-indexes empty block range.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
@ -429,7 +429,7 @@ mod test {
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
// Return fifth message
// Return fifth message.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
@ -439,17 +439,14 @@ mod test {
.expect__fetch_sorted_messages()
.times(1)
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![fifth_message_clone_3]));
.return_once(move |_, _| Ok(vec![fifth_message]));
// Return empty vec for remaining calls
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(100));
.returning(|| Ok(100));
mock_indexer
.expect__fetch_sorted_messages()
.return_once(move |_, _| Ok(vec![]));
.returning(|_, _| Ok(vec![]));
}
let abacus_db = AbacusDB::new("outbox_1", db);
@ -474,14 +471,27 @@ mod test {
);
let sync_task = contract_sync.sync_outbox_messages();
sleep(Duration::from_secs(3)).await;
cancel_task!(sync_task);
assert!(abacus_db.message_by_leaf_index(0).expect("!db").is_some());
assert!(abacus_db.message_by_leaf_index(1).expect("!db").is_some());
assert!(abacus_db.message_by_leaf_index(2).expect("!db").is_some());
assert!(abacus_db.message_by_leaf_index(3).expect("!db").is_some());
assert!(abacus_db.message_by_leaf_index(4).expect("!db").is_some());
let test_pass_fut = timeout(Duration::from_secs(30), async move {
let mut interval = interval(Duration::from_millis(20));
loop {
if abacus_db.message_by_leaf_index(0).expect("!db").is_some()
&& abacus_db.message_by_leaf_index(1).expect("!db").is_some()
&& abacus_db.message_by_leaf_index(2).expect("!db").is_some()
&& abacus_db.message_by_leaf_index(3).expect("!db").is_some()
&& abacus_db.message_by_leaf_index(4).expect("!db").is_some()
{
break;
}
interval.tick().await;
}
});
let test_result = select! {
err = sync_task => Err(eyre!(
"sync task unexpectedly done before test: {:?}", err.unwrap_err())),
tests_result = test_pass_fut =>
if tests_result.is_ok() { Ok(()) } else { Err(eyre!("timed out")) }
};
assert!(test_result.is_ok());
})
.await
}

@ -15,6 +15,7 @@ eyre = "0.6"
mockall = "0.10.2"
rand = "0.8.3"
rocksdb = "0.18"
tempfile = "3.3"
abacus-core = { path = "../abacus-core" }
tracing = "0.1"

@ -1,10 +1,8 @@
use abacus_core::db::DB;
use futures_util::FutureExt;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::{future::Future, panic};
use futures_util::Future;
use rocksdb::Options;
use tempfile::TempDir;
use abacus_core::db::DB;
pub fn setup_db(db_path: String) -> DB {
let mut opts = Options::default();
@ -16,34 +14,27 @@ pub fn setup_db(db_path: String) -> DB {
pub async fn run_test_db<T, Fut>(test: T)
where
T: FnOnce(DB) -> Fut + panic::UnwindSafe,
T: FnOnce(DB) -> Fut,
Fut: Future<Output = ()>,
{
// RocksDB only allows one unique db handle to be open at a time. Because
// `cargo test` is multithreaded by default, we use random db pathnames to
// avoid collisions between 2+ threads
let rand_path: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
let result = {
let db = setup_db(rand_path.clone());
let func = panic::AssertUnwindSafe(async { test(db).await });
func.catch_unwind().await
};
let _ = rocksdb::DB::destroy(&Options::default(), rand_path);
assert!(result.is_ok())
// Use `/tmp`-equivalent so that any resource leak of the db files will
// eventually be cleaned up, even if e.g. TempDir's drop handler never runs
// due to a segfault etc encountered during the test.
let db_tmp_dir = TempDir::new().unwrap();
let db = setup_db(db_tmp_dir.path().to_str().unwrap().into());
let _test_result = test(db).await;
let _ = rocksdb::DB::destroy(&Options::default(), db_tmp_dir);
}
#[cfg(test)]
mod test {
use super::*;
use ethers::types::H256;
use abacus_core::{
accumulator::merkle::Proof, db::AbacusDB, AbacusMessage, Encode, RawCommittedMessage,
};
use ethers::types::H256;
use super::*;
#[tokio::test]
async fn db_stores_and_retrieves_messages() {

Loading…
Cancel
Save