Proper RPC Usage fix (#1282)

* Dedupe indexer range calculations

* Better handling of tip updates

* Don't use the `to` block for interchain gas indexing

* Rename to SyncBlockRangeCursor

* use elapsed

* Fix init case for debug mode

* Work on mocking for outbox indexer test

* Fix tests

* Use from instead of start for interchain gas indexing db cursor

* warn on failure to get block range from cursor

* Fixed typo

* Document test sleep in outbox
Mattie Conover 2 years ago committed by GitHub
parent bc32eb92ce
commit 0ba82d1918
15 changed files (lines changed):
  1. rust/Cargo.lock (33)
  2. rust/abacus-base/Cargo.toml (2)
  3. rust/abacus-base/src/agent.rs (2)
  4. rust/abacus-base/src/contract_sync/cursor.rs (92)
  5. rust/abacus-base/src/contract_sync/interchain_gas.rs (62)
  6. rust/abacus-base/src/contract_sync/mod.rs (2)
  7. rust/abacus-base/src/contract_sync/outbox.rs (377)
  8. rust/abacus-base/src/lib.rs (3)
  9. rust/abacus-core/src/traits/cursor.rs (34)
  10. rust/abacus-core/src/traits/mod.rs (2)
  11. rust/abacus-test/Cargo.toml (2)
  12. rust/abacus-test/src/mocks/cursor.rs (32)
  13. rust/abacus-test/src/mocks/mod.rs (3)
  14. rust/agents/scraper/src/chain_scraper/mod.rs (4)
  15. rust/agents/scraper/src/chain_scraper/sync.rs (72)
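
The diff below replaces the per-loop tip/chunk arithmetic with a shared cursor abstraction. As a rough orientation sketch (not part of this commit), the reworked sync loops for outbox messages, gas payments, and the scraper all follow roughly this shape; `sync_loop` and `fetch_and_store` are hypothetical stand-ins for each loop's fetch/validate/persist logic:

use abacus_core::SyncBlockRangeCursor;
use eyre::Result;

// Illustrative only: ask the cursor for the next block range, process it, and
// back the cursor up if the fetched data was not a valid continuation.
async fn sync_loop<C, F>(mut cursor: C, mut fetch_and_store: F) -> Result<()>
where
    C: SyncBlockRangeCursor,
    // Hypothetical callback: returns Ok(true) when the range was a valid continuation.
    F: FnMut(u32, u32) -> Result<bool>,
{
    loop {
        // `next_range` performs rate limiting and tip tracking internally.
        let (from, to) = match cursor.next_range().await {
            Ok(range) => range,
            Err(err) => {
                tracing::warn!(error = %err, "failed to get next block range");
                continue;
            }
        };
        if !fetch_and_store(from, to)? {
            // Re-index this range; `backtrack` is a no-op for later blocks.
            cursor.backtrack(from);
        }
    }
}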

rust/Cargo.lock (generated, 33 lines changed)

@ -1204,10 +1204,10 @@ dependencies = [
]
[[package]]
name = "difference"
version = "2.0.0"
name = "difflib"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
[[package]]
name = "digest"
@ -1302,9 +1302,9 @@ dependencies = [
[[package]]
name = "downcast"
version = "0.10.0"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d"
checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
[[package]]
name = "dunce"
@ -1740,9 +1740,9 @@ dependencies = [
[[package]]
name = "float-cmp"
version = "0.8.0"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4"
checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4"
dependencies = [
"num-traits",
]
@ -1780,9 +1780,9 @@ dependencies = [
[[package]]
name = "fragile"
version = "1.2.1"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85dcb89d2b10c5f6133de2efd8c11959ce9dbb46a2f7a4cab208c4eeda6ce1ab"
checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "funty"
@ -2603,9 +2603,9 @@ dependencies = [
[[package]]
name = "mockall"
version = "0.10.2"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ab571328afa78ae322493cacca3efac6a0f2e0a67305b4df31fd439ef129ac0"
checksum = "50e4a1c770583dac7ab5e2f6c139153b783a53a1bbee9729613f193e59828326"
dependencies = [
"cfg-if",
"downcast",
@ -2618,9 +2618,9 @@ dependencies = [
[[package]]
name = "mockall_derive"
version = "0.10.2"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7e25b214433f669161f414959594216d8e6ba83b6679d3db96899c0b4639033"
checksum = "832663583d5fa284ca8810bf7015e46c9fff9622d3cf34bd1eea5003fec06dd0"
dependencies = [
"cfg-if",
"proc-macro2",
@ -3313,12 +3313,13 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "predicates"
version = "1.0.8"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f49cfaf7fdaa3bfacc6fa3e7054e65148878354a5cfddcf661df4c851f8021df"
checksum = "ed6bd09a7f7e68f3f0bf710fb7ab9c4615a488b58b5f653382a687701e458c92"
dependencies = [
"difference",
"difflib",
"float-cmp",
"itertools",
"normalize-line-endings",
"predicates-core",
"regex",

rust/abacus-base/Cargo.toml
@ -21,7 +21,7 @@ tracing = "0.1"
tracing-futures = "0.2"
tracing-subscriber = { version = "0.3", features = ["json"] }
rocksdb = "0.18"
mockall = "0.10.2"
mockall = "0.11"
backtrace = { version = "0.3", optional = true }
backtrace-oneline = { path = "../utils/backtrace-oneline", optional = true }

rust/abacus-base/src/agent.rs
@ -132,8 +132,6 @@ pub async fn agent_main<A: BaseAgent>() -> Result<()> {
crate::oneline_eyre::install()?;
#[cfg(all(feature = "color_eyre", not(feature = "oneline-errors")))]
color_eyre::install()?;
#[cfg(not(any(feature = "color-eyre", feature = "oneline-eyre")))]
eyre::install()?;
let settings = A::Settings::new().map_err(|e| e.into())?;
let core_settings: &AgentSettings = settings.as_ref();

rust/abacus-base/src/contract_sync/cursor.rs
@ -0,0 +1,92 @@
use std::time::{Duration, Instant};
use async_trait::async_trait;
use eyre::Result;
use tokio::time::sleep;
use abacus_core::{Indexer, SyncBlockRangeCursor};
/// Tool for determining the next block range that should be queried and for
/// handling rate limiting. Rate limiting is performed automatically by
/// `next_range`.
pub struct RateLimitedSyncBlockRangeCursor<I> {
indexer: I,
tip: u32,
last_tip_update: Instant,
chunk_size: u32,
from: u32,
}
impl<I> RateLimitedSyncBlockRangeCursor<I>
where
I: Indexer,
{
/// Construct a new contract sync helper.
pub async fn new(indexer: I, chunk_size: u32, initial_height: u32) -> Result<Self> {
let tip = indexer.get_finalized_block_number().await?;
Ok(Self {
indexer,
tip,
chunk_size,
last_tip_update: Instant::now(),
from: initial_height,
})
}
/// Wait based on how close we are to the tip and update the tip,
/// i.e. the highest block we may scrape.
async fn rate_limit(&mut self) -> Result<()> {
if self.from + self.chunk_size < self.tip {
// If doing the full chunk wouldn't exceed the already known tip,
// we don't necessarily need to fetch the new tip. Sleep a tiny bit
// so that we can catch up to the tip relatively quickly.
sleep(Duration::from_secs(1)).await;
Ok(())
} else {
// We are close to the tip.
if self.last_tip_update.elapsed() < Duration::from_secs(30) {
// Sleep a little longer because we have caught up.
sleep(Duration::from_secs(10)).await;
} else {
// We are probably not caught up yet. This would happen if we
// started really far behind so now it is very likely the tip
// has moved a significant distance. We don't want to wait in
// this case any more than we normally would.
sleep(Duration::from_secs(1)).await;
}
match self.indexer.get_finalized_block_number().await {
Ok(tip) => {
// we retrieved a new tip value, go ahead and update.
self.last_tip_update = Instant::now();
self.tip = tip;
Ok(())
}
Err(e) => {
// we are failing to make a basic query, we should wait before retrying.
sleep(Duration::from_secs(10)).await;
Err(e)
}
}
}
}
}
#[async_trait]
impl<I: Indexer> SyncBlockRangeCursor for RateLimitedSyncBlockRangeCursor<I> {
fn current_position(&self) -> u32 {
self.from
}
async fn next_range(&mut self) -> Result<(u32, u32)> {
self.rate_limit().await?;
let to = u32::min(self.tip, self.from + self.chunk_size);
let from = to.saturating_sub(self.chunk_size);
self.from = to + 1;
Ok((from, to))
}
fn backtrack(&mut self, start_from: u32) {
self.from = u32::min(start_from, self.from);
}
}
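
For reference, the window arithmetic in `next_range` above can be exercised in isolation; the helper below is illustrative only and does not exist in the codebase:

// Mirrors the arithmetic in `next_range`: cap `to` at the tip, then pull the
// returned `from` back so the window is always a full chunk wide. Near the tip
// this overlaps the previous range, re-querying blocks that a lagging node may
// have served incompletely.
fn next_range_math(from: u32, tip: u32, chunk_size: u32) -> (u32, u32) {
    let to = u32::min(tip, from + chunk_size);
    let range_from = to.saturating_sub(chunk_size);
    (range_from, to)
}

fn main() {
    // Far from the tip: a plain chunk starting at `from`.
    assert_eq!(next_range_math(100, 1_000, 20), (100, 120));
    // Near the tip: `to` is capped at 105 and `from` drops back to 85.
    assert_eq!(next_range_math(100, 105, 20), (85, 105));
}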

rust/abacus-base/src/contract_sync/interchain_gas.rs
@ -1,11 +1,9 @@
use std::cmp::min;
use std::time::Duration;
use tokio::task::JoinHandle;
use tracing::{info, info_span, instrument::Instrumented, warn, Instrument};
use tokio::{task::JoinHandle, time::sleep};
use tracing::{debug, info, info_span, instrument::Instrumented, Instrument};
use abacus_core::InterchainGasPaymasterIndexer;
use abacus_core::{InterchainGasPaymasterIndexer, SyncBlockRangeCursor};
use crate::contract_sync::cursor::RateLimitedSyncBlockRangeCursor;
use crate::{contract_sync::schema::InterchainGasPaymasterContractSyncDB, ContractSync};
const GAS_PAYMENTS_LABEL: &str = "gas_payments";
@ -31,42 +29,39 @@ where
.stored_events
.with_label_values(&[GAS_PAYMENTS_LABEL, &self.chain_name]);
let config_from = self.index_settings.from();
let chunk_size = self.index_settings.chunk_size();
let cursor = {
let config_initial_height = self.index_settings.from();
let initial_height = db
.retrieve_latest_indexed_gas_payment_block()
.map_or(config_initial_height, |b| b + 1);
RateLimitedSyncBlockRangeCursor::new(
indexer.clone(),
self.index_settings.chunk_size(),
initial_height,
)
};
tokio::spawn(async move {
let mut from = db
.retrieve_latest_indexed_gas_payment_block()
.map_or_else(|| config_from, |b| b + 1);
let mut cursor = cursor.await?;
info!(from = from, "[GasPayments]: resuming indexer from {from}");
indexed_height.set(from as i64);
let start_block = cursor.current_position();
info!(from = start_block, "[GasPayments]: resuming indexer");
indexed_height.set(start_block as i64);
loop {
sleep(Duration::from_secs(5)).await;
// Only index blocks considered final.
// If there's an error getting the block number, just start the loop over
let Ok(tip) = indexer.get_finalized_block_number().await else {
continue;
};
if tip <= from {
// Sleep if caught up to tip
debug!(tip=?tip, from=?from, "[GasPayments]: caught up to tip, waiting for new block");
sleep(Duration::from_secs(10)).await;
let (from, to) = match cursor.next_range().await {
Ok(range) => range,
Err(err) => {
warn!(error = %err, "[GasPayments]: failed to get next block range");
continue;
}
};
let candidate = from + chunk_size;
let to = min(tip, candidate);
// Still search the full-size chunk size to possibly catch events that nodes have dropped "close to the tip"
let full_chunk_from = to.checked_sub(chunk_size).unwrap_or_default();
let gas_payments = indexer.fetch_gas_payments(full_chunk_from, to).await?;
let gas_payments = indexer.fetch_gas_payments(from, to).await?;
info!(
from = full_chunk_from,
to = to,
from,
to,
gas_payments_count = gas_payments.len(),
"[GasPayments]: indexed block range"
);
@ -82,8 +77,7 @@ where
stored_messages.inc_by(new_payments_processed);
db.store_latest_indexed_gas_payment_block(to)?;
from = to + 1;
db.store_latest_indexed_gas_payment_block(from)?;
indexed_height.set(to as i64);
}
})

rust/abacus-base/src/contract_sync/mod.rs
@ -2,12 +2,14 @@
// TODO: Reapply metrics
use abacus_core::db::AbacusDB;
pub use cursor::*;
pub use interchain_gas::*;
pub use metrics::ContractSyncMetrics;
pub use outbox::*;
use crate::settings::IndexSettings;
mod cursor;
mod interchain_gas;
/// Tools for working with message continuity.
pub mod last_message;

rust/abacus-base/src/contract_sync/outbox.rs
@ -1,11 +1,10 @@
use std::cmp::min;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{debug, info, info_span, warn};
use tracing::{instrument::Instrumented, Instrument};
use abacus_core::{name_from_domain_id, CommittedMessage, ListValidity, OutboxIndexer};
use abacus_core::{
name_from_domain_id, CommittedMessage, Indexer, ListValidity, OutboxIndexer,
SyncBlockRangeCursor,
};
use crate::contract_sync::last_message::validate_message_continuity;
use crate::{contract_sync::schema::OutboxContractSyncDB, ContractSync};
@ -40,82 +39,91 @@ where
let message_leaf_index = self.metrics.message_leaf_index.clone();
let chain_name = self.chain_name.clone();
let config_from = self.index_settings.from();
let chunk_size = self.index_settings.chunk_size();
let cursor = {
let config_initial_height = self.index_settings.from();
let initial_height = db
.retrieve_latest_valid_message_range_start_block()
.map_or(config_initial_height, |b| b + 1);
create_cursor(
indexer.clone(),
self.index_settings.chunk_size(),
initial_height,
)
};
// Indexes messages by fetching messages in ranges of blocks.
// We've observed occasional flakiness with providers where some events in
// a range will be missing. The leading theories are:
//
// 1. The provider is just flaky and sometimes misses events :(
//
// 2. For outbox chains with low finality times, it's possible that when
// we query the RPC provider for the latest finalized block number,
// we're returned a block number T. However when we attempt to index a range
// where the `to` block is T, the `eth_getLogs` RPC is load balanced by the
// provider to a different node whose latest known block is some block T' <T.
// The `eth_getLogs` RPC implementations seem to happily accept `to` blocks that
// exceed the latest known block, so it's possible that in our indexer we think
// that we've indexed up to block T but we've only *actually* indexed up to block T'.
//
// The `eth_getLogs` RPC implementations seem to happily accept
// `to` blocks that exceed the latest known block, so it's possible
// that in our indexer we think that we've indexed up to block T but
// we've only *actually* indexed up to block T'.
//
// It's easy to determine if a provider has skipped any message events by
// looking at the indices of each message and ensuring that we've indexed a valid
// continuation of messages.
// looking at the indices of each message and ensuring that we've indexed a
// valid continuation of messages.
//
// There are two classes of invalid continuations:
// 1. The latest previously indexed message index is M that was found in a previously
// indexed block range. A new block range [A,B] is indexed, returning a list of messages.
// The lowest message index in that list is `M + 1`, but there are some missing messages
// indices in the list. This is likely a flaky provider, and we can simply re-index the
// range [A,B] hoping that the provider will soon return a correct list.
// 2. The latest previously indexed message index is M that was found in a previously
// indexed block range, [A,B]. A new block range [C,D] is indexed, returning a list of
// messages. However, the lowest message index in that list is M' where M' > M + 1.
// This missing messages could be anywhere in the range [A,D]:
// * It's possible there was an issue when the prior block range [A,B] was indexed, where
// the provider didn't provide some messages with indices > M that it should have.
// * It's possible that the range [B,C] that was presumed to be empty when it was indexed
// actually wasn't.
// * And it's possible that this was just a flaky gap, where there are messages in the [C,D]
// range that weren't returned for some reason.
//
// 1. The latest previously indexed message index is M that was found in a
// previously indexed block range. A new block range [A,B] is indexed, returning
// a list of messages. The lowest message index in that list is `M + 1`,
// but there are some missing message indices in the list. This is
// likely a flaky provider, and we can simply re-index the range [A,B]
// hoping that the provider will soon return a correct list.
//
// 2. The latest previously indexed message index is M that was found in a
// previously indexed block range, [A,B]. A new block range [C,D] is
// indexed, returning a list of messages. However, the lowest message
// index in that list is M' where M' > M + 1. These missing messages
// could be anywhere in the range [A,D]:
// * It's possible there was an issue when the prior block range [A,B] was
// indexed, where the provider didn't provide some messages with indices >
// M that it should have.
// * It's possible that the range [B,C] that was presumed to be empty when it
// was indexed actually wasn't.
// * And it's possible that this was just a flaky gap, where there are
// messages in the [C,D] range that weren't returned for some reason.
//
// We can handle this by re-indexing starting from block A.
// Note this means we only handle this case upon observing messages in some range [C,D]
// that indicate a previously indexed range may have missed some messages.
// Note this means we only handle this case upon observing messages in some
// range [C,D] that indicate a previously indexed range may have
// missed some messages.
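//
// (Editorial illustration, not part of this commit: suppose the last indexed
// message index is M = 4. Receiving indices [5, 7] from the new range is a
// gap within the list (6 is missing), so we re-index the same range.
// Receiving [6, 7] is an invalid continuation (lowest index > M + 1), so we
// re-index from the last valid range's start block. Receiving [5, 6, 7] is a
// valid continuation and we advance normally.)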
tokio::spawn(async move {
let mut from = db
.retrieve_latest_valid_message_range_start_block()
.unwrap_or(config_from);
let mut last_valid_range_start_block = from;
let mut cursor = cursor.await?;
info!(from = from, "[Messages]: resuming indexer from latest valid message range start block");
let start_block = cursor.current_position();
let mut last_valid_range_start_block = start_block;
info!(from = start_block, "[Messages]: resuming indexer from latest valid message range start block");
indexed_height.set(start_block as i64);
indexed_height.set(from as i64);
loop {
sleep(Duration::from_secs(5)).await;
// Only index blocks considered final.
// If there's an error getting the block number, just start the loop over
let Ok(tip) = indexer.get_finalized_block_number().await else {
continue;
};
if tip <= from {
// Sleep if caught up to tip
sleep(Duration::from_secs(10)).await;
let start_block = cursor.current_position();
let (from, to) = match cursor.next_range().await {
Ok(range) => range,
Err(err) => {
warn!(error = %err, "[Messages]: failed to get next block range");
continue;
}
};
// Index the chunk_size, capping at the tip.
let to = min(tip, from + chunk_size);
// Still search the full-size chunk size to possibly catch events that nodes have dropped "close to the tip"
let full_chunk_from = to.checked_sub(chunk_size).unwrap_or_default();
let mut sorted_messages: Vec<_> = indexer.fetch_sorted_messages(full_chunk_from, to).await?.into_iter().map(|(msg, _)| msg).collect();
let mut sorted_messages: Vec<_> = indexer
.fetch_sorted_messages(from, to)
.await?
.into_iter()
.map(|(msg, _)| msg)
.collect();
info!(
from = full_chunk_from,
to = to,
message_count = sorted_messages.len(),
"[Messages]: indexed block range"
);
info!(from, to, message_count = sorted_messages.len(), "[Messages]: indexed block range");
// Get the latest known leaf index. All messages whose indices are <= this index
// have been stored in the DB.
@ -127,12 +135,7 @@ where
sorted_messages.retain(|m| m.leaf_index > min_index);
}
debug!(
from = full_chunk_from,
to = to,
message_count = sorted_messages.len(),
"[Messages]: filtered any messages already indexed"
);
debug!(from, to, message_count = sorted_messages.len(), "[Messages]: filtered any messages already indexed");
// Ensure the sorted messages are a valid continuation of last_leaf_index
match validate_message_continuity(last_leaf_index, &sorted_messages.iter().collect::<Vec<_>>()) {
@ -155,11 +158,10 @@ where
}
// Update the latest valid start block.
db.store_latest_valid_message_range_start_block(full_chunk_from)?;
last_valid_range_start_block = full_chunk_from;
db.store_latest_valid_message_range_start_block(from)?;
last_valid_range_start_block = from;
// Move forward to the next height
from = to + 1;
indexed_height.set(to as i64);
}
// The index of the first message in sorted_messages is not the
@ -175,11 +177,12 @@ where
"[Messages]: Found invalid continuation in range. Re-indexing from the start block of the last successful range.",
);
from = last_valid_range_start_block;
indexed_height.set(from as i64);
cursor.backtrack(last_valid_range_start_block);
indexed_height.set(last_valid_range_start_block as i64);
}
ListValidity::ContainsGaps => {
missed_messages.inc();
cursor.backtrack(start_block);
warn!(
last_leaf_index = ?last_leaf_index,
@ -193,7 +196,6 @@ where
// We don't update last_valid_range_start_block because we cannot extrapolate
// if the range was correctly indexed if there are no messages to observe their
// indices.
from = to + 1;
indexed_height.set(to as i64);
}
};
@ -203,6 +205,28 @@ where
}
}
#[cfg(test)]
static mut MOCK_CURSOR: Option<abacus_test::mocks::cursor::MockSyncBlockRangeCursor> = None;
/// Create a new cursor. In test mode we should use the mock cursor created by
/// the test.
#[cfg_attr(test, allow(unused_variables))]
async fn create_cursor<I: Indexer>(
indexer: I,
chunk_size: u32,
initial_height: u32,
) -> eyre::Result<impl SyncBlockRangeCursor> {
#[cfg(not(test))]
{
crate::RateLimitedSyncBlockRangeCursor::new(indexer, chunk_size, initial_height).await
}
#[cfg(test)]
{
let cursor = unsafe { MOCK_CURSOR.take() };
Ok(cursor.expect("Mock cursor was not set before it was used"))
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
@ -210,22 +234,32 @@ mod test {
use ethers::core::types::H256;
use eyre::eyre;
use mockall::predicate::eq;
use mockall::*;
use tokio::select;
use tokio::time::{interval, timeout};
use tokio::sync::Mutex;
use tokio::time::{interval, sleep, timeout};
use abacus_core::{db::AbacusDB, AbacusMessage, Encode, LogMeta, RawCommittedMessage};
use abacus_test::mocks::cursor::MockSyncBlockRangeCursor;
use abacus_test::mocks::indexer::MockAbacusIndexer;
use abacus_test::test_utils;
use mockall::predicate::eq;
use crate::contract_sync::outbox::MOCK_CURSOR;
use crate::contract_sync::schema::OutboxContractSyncDB;
use crate::ContractSync;
use crate::{settings::IndexSettings, ContractSyncMetrics, CoreMetrics};
// we need a mutex for our tests because of the static cursor object
lazy_static! {
static ref TEST_MTX: Mutex<()> = Mutex::new(());
}
#[tokio::test]
async fn handles_missing_rpc_messages() {
test_utils::run_test_db(|db| async move {
let _test_lock = TEST_MTX.lock().await;
let mut message_vec = vec![];
AbacusMessage {
origin: 1000,
@ -279,160 +313,102 @@ mod test {
let latest_valid_message_range_start_block = 100;
let mut mock_indexer = MockAbacusIndexer::new();
let mut mock_cursor = MockSyncBlockRangeCursor::new();
{
let mut seq = Sequence::new();
// Return m0.
let m0_clone = m0.clone();
mock_indexer
.expect__get_finalized_block_number()
// Some local macros to reduce code-duplication.
macro_rules! expect_current_position {
($return_position:literal) => {
mock_cursor
.expect__current_position()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(110));
mock_indexer
.expect__fetch_sorted_messages()
.return_once(|| $return_position);
};
}
macro_rules! expect_backtrack {
($expected_new_from:literal) => {
mock_cursor
.expect__backtrack()
.times(1)
.with(eq(91), eq(110))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![(m0_clone, meta())]));
// Return m1, miss m2.
let m1_clone = m1.clone();
mock_indexer
.expect__get_finalized_block_number()
.with(eq($expected_new_from))
.return_once(|_| ());
};
}
macro_rules! expect_fetches_range {
($expected_from:literal, $expected_to:literal, $return_messages:expr) => {
let messages: &[&RawCommittedMessage] = $return_messages;
let messages = messages.iter().map(|&msg| (msg.clone(), meta())).collect();
mock_cursor
.expect__next_range()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(120));
.return_once(|| Box::pin(async { Ok(($expected_from, $expected_to)) }));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(101), eq(120))
.with(eq($expected_from), eq($expected_to))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![(m1_clone, meta())]));
.return_once(move |_, _| Ok(messages));
};
}
expect_current_position!(91);
expect_current_position!(91);
// Return m0.
expect_fetches_range!(91, 110, &[&m0]);
// Return m1, miss m2.
expect_current_position!(111);
expect_fetches_range!(101, 120, &[&m1]);
// Miss m3.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(130));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(111), eq(130))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
expect_current_position!(121);
expect_fetches_range!(111, 130, &[]);
// Empty range.
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(140));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(121), eq(140))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(140));
expect_current_position!(131);
expect_fetches_range!(121, 140, &[]);
// m1 --> m5 seen as an invalid continuation
let m5_clone = m5.clone();
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(150));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(131), eq(150))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![(m5_clone, meta())]));
expect_current_position!(141);
expect_fetches_range!(131, 150, &[&m5]);
expect_backtrack!(101);
// Indexer goes back to the last valid message range start block
// and indexes the range based off the chunk size of 19.
// and indexes the range
// This time it gets m1 and m2 (which was previously skipped)
let m1_clone = m1.clone();
let m2_clone = m2.clone();
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(160));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(101), eq(120))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![(m1_clone, meta()), (m2_clone, meta())]));
expect_current_position!(101);
expect_fetches_range!(101, 120, &[&m1, &m2]);
// Indexer continues, this time getting the m3 and m5 messages, but skipping m4,
// which means this range contains gaps
let m3_clone = m3.clone();
let m5_clone = m5.clone();
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(170));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(121), eq(140))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![(m3_clone, meta()), (m5_clone, meta())]));
expect_current_position!(121);
expect_fetches_range!(118, 140, &[&m3, &m5]);
expect_backtrack!(121);
// Indexer retries the same range in hope of filling the gap,
// which it now does successfully
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(170));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(121), eq(140))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![(m3, meta()), (m4, meta()), (m5, meta())]));
expect_current_position!(121);
expect_fetches_range!(121, 140, &[&m3, &m4, &m5]);
// Indexer continues with the next block range, which happens to be empty
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(180));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(141), eq(160))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
// Indexer catches up with the tip
mock_indexer
.expect__get_finalized_block_number()
.times(1)
.in_sequence(&mut seq)
.return_once(|| Ok(180));
mock_indexer
.expect__fetch_sorted_messages()
.times(1)
.with(eq(161), eq(180))
.in_sequence(&mut seq)
.return_once(move |_, _| Ok(vec![]));
expect_current_position!(141);
expect_fetches_range!(141, 160, &[]);
// Stay at the same tip, so no other fetch_sorted_messages calls are made
mock_indexer
.expect__get_finalized_block_number()
.returning(|| Ok(180));
mock_cursor.expect__current_position().returning(|| 161);
mock_cursor.expect__next_range().returning(|| {
Box::pin(async move {
// this sleep should be longer than the test timeout since we don't actually
// want to yield any more values at this point.
sleep(Duration::from_secs(100)).await;
Ok((161, 161))
})
});
}
let abacus_db = AbacusDB::new("outbox_1", db);
@ -449,13 +425,14 @@ mod test {
CoreMetrics::new("contract_sync_test", None, prometheus::Registry::new())
.expect("could not make metrics"),
);
unsafe { MOCK_CURSOR = Some(mock_cursor) };
let sync_metrics = ContractSyncMetrics::new(metrics);
let contract_sync = ContractSync::new(
"outbox_1".into(),
abacus_db.clone(),
indexer.clone(),
indexer,
IndexSettings {
from: Some("0".to_string()),
chunk: Some("19".to_string()),
@ -464,7 +441,7 @@ mod test {
);
let sync_task = contract_sync.sync_outbox_messages();
let test_pass_fut = timeout(Duration::from_secs(90), async move {
let test_pass_fut = timeout(Duration::from_secs(5), async move {
let mut interval = interval(Duration::from_millis(20));
loop {
if abacus_db.message_by_leaf_index(0).expect("!db").is_some()
@ -485,7 +462,9 @@ mod test {
tests_result = test_pass_fut =>
if tests_result.is_ok() { Ok(()) } else { Err(eyre!("timed out")) }
};
assert!(test_result.is_ok());
if let Err(err) = test_result {
panic!("Test failed: {err}")
}
})
.await
}

rust/abacus-base/src/lib.rs
@ -5,7 +5,8 @@
//! Implementations of the `Outbox` and `Inbox` traits on different chains
//! ought to live here.
#![forbid(unsafe_code)]
// Forbid unsafe code outside of tests
#![cfg_attr(not(test), forbid(unsafe_code))]
#![warn(missing_docs)]
#![warn(unused_extern_crates)]

rust/abacus-core/src/traits/cursor.rs
@ -0,0 +1,34 @@
use async_trait::async_trait;
use auto_impl::auto_impl;
use eyre::Result;
/// Tool for determining the next block range that should be queried; it may
/// perform rate limiting on `next_range` queries.
#[async_trait]
#[auto_impl(Box)]
pub trait SyncBlockRangeCursor {
/// Returns the current `from` position of the scraper. Note that
/// `next_range` may return a `from` value that is lower than this in order
/// to have some overlap.
fn current_position(&self) -> u32;
/// Get the next block range `(from, to)` which should be fetched (this
/// returns an inclusive range such as (0,50), (51,100), ...). This
/// will automatically rate limit based on how far we are from the
/// highest block we can scrape according to
/// `get_finalized_block_number`.
///
/// In reality this will often return a from value that overlaps with the
/// previous range to help ensure that we scrape everything even if the
/// provider failed to respond in full previously.
///
/// This assumes the caller will call next_range again automatically on Err,
/// but it returns the error to allow for tailored logging or different end
/// cases.
async fn next_range(&mut self) -> Result<(u32, u32)>;
/// If there was an issue when a range of data was fetched, this rolls back
/// so the next range fetched will be from `start_from`. Note that it is a
/// no-op if a later block value is specified.
fn backtrack(&mut self, start_from: u32);
}
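
To make the trait contract concrete, here is a minimal implementation with no rate limiting and no tip awareness; it is illustrative only and not part of this commit (the production implementation is `RateLimitedSyncBlockRangeCursor` in abacus-base):

use abacus_core::SyncBlockRangeCursor;
use async_trait::async_trait;
use eyre::Result;

// Illustrative cursor that walks forward in fixed-size inclusive chunks.
struct FixedStepCursor {
    from: u32,
    chunk_size: u32,
}

#[async_trait]
impl SyncBlockRangeCursor for FixedStepCursor {
    fn current_position(&self) -> u32 {
        self.from
    }

    async fn next_range(&mut self) -> Result<(u32, u32)> {
        // Inclusive range, e.g. (0, 50), (51, 101), ...
        let range = (self.from, self.from + self.chunk_size);
        self.from = range.1 + 1;
        Ok(range)
    }

    fn backtrack(&mut self, start_from: u32) {
        // Only ever moves the cursor backwards; a later value is a no-op.
        self.from = u32::min(start_from, self.from);
    }
}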

rust/abacus-core/src/traits/mod.rs
@ -13,6 +13,7 @@ use ethers::{
use eyre::Result;
pub use common::*;
pub use cursor::*;
pub use encode::*;
pub use inbox::*;
pub use indexer::*;
@ -24,6 +25,7 @@ pub use validator_manager::*;
use crate::{db::DbError, utils::domain_hash, AbacusError};
mod common;
mod cursor;
mod encode;
mod inbox;
mod indexer;

rust/abacus-test/Cargo.toml
@ -12,7 +12,7 @@ thiserror = { version = "1.0", default-features = false }
async-trait = { version = "0.1", default-features = false }
futures-util = "0.3"
eyre = "0.6"
mockall = "0.10.2"
mockall = "0.11"
rand = "0.8.3"
rocksdb = "0.18"
tempfile = "3.3"

rust/abacus-test/src/mocks/cursor.rs
@ -0,0 +1,32 @@
#![allow(non_snake_case)]
use abacus_core::SyncBlockRangeCursor;
use async_trait::async_trait;
use eyre::Result;
use mockall::mock;
use std::future::Future;
mock! {
pub SyncBlockRangeCursor {
pub fn _next_range(&mut self) -> impl Future<Output=Result<(u32, u32)>> + Send {}
pub fn _current_position(&self) -> u32 {}
pub fn _backtrack(&mut self, start_from: u32) {}
}
}
#[async_trait]
impl SyncBlockRangeCursor for MockSyncBlockRangeCursor {
fn current_position(&self) -> u32 {
self._current_position()
}
async fn next_range(&mut self) -> Result<(u32, u32)> {
self._next_range().await
}
fn backtrack(&mut self, start_from: u32) {
self._backtrack(start_from)
}
}

rust/abacus-test/src/mocks/mod.rs
@ -7,5 +7,8 @@ pub mod inbox;
/// Mock indexer
pub mod indexer;
/// Mock SyncBlockRangeCursor
pub mod cursor;
pub use indexer::MockIndexer;
pub use outbox::MockOutboxContract;

rust/agents/scraper/src/chain_scraper/mod.rs
@ -88,10 +88,6 @@ impl SqlChainScraper {
self.remotes.keys().copied()
}
pub async fn get_finalized_block_number(&self) -> Result<u32> {
self.local.indexer.get_finalized_block_number().await
}
/// Sync contract data and other blockchain with the current chain state.
/// This will create a long-running task that should be spawned.
pub fn sync(self) -> impl Future<Output = Result<()>> + Send + 'static {

rust/agents/scraper/src/chain_scraper/sync.rs
@ -1,16 +1,17 @@
use std::cmp::min;
use std::collections::HashMap;
use std::ops::Deref;
use std::time::Duration;
use std::sync::Arc;
use abacus_base::RateLimitedSyncBlockRangeCursor;
use ethers::prelude::H256;
use eyre::Result;
use prometheus::{IntCounter, IntGauge, IntGaugeVec};
use tokio::time::sleep;
use tracing::{debug, info, instrument, warn};
use abacus_base::last_message::validate_message_continuity;
use abacus_core::{name_from_domain_id, CommittedMessage, ListValidity};
use abacus_core::{
name_from_domain_id, CommittedMessage, ListValidity, OutboxIndexer, SyncBlockRangeCursor,
};
use crate::chain_scraper::{Delivery, RawMsgWithMeta, SqlChainScraper, TxnWithIdAndTime};
@ -30,9 +31,8 @@ pub(super) struct Syncer {
stored_deliveries: IntCounter,
missed_messages: IntCounter,
message_leaf_index: IntGaugeVec,
chunk_size: u32,
sync_cursor: RateLimitedSyncBlockRangeCursor<Arc<dyn OutboxIndexer>>,
from: u32,
last_valid_range_start_block: u32,
last_leaf_index: u32,
}
@ -79,10 +79,17 @@ impl Syncer {
let message_leaf_index = scraper.metrics.message_leaf_index.clone();
let chunk_size = scraper.chunk_size;
let from = scraper.cursor.height().await as u32;
let last_valid_range_start_block = from;
let initial_height = scraper.cursor.height().await as u32;
let last_valid_range_start_block = initial_height;
let last_leaf_index = scraper.last_message_leaf_index().await?.unwrap_or(0);
let sync_cursor = RateLimitedSyncBlockRangeCursor::new(
scraper.local.indexer.clone(),
chunk_size,
initial_height,
)
.await?;
Ok(Self {
scraper,
indexed_message_height,
@ -91,8 +98,7 @@ impl Syncer {
stored_deliveries,
missed_messages,
message_leaf_index,
chunk_size,
from,
sync_cursor,
last_valid_range_start_block,
last_leaf_index,
})
@ -101,25 +107,23 @@ impl Syncer {
/// Sync contract and other blockchain data with the current chain state.
#[instrument(skip(self), fields(chain_name = self.chain_name(), chunk_size = self.chunk_size))]
pub async fn run(mut self) -> Result<()> {
info!(from = self.from, "Resuming chain sync");
self.indexed_message_height.set(self.from as i64);
self.indexed_deliveries_height.set(self.from as i64);
let start_block = self.sync_cursor.current_position();
info!(from = start_block, "Resuming chain sync");
self.indexed_message_height.set(start_block as i64);
self.indexed_deliveries_height.set(start_block as i64);
loop {
sleep(Duration::from_secs(5)).await;
let Ok(tip) = self.get_finalized_block_number().await else {
continue;
};
if tip <= self.from {
sleep(Duration::from_secs(10)).await;
debug_assert_eq!(self.local.outbox.local_domain(), self.local_domain());
let start_block = self.sync_cursor.current_position();
let (from, to) = match self.sync_cursor.next_range().await {
Ok(range) => range,
Err(err) => {
warn!(error = %err, "failed to get next block range");
continue;
}
};
let to = min(tip, self.from + self.chunk_size);
let full_chunk_from = to.checked_sub(self.chunk_size).unwrap_or_default();
debug_assert_eq!(self.local.outbox.local_domain(), self.local_domain());
let (sorted_messages, deliveries) = self.scrape_range(full_chunk_from, to).await?;
let (sorted_messages, deliveries) = self.scrape_range(from, to).await?;
let validation = validate_message_continuity(
Some(self.last_leaf_index),
@ -130,18 +134,16 @@ impl Syncer {
let max_leaf_index_of_batch =
self.record_data(sorted_messages, deliveries).await?;
self.cursor.update(full_chunk_from as u64).await;
self.cursor.update(from as u64).await;
if let Some(idx) = max_leaf_index_of_batch {
self.last_leaf_index = idx;
}
self.last_valid_range_start_block = full_chunk_from;
self.from = to + 1;
self.last_valid_range_start_block = from;
self.indexed_message_height.set(to as i64);
self.indexed_deliveries_height.set(to as i64);
}
ListValidity::Empty => {
let _ = self.record_data(sorted_messages, deliveries).await?;
self.from = to + 1;
self.indexed_message_height.set(to as i64);
self.indexed_deliveries_height.set(to as i64);
}
@ -149,20 +151,24 @@ impl Syncer {
self.missed_messages.inc();
warn!(
last_leaf_index = self.last_leaf_index,
start_block = self.from,
start_block = from,
end_block = to,
last_valid_range_start_block = self.last_valid_range_start_block,
"Found invalid continuation in range. Re-indexing from the start block of the last successful range."
);
self.from = self.last_valid_range_start_block;
self.indexed_message_height.set(self.from as i64);
self.indexed_deliveries_height.set(self.from as i64);
self.sync_cursor
.backtrack(self.last_valid_range_start_block);
self.indexed_message_height
.set(self.last_valid_range_start_block as i64);
self.indexed_deliveries_height
.set(self.last_valid_range_start_block as i64);
}
ListValidity::ContainsGaps => {
self.missed_messages.inc();
self.sync_cursor.backtrack(start_block);
warn!(
last_leaf_index = self.last_leaf_index,
start_block = self.from,
start_block = from,
end_block = to,
last_valid_range_start_block = self.last_valid_range_start_block,
"Found gaps in the message in range, re-indexing the same range."
