From bc32eb92ce1307ab29be8e77083afeaff3464f06 Mon Sep 17 00:00:00 2001 From: Asa Oines Date: Tue, 22 Nov 2022 08:14:19 -0500 Subject: [PATCH 1/7] Do not make empty queries to coingecko (#1285) --- typescript/sdk/src/gas/token-prices.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/typescript/sdk/src/gas/token-prices.ts b/typescript/sdk/src/gas/token-prices.ts index da331a3dd..c8689a201 100644 --- a/typescript/sdk/src/gas/token-prices.ts +++ b/typescript/sdk/src/gas/token-prices.ts @@ -104,10 +104,12 @@ export class CoinGeckoTokenPriceGetter implements TokenPriceGetter { } const toQuery = chains.filter((c) => !this.cache.isFresh(c)); - try { - await this.queryTokenPrices(toQuery); - } catch (e) { - warn('Failed to query token prices', e); + if (toQuery.length > 0) { + try { + await this.queryTokenPrices(toQuery); + } catch (e) { + warn('Failed to query token prices', e); + } } return chains.map((chain) => this.cache.fetch(chain)); } From 0ba82d1918845af670d65a1692c9e013a54a5881 Mon Sep 17 00:00:00 2001 From: Mattie Conover Date: Tue, 22 Nov 2022 11:22:29 -0800 Subject: [PATCH 2/7] Proper RPC Useage fix (#1282) * Dedupe indexer range calculations * Better handling of tip updates * Don't use to block for interchain gas indexing * Rename to SyncBlockRangeCursor * use elapsed * Fix init case for debug mode * Work on mocking for outbox indexer test * Fix tests * Use from instead of start for interchain gas indexing db cursor * warn on failure to get block range from cursor * Fixed typo * Document test sleep in outbox --- rust/Cargo.lock | 33 +- rust/abacus-base/Cargo.toml | 2 +- rust/abacus-base/src/agent.rs | 2 - rust/abacus-base/src/contract_sync/cursor.rs | 92 ++++ .../src/contract_sync/interchain_gas.rs | 64 ++- rust/abacus-base/src/contract_sync/mod.rs | 2 + rust/abacus-base/src/contract_sync/outbox.rs | 403 +++++++++--------- rust/abacus-base/src/lib.rs | 3 +- rust/abacus-core/src/traits/cursor.rs | 34 ++ rust/abacus-core/src/traits/mod.rs | 2 + rust/abacus-test/Cargo.toml | 2 +- rust/abacus-test/src/mocks/cursor.rs | 32 ++ rust/abacus-test/src/mocks/mod.rs | 3 + rust/agents/scraper/src/chain_scraper/mod.rs | 4 - rust/agents/scraper/src/chain_scraper/sync.rs | 74 ++-- 15 files changed, 446 insertions(+), 306 deletions(-) create mode 100644 rust/abacus-base/src/contract_sync/cursor.rs create mode 100644 rust/abacus-core/src/traits/cursor.rs create mode 100644 rust/abacus-test/src/mocks/cursor.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 49c133390..4e815ca3c 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1204,10 +1204,10 @@ dependencies = [ ] [[package]] -name = "difference" -version = "2.0.0" +name = "difflib" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" [[package]] name = "digest" @@ -1302,9 +1302,9 @@ dependencies = [ [[package]] name = "downcast" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" [[package]] name = "dunce" @@ -1740,9 +1740,9 @@ dependencies = [ [[package]] name = "float-cmp" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" dependencies = [ "num-traits", ] @@ -1780,9 +1780,9 @@ dependencies = [ [[package]] name = "fragile" -version = "1.2.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85dcb89d2b10c5f6133de2efd8c11959ce9dbb46a2f7a4cab208c4eeda6ce1ab" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" [[package]] name = "funty" @@ -2603,9 +2603,9 @@ dependencies = [ [[package]] name = "mockall" -version = "0.10.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ab571328afa78ae322493cacca3efac6a0f2e0a67305b4df31fd439ef129ac0" +checksum = "50e4a1c770583dac7ab5e2f6c139153b783a53a1bbee9729613f193e59828326" dependencies = [ "cfg-if", "downcast", @@ -2618,9 +2618,9 @@ dependencies = [ [[package]] name = "mockall_derive" -version = "0.10.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7e25b214433f669161f414959594216d8e6ba83b6679d3db96899c0b4639033" +checksum = "832663583d5fa284ca8810bf7015e46c9fff9622d3cf34bd1eea5003fec06dd0" dependencies = [ "cfg-if", "proc-macro2", @@ -3313,12 +3313,13 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" [[package]] name = "predicates" -version = "1.0.8" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f49cfaf7fdaa3bfacc6fa3e7054e65148878354a5cfddcf661df4c851f8021df" +checksum = "ed6bd09a7f7e68f3f0bf710fb7ab9c4615a488b58b5f653382a687701e458c92" dependencies = [ - "difference", + "difflib", "float-cmp", + "itertools", "normalize-line-endings", "predicates-core", "regex", diff --git a/rust/abacus-base/Cargo.toml b/rust/abacus-base/Cargo.toml index d3c5a36b7..788ef73f8 100644 --- a/rust/abacus-base/Cargo.toml +++ b/rust/abacus-base/Cargo.toml @@ -21,7 +21,7 @@ tracing = "0.1" tracing-futures = "0.2" tracing-subscriber = { version = "0.3", features = ["json"] } rocksdb = "0.18" -mockall = "0.10.2" +mockall = "0.11" backtrace = { version = "0.3", optional = true } backtrace-oneline = { path = "../utils/backtrace-oneline", optional = true } diff --git a/rust/abacus-base/src/agent.rs b/rust/abacus-base/src/agent.rs index 78b6f8780..f2ba5ed21 100644 --- a/rust/abacus-base/src/agent.rs +++ b/rust/abacus-base/src/agent.rs @@ -132,8 +132,6 @@ pub async fn agent_main() -> Result<()> { crate::oneline_eyre::install()?; #[cfg(all(feature = "color_eyre", not(feature = "oneline-errors")))] color_eyre::install()?; - #[cfg(not(any(feature = "color-eyre", feature = "oneline-eyre")))] - eyre::install()?; let settings = A::Settings::new().map_err(|e| e.into())?; let core_settings: &AgentSettings = settings.as_ref(); diff --git a/rust/abacus-base/src/contract_sync/cursor.rs b/rust/abacus-base/src/contract_sync/cursor.rs new file mode 100644 index 000000000..7ea11503d --- /dev/null +++ b/rust/abacus-base/src/contract_sync/cursor.rs @@ -0,0 +1,92 @@ +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use eyre::Result; +use tokio::time::sleep; + +use abacus_core::{Indexer, SyncBlockRangeCursor}; + +/// Tool for handling the logic of what the next block range that should be +/// queried is and also handling rate limiting. Rate limiting is automatically +/// performed by `next_range`. 
+pub struct RateLimitedSyncBlockRangeCursor { + indexer: I, + tip: u32, + last_tip_update: Instant, + chunk_size: u32, + from: u32, +} + +impl RateLimitedSyncBlockRangeCursor +where + I: Indexer, +{ + /// Construct a new contract sync helper. + pub async fn new(indexer: I, chunk_size: u32, initial_height: u32) -> Result { + let tip = indexer.get_finalized_block_number().await?; + Ok(Self { + indexer, + tip, + chunk_size, + last_tip_update: Instant::now(), + from: initial_height, + }) + } + + /// Wait based on how close we are to the tip and update the tip, + /// i.e. the highest block we may scrape. + async fn rate_limit(&mut self) -> Result<()> { + if self.from + self.chunk_size < self.tip { + // If doing the full chunk wouldn't exceed the already known tip, + // we don't necessarily need to fetch the new tip. Sleep a tiny bit + // so that we can catch up to the tip relatively quickly. + sleep(Duration::from_secs(1)).await; + Ok(()) + } else { + // We are close to the tip. + if self.last_tip_update.elapsed() < Duration::from_secs(30) { + // Sleep a little longer because we have caught up. + sleep(Duration::from_secs(10)).await; + } else { + // We are probably not caught up yet. This would happen if we + // started really far behind so now it is very likely the tip + // has moved a significant distance. We don't want to wait in + // this case any more than we normally would. + sleep(Duration::from_secs(1)).await; + } + + match self.indexer.get_finalized_block_number().await { + Ok(tip) => { + // we retrieved a new tip value, go ahead and update. + self.last_tip_update = Instant::now(); + self.tip = tip; + Ok(()) + } + Err(e) => { + // we are failing to make a basic query, we should wait before retrying. + sleep(Duration::from_secs(10)).await; + Err(e) + } + } + } + } +} + +#[async_trait] +impl SyncBlockRangeCursor for RateLimitedSyncBlockRangeCursor { + fn current_position(&self) -> u32 { + self.from + } + + async fn next_range(&mut self) -> Result<(u32, u32)> { + self.rate_limit().await?; + let to = u32::min(self.tip, self.from + self.chunk_size); + let from = to.saturating_sub(self.chunk_size); + self.from = to + 1; + Ok((from, to)) + } + + fn backtrack(&mut self, start_from: u32) { + self.from = u32::min(start_from, self.from); + } +} diff --git a/rust/abacus-base/src/contract_sync/interchain_gas.rs b/rust/abacus-base/src/contract_sync/interchain_gas.rs index 346ffbf27..f24cccea4 100644 --- a/rust/abacus-base/src/contract_sync/interchain_gas.rs +++ b/rust/abacus-base/src/contract_sync/interchain_gas.rs @@ -1,11 +1,9 @@ -use std::cmp::min; -use std::time::Duration; +use tokio::task::JoinHandle; +use tracing::{info, info_span, instrument::Instrumented, warn, Instrument}; -use tokio::{task::JoinHandle, time::sleep}; -use tracing::{debug, info, info_span, instrument::Instrumented, Instrument}; - -use abacus_core::InterchainGasPaymasterIndexer; +use abacus_core::{InterchainGasPaymasterIndexer, SyncBlockRangeCursor}; +use crate::contract_sync::cursor::RateLimitedSyncBlockRangeCursor; use crate::{contract_sync::schema::InterchainGasPaymasterContractSyncDB, ContractSync}; const GAS_PAYMENTS_LABEL: &str = "gas_payments"; @@ -31,42 +29,39 @@ where .stored_events .with_label_values(&[GAS_PAYMENTS_LABEL, &self.chain_name]); - let config_from = self.index_settings.from(); - let chunk_size = self.index_settings.chunk_size(); + let cursor = { + let config_initial_height = self.index_settings.from(); + let initial_height = db + .retrieve_latest_indexed_gas_payment_block() + 
.map_or(config_initial_height, |b| b + 1); + RateLimitedSyncBlockRangeCursor::new( + indexer.clone(), + self.index_settings.chunk_size(), + initial_height, + ) + }; tokio::spawn(async move { - let mut from = db - .retrieve_latest_indexed_gas_payment_block() - .map_or_else(|| config_from, |b| b + 1); + let mut cursor = cursor.await?; - info!(from = from, "[GasPayments]: resuming indexer from {from}"); - indexed_height.set(from as i64); + let start_block = cursor.current_position(); + info!(from = start_block, "[GasPayments]: resuming indexer"); + indexed_height.set(start_block as i64); loop { - sleep(Duration::from_secs(5)).await; - - // Only index blocks considered final. - // If there's an error getting the block number, just start the loop over - let Ok(tip) = indexer.get_finalized_block_number().await else { - continue; + let (from, to) = match cursor.next_range().await { + Ok(range) => range, + Err(err) => { + warn!(error = %err, "[GasPayments]: failed to get next block range"); + continue; + } }; - if tip <= from { - // Sleep if caught up to tip - debug!(tip=?tip, from=?from, "[GasPayments]: caught up to tip, waiting for new block"); - sleep(Duration::from_secs(10)).await; - continue; - } - - let candidate = from + chunk_size; - let to = min(tip, candidate); - // Still search the full-size chunk size to possibly catch events that nodes have dropped "close to the tip" - let full_chunk_from = to.checked_sub(chunk_size).unwrap_or_default(); - let gas_payments = indexer.fetch_gas_payments(full_chunk_from, to).await?; + let gas_payments = indexer.fetch_gas_payments(from, to).await?; info!( - from = full_chunk_from, - to = to, + from, + to, gas_payments_count = gas_payments.len(), "[GasPayments]: indexed block range" ); @@ -82,8 +77,7 @@ where stored_messages.inc_by(new_payments_processed); - db.store_latest_indexed_gas_payment_block(to)?; - from = to + 1; + db.store_latest_indexed_gas_payment_block(from)?; indexed_height.set(to as i64); } }) diff --git a/rust/abacus-base/src/contract_sync/mod.rs b/rust/abacus-base/src/contract_sync/mod.rs index b1f6809ed..894f72b73 100644 --- a/rust/abacus-base/src/contract_sync/mod.rs +++ b/rust/abacus-base/src/contract_sync/mod.rs @@ -2,12 +2,14 @@ // TODO: Reapply metrics use abacus_core::db::AbacusDB; +pub use cursor::*; pub use interchain_gas::*; pub use metrics::ContractSyncMetrics; pub use outbox::*; use crate::settings::IndexSettings; +mod cursor; mod interchain_gas; /// Tools for working with message continuity. 
pub mod last_message; diff --git a/rust/abacus-base/src/contract_sync/outbox.rs b/rust/abacus-base/src/contract_sync/outbox.rs index 1c91d1982..39dd5243e 100644 --- a/rust/abacus-base/src/contract_sync/outbox.rs +++ b/rust/abacus-base/src/contract_sync/outbox.rs @@ -1,11 +1,10 @@ -use std::cmp::min; -use std::time::Duration; - -use tokio::time::sleep; use tracing::{debug, info, info_span, warn}; use tracing::{instrument::Instrumented, Instrument}; -use abacus_core::{name_from_domain_id, CommittedMessage, ListValidity, OutboxIndexer}; +use abacus_core::{ + name_from_domain_id, CommittedMessage, Indexer, ListValidity, OutboxIndexer, + SyncBlockRangeCursor, +}; use crate::contract_sync::last_message::validate_message_continuity; use crate::{contract_sync::schema::OutboxContractSyncDB, ContractSync}; @@ -40,82 +39,91 @@ where let message_leaf_index = self.metrics.message_leaf_index.clone(); let chain_name = self.chain_name.clone(); - let config_from = self.index_settings.from(); - let chunk_size = self.index_settings.chunk_size(); + let cursor = { + let config_initial_height = self.index_settings.from(); + let initial_height = db + .retrieve_latest_valid_message_range_start_block() + .map_or(config_initial_height, |b| b + 1); + create_cursor( + indexer.clone(), + self.index_settings.chunk_size(), + initial_height, + ) + }; // Indexes messages by fetching messages in ranges of blocks. // We've observed occasional flakiness with providers where some events in // a range will be missing. The leading theories are: + // // 1. The provider is just flaky and sometimes misses events :( + // // 2. For outbox chains with low finality times, it's possible that when - // we query the RPC provider for the latest finalized block number, - // we're returned a block number T. However when we attempt to index a range - // where the `to` block is T, the `eth_getLogs` RPC is load balanced by the - // provider to a different node whose latest known block is some block T' < T. - // The `eth_getLogs` RPC implementations seem to happily accept `to` blocks that - // exceed the latest known block, so it's possible that in our indexer we think - // that we've indexed up to block T but we've only *actually* indexed up to block T'. - + // we query the RPC provider for the latest finalized block number, + // we're returned a block number T. However when we attempt to index a range + // where the `to` block is T, the `eth_getLogs` RPC is load balanced by the + // provider to a different node whose latest known block is some block T' M + 1. - // This missing messages could be anywhere in the range [A,D]: - // * It's possible there was an issue when the prior block range [A,B] was indexed, where - // the provider didn't provide some messages with indices > M that it should have. - // * It's possible that the range [B,C] that was presumed to be empty when it was indexed - // actually wasn't. - // * And it's possible that this was just a flaky gap, where there are messages in the [C,D] - // range that weren't returned for some reason. - // We can handle this by re-indexing starting from block A. - // Note this means we only handle this case upon observing messages in some range [C,D] - // that indicate a previously indexed range may have missed some messages. + // + // 1. The latest previously indexed message index is M that was found in a + // previously indexed block range. A new block range [A,B] is indexed, returning + // a list of messages. 
The lowest message index in that list is `M + 1`, + // but there are some missing messages indices in the list. This is + // likely a flaky provider, and we can simply re-index the range [A,B] + // hoping that the provider will soon return a correct list. + // + // 2. The latest previously indexed message index is M that was found in a + // previously indexed block range, [A,B]. A new block range [C,D] is + // indexed, returning a list of messages. However, the lowest message + // index in that list is M' where M' > M + 1. This missing messages + // could be anywhere in the range [A,D]: + // * It's possible there was an issue when the prior block range [A,B] was + // indexed, where the provider didn't provide some messages with indices > + // M that it should have. + // * It's possible that the range [B,C] that was presumed to be empty when it + // was indexed actually wasn't. + // * And it's possible that this was just a flaky gap, where there are + // messages in the [C,D] range that weren't returned for some reason. + // + // We can handle this by re-indexing starting from block A. + // Note this means we only handle this case upon observing messages in some + // range [C,D] that indicate a previously indexed range may have + // missed some messages. tokio::spawn(async move { - let mut from = db - .retrieve_latest_valid_message_range_start_block() - .unwrap_or(config_from); - - let mut last_valid_range_start_block = from; + let mut cursor = cursor.await?; - info!(from = from, "[Messages]: resuming indexer from latest valid message range start block"); + let start_block = cursor.current_position(); + let mut last_valid_range_start_block = start_block; + info!(from = start_block, "[Messages]: resuming indexer from latest valid message range start block"); + indexed_height.set(start_block as i64); - indexed_height.set(from as i64); loop { - sleep(Duration::from_secs(5)).await; - - // Only index blocks considered final. - // If there's an error getting the block number, just start the loop over - let Ok(tip) = indexer.get_finalized_block_number().await else { - continue; + let start_block = cursor.current_position(); + let (from, to) = match cursor.next_range().await { + Ok(range) => range, + Err(err) => { + warn!(error = %err, "[Messages]: failed to get next block range"); + continue; + } }; - if tip <= from { - // Sleep if caught up to tip - sleep(Duration::from_secs(10)).await; - continue; - } - - // Index the chunk_size, capping at the tip. - let to = min(tip, from + chunk_size); - - // Still search the full-size chunk size to possibly catch events that nodes have dropped "close to the tip" - let full_chunk_from = to.checked_sub(chunk_size).unwrap_or_default(); - let mut sorted_messages: Vec<_> = indexer.fetch_sorted_messages(full_chunk_from, to).await?.into_iter().map(|(msg, _)| msg).collect(); + let mut sorted_messages: Vec<_> = indexer + .fetch_sorted_messages(from, to) + .await? + .into_iter() + .map(|(msg, _)| msg) + .collect(); - info!( - from = full_chunk_from, - to = to, - message_count = sorted_messages.len(), - "[Messages]: indexed block range" - ); + info!(from, to, message_count = sorted_messages.len(), "[Messages]: indexed block range"); // Get the latest known leaf index. All messages whose indices are <= this index // have been stored in the DB. 
@@ -127,12 +135,7 @@ where sorted_messages.retain(|m| m.leaf_index > min_index); } - debug!( - from = full_chunk_from, - to = to, - message_count = sorted_messages.len(), - "[Messages]: filtered any messages already indexed" - ); + debug!(from, to, message_count = sorted_messages.len(), "[Messages]: filtered any messages already indexed"); // Ensure the sorted messages are a valid continuation of last_leaf_index match validate_message_continuity(last_leaf_index, &sorted_messages.iter().collect::>()) { @@ -155,11 +158,10 @@ where } // Update the latest valid start block. - db.store_latest_valid_message_range_start_block(full_chunk_from)?; - last_valid_range_start_block = full_chunk_from; + db.store_latest_valid_message_range_start_block(from)?; + last_valid_range_start_block = from; // Move forward to the next height - from = to + 1; indexed_height.set(to as i64); } // The index of the first message in sorted_messages is not the @@ -175,11 +177,12 @@ where "[Messages]: Found invalid continuation in range. Re-indexing from the start block of the last successful range.", ); - from = last_valid_range_start_block; - indexed_height.set(from as i64); + cursor.backtrack(last_valid_range_start_block); + indexed_height.set(last_valid_range_start_block as i64); } ListValidity::ContainsGaps => { missed_messages.inc(); + cursor.backtrack(start_block); warn!( last_leaf_index = ?last_leaf_index, @@ -193,7 +196,6 @@ where // We don't update last_valid_range_start_block because we cannot extrapolate // if the range was correctly indexed if there are no messages to observe their // indices. - from = to + 1; indexed_height.set(to as i64); } }; @@ -203,6 +205,28 @@ where } } +#[cfg(test)] +static mut MOCK_CURSOR: Option = None; + +/// Create a new cursor. In test mode we should use the mock cursor created by +/// the test. +#[cfg_attr(test, allow(unused_variables))] +async fn create_cursor( + indexer: I, + chunk_size: u32, + initial_height: u32, +) -> eyre::Result { + #[cfg(not(test))] + { + crate::RateLimitedSyncBlockRangeCursor::new(indexer, chunk_size, initial_height).await + } + #[cfg(test)] + { + let cursor = unsafe { MOCK_CURSOR.take() }; + Ok(cursor.expect("Mock cursor was not set before it was used")) + } +} + #[cfg(test)] mod test { use std::sync::Arc; @@ -210,22 +234,32 @@ mod test { use ethers::core::types::H256; use eyre::eyre; + use mockall::predicate::eq; use mockall::*; use tokio::select; - use tokio::time::{interval, timeout}; + use tokio::sync::Mutex; + use tokio::time::{interval, sleep, timeout}; use abacus_core::{db::AbacusDB, AbacusMessage, Encode, LogMeta, RawCommittedMessage}; + use abacus_test::mocks::cursor::MockSyncBlockRangeCursor; use abacus_test::mocks::indexer::MockAbacusIndexer; use abacus_test::test_utils; - use mockall::predicate::eq; + use crate::contract_sync::outbox::MOCK_CURSOR; use crate::contract_sync::schema::OutboxContractSyncDB; use crate::ContractSync; use crate::{settings::IndexSettings, ContractSyncMetrics, CoreMetrics}; + // we need a mutex for our tests because of the static cursor object + lazy_static! 
{ + static ref TEST_MTX: Mutex<()> = Mutex::new(()); + } + #[tokio::test] async fn handles_missing_rpc_messages() { test_utils::run_test_db(|db| async move { + let _test_lock = TEST_MTX.lock().await; + let mut message_vec = vec![]; AbacusMessage { origin: 1000, @@ -279,160 +313,102 @@ mod test { let latest_valid_message_range_start_block = 100; let mut mock_indexer = MockAbacusIndexer::new(); + let mut mock_cursor = MockSyncBlockRangeCursor::new(); { let mut seq = Sequence::new(); + // Some local macros to reduce code-duplication. + macro_rules! expect_current_position { + ($return_position:literal) => { + mock_cursor + .expect__current_position() + .times(1) + .in_sequence(&mut seq) + .return_once(|| $return_position); + }; + } + macro_rules! expect_backtrack { + ($expected_new_from:literal) => { + mock_cursor + .expect__backtrack() + .times(1) + .in_sequence(&mut seq) + .with(eq($expected_new_from)) + .return_once(|_| ()); + }; + } + macro_rules! expect_fetches_range { + ($expected_from:literal, $expected_to:literal, $return_messages:expr) => { + let messages: &[&RawCommittedMessage] = $return_messages; + let messages = messages.iter().map(|&msg| (msg.clone(), meta())).collect(); + mock_cursor + .expect__next_range() + .times(1) + .in_sequence(&mut seq) + .return_once(|| Box::pin(async { Ok(($expected_from, $expected_to)) })); + mock_indexer + .expect__fetch_sorted_messages() + .times(1) + .with(eq($expected_from), eq($expected_to)) + .in_sequence(&mut seq) + .return_once(move |_, _| Ok(messages)); + }; + } + + expect_current_position!(91); + expect_current_position!(91); + // Return m0. - let m0_clone = m0.clone(); - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(110)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .with(eq(91), eq(110)) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![(m0_clone, meta())])); + expect_fetches_range!(91, 110, &[&m0]); // Return m1, miss m2. - let m1_clone = m1.clone(); - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(120)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .with(eq(101), eq(120)) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![(m1_clone, meta())])); + expect_current_position!(111); + expect_fetches_range!(101, 120, &[&m1]); // Miss m3. - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(130)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .with(eq(111), eq(130)) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![])); + expect_current_position!(121); + expect_fetches_range!(111, 130, &[]); // Empty range. 
- mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(140)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .with(eq(121), eq(140)) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![])); - - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(140)); + expect_current_position!(131); + expect_fetches_range!(121, 140, &[]); // m1 --> m5 seen as an invalid continuation - let m5_clone = m5.clone(); - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(150)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .with(eq(131), eq(150)) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![(m5_clone, meta())])); + expect_current_position!(141); + expect_fetches_range!(131, 150, &[&m5]); + expect_backtrack!(101); // Indexer goes back to the last valid message range start block - // and indexes the range based off the chunk size of 19. + // and indexes the range // This time it gets m1 and m2 (which was previously skipped) - let m1_clone = m1.clone(); - let m2_clone = m2.clone(); - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(160)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .with(eq(101), eq(120)) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![(m1_clone, meta()), (m2_clone, meta())])); + expect_current_position!(101); + expect_fetches_range!(101, 120, &[&m1, &m2]); // Indexer continues, this time getting m3 and m5 message, but skipping m4, // which means this range contains gaps - let m3_clone = m3.clone(); - let m5_clone = m5.clone(); - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(170)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .with(eq(121), eq(140)) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![(m3_clone, meta()), (m5_clone, meta())])); + expect_current_position!(121); + expect_fetches_range!(118, 140, &[&m3, &m5]); + expect_backtrack!(121); // Indexer retries, the same range in hope of filling the gap, // which it now does successfully - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(170)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .with(eq(121), eq(140)) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![(m3, meta()), (m4, meta()), (m5, meta())])); + expect_current_position!(121); + expect_fetches_range!(121, 140, &[&m3, &m4, &m5]); // Indexer continues with the next block range, which happens to be empty - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(180)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .with(eq(141), eq(160)) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![])); - - // Indexer catches up with the tip - mock_indexer - .expect__get_finalized_block_number() - .times(1) - .in_sequence(&mut seq) - .return_once(|| Ok(180)); - mock_indexer - .expect__fetch_sorted_messages() - .times(1) - .with(eq(161), eq(180)) - .in_sequence(&mut seq) - .return_once(move |_, _| Ok(vec![])); + expect_current_position!(141); + expect_fetches_range!(141, 160, &[]); // Stay at the same tip, so no other fetch_sorted_messages calls are made - mock_indexer - .expect__get_finalized_block_number() - 
.returning(|| Ok(180)); + mock_cursor.expect__current_position().returning(|| 161); + mock_cursor.expect__next_range().returning(|| { + Box::pin(async move { + // this sleep should be longer than the test timeout since we don't actually + // want to yield any more values at this point. + sleep(Duration::from_secs(100)).await; + Ok((161, 161)) + }) + }); } let abacus_db = AbacusDB::new("outbox_1", db); @@ -449,13 +425,14 @@ mod test { CoreMetrics::new("contract_sync_test", None, prometheus::Registry::new()) .expect("could not make metrics"), ); + unsafe { MOCK_CURSOR = Some(mock_cursor) }; let sync_metrics = ContractSyncMetrics::new(metrics); let contract_sync = ContractSync::new( "outbox_1".into(), abacus_db.clone(), - indexer.clone(), + indexer, IndexSettings { from: Some("0".to_string()), chunk: Some("19".to_string()), @@ -464,7 +441,7 @@ mod test { ); let sync_task = contract_sync.sync_outbox_messages(); - let test_pass_fut = timeout(Duration::from_secs(90), async move { + let test_pass_fut = timeout(Duration::from_secs(5), async move { let mut interval = interval(Duration::from_millis(20)); loop { if abacus_db.message_by_leaf_index(0).expect("!db").is_some() @@ -485,7 +462,9 @@ mod test { tests_result = test_pass_fut => if tests_result.is_ok() { Ok(()) } else { Err(eyre!("timed out")) } }; - assert!(test_result.is_ok()); + if let Err(err) = test_result { + panic!("Test failed: {err}") + } }) .await } diff --git a/rust/abacus-base/src/lib.rs b/rust/abacus-base/src/lib.rs index ad45d673d..76b8f496a 100644 --- a/rust/abacus-base/src/lib.rs +++ b/rust/abacus-base/src/lib.rs @@ -5,7 +5,8 @@ //! Implementations of the `Outbox` and `Inbox` traits on different chains //! ought to live here. -#![forbid(unsafe_code)] +// Forbid unsafe code outside of tests +#![cfg_attr(not(test), forbid(unsafe_code))] #![warn(missing_docs)] #![warn(unused_extern_crates)] diff --git a/rust/abacus-core/src/traits/cursor.rs b/rust/abacus-core/src/traits/cursor.rs new file mode 100644 index 000000000..f796f59d8 --- /dev/null +++ b/rust/abacus-core/src/traits/cursor.rs @@ -0,0 +1,34 @@ +use async_trait::async_trait; +use auto_impl::auto_impl; +use eyre::Result; + +/// Tool for handling the logic of what the next block range that should be +/// queried and may perform rate limiting on `next_range` queries. +#[async_trait] +#[auto_impl(Box)] +pub trait SyncBlockRangeCursor { + /// Returns the current `from` position of the scraper. Note that + /// `next_range` may return a `from` value that is lower than this in order + /// to have some overlap. + fn current_position(&self) -> u32; + + /// Get the next block range `(from, to)` which should be fetched (this + /// returns an inclusive range such as (0,50), (51,100), ...). This + /// will automatically rate limit based on how far we are from the + /// highest block we can scrape according to + /// `get_finalized_block_number`. + /// + /// In reality this will often return a from value that overlaps with the + /// previous range to help ensure that we scrape everything even if the + /// provider failed to respond in full previously. + /// + /// This assumes the caller will call next_range again automatically on Err, + /// but it returns the error to allow for tailored logging or different end + /// cases. + async fn next_range(&mut self) -> Result<(u32, u32)>; + + /// If there was an issue when a range of data was fetched, this rolls back + /// so the next range fetched will be from `start_from`. Note that it is a + /// no-op if a later block value is specified. 
+ fn backtrack(&mut self, start_from: u32); +} diff --git a/rust/abacus-core/src/traits/mod.rs b/rust/abacus-core/src/traits/mod.rs index 95a3e5f95..b6f757b87 100644 --- a/rust/abacus-core/src/traits/mod.rs +++ b/rust/abacus-core/src/traits/mod.rs @@ -13,6 +13,7 @@ use ethers::{ use eyre::Result; pub use common::*; +pub use cursor::*; pub use encode::*; pub use inbox::*; pub use indexer::*; @@ -24,6 +25,7 @@ pub use validator_manager::*; use crate::{db::DbError, utils::domain_hash, AbacusError}; mod common; +mod cursor; mod encode; mod inbox; mod indexer; diff --git a/rust/abacus-test/Cargo.toml b/rust/abacus-test/Cargo.toml index 239b354d8..c21dcdf7e 100644 --- a/rust/abacus-test/Cargo.toml +++ b/rust/abacus-test/Cargo.toml @@ -12,7 +12,7 @@ thiserror = { version = "1.0", default-features = false } async-trait = { version = "0.1", default-features = false } futures-util = "0.3" eyre = "0.6" -mockall = "0.10.2" +mockall = "0.11" rand = "0.8.3" rocksdb = "0.18" tempfile = "3.3" diff --git a/rust/abacus-test/src/mocks/cursor.rs b/rust/abacus-test/src/mocks/cursor.rs new file mode 100644 index 000000000..f0d4ee8f9 --- /dev/null +++ b/rust/abacus-test/src/mocks/cursor.rs @@ -0,0 +1,32 @@ +#![allow(non_snake_case)] + +use abacus_core::SyncBlockRangeCursor; +use async_trait::async_trait; +use eyre::Result; +use mockall::mock; +use std::future::Future; + +mock! { + pub SyncBlockRangeCursor { + pub fn _next_range(&mut self) -> impl Future> + Send {} + + pub fn _current_position(&self) -> u32 {} + + pub fn _backtrack(&mut self, start_from: u32) {} + } +} + +#[async_trait] +impl SyncBlockRangeCursor for MockSyncBlockRangeCursor { + fn current_position(&self) -> u32 { + self._current_position() + } + + async fn next_range(&mut self) -> Result<(u32, u32)> { + self._next_range().await + } + + fn backtrack(&mut self, start_from: u32) { + self._backtrack(start_from) + } +} diff --git a/rust/abacus-test/src/mocks/mod.rs b/rust/abacus-test/src/mocks/mod.rs index 032b483f9..0f4978842 100644 --- a/rust/abacus-test/src/mocks/mod.rs +++ b/rust/abacus-test/src/mocks/mod.rs @@ -7,5 +7,8 @@ pub mod inbox; /// Mock indexer pub mod indexer; +/// Mock SyncBlockRangeCursor +pub mod cursor; + pub use indexer::MockIndexer; pub use outbox::MockOutboxContract; diff --git a/rust/agents/scraper/src/chain_scraper/mod.rs b/rust/agents/scraper/src/chain_scraper/mod.rs index 311ec2dd5..5889545e0 100644 --- a/rust/agents/scraper/src/chain_scraper/mod.rs +++ b/rust/agents/scraper/src/chain_scraper/mod.rs @@ -88,10 +88,6 @@ impl SqlChainScraper { self.remotes.keys().copied() } - pub async fn get_finalized_block_number(&self) -> Result { - self.local.indexer.get_finalized_block_number().await - } - /// Sync contract data and other blockchain with the current chain state. /// This will create a long-running task that should be spawned. 
pub fn sync(self) -> impl Future> + Send + 'static { diff --git a/rust/agents/scraper/src/chain_scraper/sync.rs b/rust/agents/scraper/src/chain_scraper/sync.rs index c8ba92d40..b42f69ea5 100644 --- a/rust/agents/scraper/src/chain_scraper/sync.rs +++ b/rust/agents/scraper/src/chain_scraper/sync.rs @@ -1,16 +1,17 @@ -use std::cmp::min; use std::collections::HashMap; use std::ops::Deref; -use std::time::Duration; +use std::sync::Arc; +use abacus_base::RateLimitedSyncBlockRangeCursor; use ethers::prelude::H256; use eyre::Result; use prometheus::{IntCounter, IntGauge, IntGaugeVec}; -use tokio::time::sleep; use tracing::{debug, info, instrument, warn}; use abacus_base::last_message::validate_message_continuity; -use abacus_core::{name_from_domain_id, CommittedMessage, ListValidity}; +use abacus_core::{ + name_from_domain_id, CommittedMessage, ListValidity, OutboxIndexer, SyncBlockRangeCursor, +}; use crate::chain_scraper::{Delivery, RawMsgWithMeta, SqlChainScraper, TxnWithIdAndTime}; @@ -30,9 +31,8 @@ pub(super) struct Syncer { stored_deliveries: IntCounter, missed_messages: IntCounter, message_leaf_index: IntGaugeVec, - chunk_size: u32, + sync_cursor: RateLimitedSyncBlockRangeCursor>, - from: u32, last_valid_range_start_block: u32, last_leaf_index: u32, } @@ -79,10 +79,17 @@ impl Syncer { let message_leaf_index = scraper.metrics.message_leaf_index.clone(); let chunk_size = scraper.chunk_size; - let from = scraper.cursor.height().await as u32; - let last_valid_range_start_block = from; + let initial_height = scraper.cursor.height().await as u32; + let last_valid_range_start_block = initial_height; let last_leaf_index = scraper.last_message_leaf_index().await?.unwrap_or(0); + let sync_cursor = RateLimitedSyncBlockRangeCursor::new( + scraper.local.indexer.clone(), + chunk_size, + initial_height, + ) + .await?; + Ok(Self { scraper, indexed_message_height, @@ -91,8 +98,7 @@ impl Syncer { stored_deliveries, missed_messages, message_leaf_index, - chunk_size, - from, + sync_cursor, last_valid_range_start_block, last_leaf_index, }) @@ -101,25 +107,23 @@ impl Syncer { /// Sync contract and other blockchain data with the current chain state. 
#[instrument(skip(self), fields(chain_name = self.chain_name(), chink_size = self.chunk_size))] pub async fn run(mut self) -> Result<()> { - info!(from = self.from, "Resuming chain sync"); - self.indexed_message_height.set(self.from as i64); - self.indexed_deliveries_height.set(self.from as i64); + let start_block = self.sync_cursor.current_position(); + info!(from = start_block, "Resuming chain sync"); + self.indexed_message_height.set(start_block as i64); + self.indexed_deliveries_height.set(start_block as i64); loop { - sleep(Duration::from_secs(5)).await; - - let Ok(tip) = self.get_finalized_block_number().await else { - continue; + debug_assert_eq!(self.local.outbox.local_domain(), self.local_domain()); + let start_block = self.sync_cursor.current_position(); + let (from, to) = match self.sync_cursor.next_range().await { + Ok(range) => range, + Err(err) => { + warn!(error = %err, "failed to get next block range"); + continue; + } }; - if tip <= self.from { - sleep(Duration::from_secs(10)).await; - continue; - } - let to = min(tip, self.from + self.chunk_size); - let full_chunk_from = to.checked_sub(self.chunk_size).unwrap_or_default(); - debug_assert_eq!(self.local.outbox.local_domain(), self.local_domain()); - let (sorted_messages, deliveries) = self.scrape_range(full_chunk_from, to).await?; + let (sorted_messages, deliveries) = self.scrape_range(from, to).await?; let validation = validate_message_continuity( Some(self.last_leaf_index), @@ -130,18 +134,16 @@ impl Syncer { let max_leaf_index_of_batch = self.record_data(sorted_messages, deliveries).await?; - self.cursor.update(full_chunk_from as u64).await; + self.cursor.update(from as u64).await; if let Some(idx) = max_leaf_index_of_batch { self.last_leaf_index = idx; } - self.last_valid_range_start_block = full_chunk_from; - self.from = to + 1; + self.last_valid_range_start_block = from; self.indexed_message_height.set(to as i64); self.indexed_deliveries_height.set(to as i64); } ListValidity::Empty => { let _ = self.record_data(sorted_messages, deliveries).await?; - self.from = to + 1; self.indexed_message_height.set(to as i64); self.indexed_deliveries_height.set(to as i64); } @@ -149,20 +151,24 @@ impl Syncer { self.missed_messages.inc(); warn!( last_leaf_index = self.last_leaf_index, - start_block = self.from, + start_block = from, end_block = to, last_valid_range_start_block = self.last_valid_range_start_block, "Found invalid continuation in range. Re-indexing from the start block of the last successful range." ); - self.from = self.last_valid_range_start_block; - self.indexed_message_height.set(self.from as i64); - self.indexed_deliveries_height.set(self.from as i64); + self.sync_cursor + .backtrack(self.last_valid_range_start_block); + self.indexed_message_height + .set(self.last_valid_range_start_block as i64); + self.indexed_deliveries_height + .set(self.last_valid_range_start_block as i64); } ListValidity::ContainsGaps => { self.missed_messages.inc(); + self.sync_cursor.backtrack(start_block); warn!( last_leaf_index = self.last_leaf_index, - start_block = self.from, + start_block = from, end_block = to, last_valid_range_start_block = self.last_valid_range_start_block, "Found gaps in the message in range, re-indexing the same range." 
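
Aside on the cursor API introduced in the patch above: the sketch below is not part of the patch series, it only illustrates how a caller might drive the new `SyncBlockRangeCursor` trait (remember the current position, ask for the next rate-limited block range, and `backtrack` when a fetched range fails validation). The `sync_loop` function and the `validation_failed` placeholder are hypothetical; the real continuity checks live in `outbox.rs` and the scraper's `sync.rs`.

    use abacus_core::SyncBlockRangeCursor;
    use eyre::Result;

    /// Illustrative only: drive any SyncBlockRangeCursor implementation.
    async fn sync_loop<C: SyncBlockRangeCursor>(mut cursor: C) -> Result<()> {
        loop {
            // Remember where this iteration started so we can roll back on failure.
            let start_block = cursor.current_position();
            // `next_range` rate-limits internally and may return a `from` below
            // `start_block` so that consecutive ranges overlap (tolerates flaky RPCs).
            let (from, to) = match cursor.next_range().await {
                Ok(range) => range,
                Err(_) => continue, // transient error: retry on the next pass
            };
            // ... fetch and validate events for the inclusive range [from, to] ...
            let validation_failed = from > to; // placeholder for real continuity checks
            if validation_failed {
                // Re-cover the same span on the next iteration.
                cursor.backtrack(start_block);
            }
        }
    }
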
From 3fc46ed311d39f8a6f45ad528d961d6c9e6511b1 Mon Sep 17 00:00:00 2001 From: Nam Chu Hoai Date: Tue, 22 Nov 2022 14:40:28 -0500 Subject: [PATCH 3/7] Minor changes to verifications (#1291) --- typescript/infra/scripts/verify.ts | 2 +- typescript/sdk/src/deploy/verify/ContractVerifier.ts | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/typescript/infra/scripts/verify.ts b/typescript/infra/scripts/verify.ts index b997dc60c..5e77e465f 100644 --- a/typescript/infra/scripts/verify.ts +++ b/typescript/infra/scripts/verify.ts @@ -32,7 +32,7 @@ async function main() { // from solidity/core/hardhat.config.ts const compilerOptions: CompilerOptions = { codeformat: 'solidity-single-file', - compilerversion: 'v0.8.13+commit.07a7930e', + compilerversion: 'v0.8.16+commit.07a7930e', optimizationUsed: '1', runs: '999999', }; diff --git a/typescript/sdk/src/deploy/verify/ContractVerifier.ts b/typescript/sdk/src/deploy/verify/ContractVerifier.ts index 56684a683..04026a784 100644 --- a/typescript/sdk/src/deploy/verify/ContractVerifier.ts +++ b/typescript/sdk/src/deploy/verify/ContractVerifier.ts @@ -102,6 +102,7 @@ export class ContractVerifier extends MultiGeneric< this.logger(`Proxy verification failed, try manually?`); return; default: + this.logger(`Verification failed for some unknown reason`, result); throw new Error(`Verification failed: ${result.result}`); } } @@ -122,6 +123,8 @@ export class ContractVerifier extends MultiGeneric< return; } + this.logger(`Verifying ${input.name} at ${input.address} on ${chain}`); + const data = { sourceCode: this.flattenedSource, contractname: input.name, From 6ac9e9f1d4539f6f027536c5fffa26e6c5e9e355 Mon Sep 17 00:00:00 2001 From: Mattie Conover Date: Tue, 22 Nov 2022 12:10:57 -0800 Subject: [PATCH 4/7] Mattie/update retrying provider (#1294) * Removed dead code * Bump info to warn in retrying provider * Fix span levels --- rust/chains/abacus-ethereum/src/retrying.rs | 74 ++------------------- 1 file changed, 7 insertions(+), 67 deletions(-) diff --git a/rust/chains/abacus-ethereum/src/retrying.rs b/rust/chains/abacus-ethereum/src/retrying.rs index d4583ebee..2f9d828f2 100644 --- a/rust/chains/abacus-ethereum/src/retrying.rs +++ b/rust/chains/abacus-ethereum/src/retrying.rs @@ -6,9 +6,9 @@ use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use thiserror::Error; use tokio::time::sleep; -use tracing::{debug, error, info, instrument, trace, warn}; +use tracing::{debug, error, instrument, trace, warn}; -use crate::{HttpClientError, QuorumProvider}; +use crate::HttpClientError; const METHODS_TO_NOT_RETRY: &[&str] = &[ "eth_estimateGas", @@ -72,7 +72,7 @@ where /// The retrying provider logic which accepts a matcher function that can /// handle specific cases for different underlying provider /// implementations. 
- #[instrument(level = "error", skip_all, fields(method = %method))] + #[instrument(skip_all, fields(method = %method))] async fn request_with_retry( &self, method: &str, @@ -160,7 +160,7 @@ where impl JsonRpcClient for RetryingProvider { type Error = RetryingProviderError; - #[instrument(level = "error", skip(self), fields(provider_host = %self.inner.url().host_str().unwrap_or("unknown")))] + #[instrument(skip(self), fields(provider_host = %self.inner.url().host_str().unwrap_or("unknown")))] async fn request(&self, method: &str, params: T) -> Result where T: Debug + Serialize + Send + Sync, @@ -169,12 +169,7 @@ impl JsonRpcClient for RetryingProvider { self.request_with_retry::(method, params, |res, attempt, next_backoff_ms| match res { Ok(res) => HandleMethod::Accept(res), Err(HttpClientError::ReqwestError(e)) => { - info!( - next_backoff_ms, - retries_remaining = self.max_requests - attempt, - error = %e, - "ReqwestError in http provider.", - ); + warn!(next_backoff_ms, retries_remaining = self.max_requests - attempt, error = %e, "ReqwestError in http provider."); HandleMethod::Retry(HttpClientError::ReqwestError(e)) } Err(HttpClientError::JsonRpcError(e)) => { @@ -185,12 +180,12 @@ impl JsonRpcClient for RetryingProvider { warn!(attempt, next_backoff_ms, error = %e, "JsonRpcError in http provider; not retrying."); HandleMethod::Halt(HttpClientError::JsonRpcError(e)) } else { - info!(attempt, next_backoff_ms, error = %e, "JsonRpcError in http provider."); + warn!(attempt, next_backoff_ms, error = %e, "JsonRpcError in http provider."); HandleMethod::Retry(HttpClientError::JsonRpcError(e)) } } Err(HttpClientError::SerdeJson { err, text }) => { - info!(attempt, next_backoff_ms, error = %err, text = text, "SerdeJson error in http provider"); + warn!(attempt, next_backoff_ms, error = %err, text = text, "SerdeJson error in http provider"); HandleMethod::Retry(HttpClientError::SerdeJson { err, text }) } }) @@ -198,61 +193,6 @@ impl JsonRpcClient for RetryingProvider { } } -#[async_trait] -impl JsonRpcClient for RetryingProvider> -where - C: JsonRpcClient + 'static, -{ - type Error = RetryingProviderError>; - - #[instrument(level = "error", skip_all, fields(method = %method))] - async fn request(&self, method: &str, params: T) -> Result - where - T: Debug + Serialize + Send + Sync, - R: DeserializeOwned, - { - use HandleMethod::*; - self.request_with_retry::(method, params, |res, attempt, next_backoff_ms| { - let handling = match res { - Ok(v) => Accept(v), - Err(ProviderError::CustomError(e)) => Retry(ProviderError::CustomError(e)), - Err(ProviderError::EnsError(e)) => Retry(ProviderError::EnsError(e)), - Err(ProviderError::EnsNotOwned(e)) => Halt(ProviderError::EnsNotOwned(e)), - Err(ProviderError::HTTPError(e)) => Retry(ProviderError::HTTPError(e)), - Err(ProviderError::HexError(e)) => Halt(ProviderError::HexError(e)), - Err(ProviderError::JsonRpcClientError(e)) => { - if METHODS_TO_NOT_RETRY.contains(&method) { - Halt(ProviderError::JsonRpcClientError(e)) - } else { - Retry(ProviderError::JsonRpcClientError(e)) - } - } - Err(ProviderError::SerdeJson(e)) => Retry(ProviderError::SerdeJson(e)), - Err(ProviderError::SignerUnavailable) => Halt(ProviderError::SignerUnavailable), - Err(ProviderError::UnsupportedNodeClient) => { - Halt(ProviderError::UnsupportedNodeClient) - } - Err(ProviderError::UnsupportedRPC) => Halt(ProviderError::UnsupportedRPC), - }; - - match &handling { - Accept(_) => { - trace!("Quorum reached successfully."); - } - Halt(e) => { - error!(attempt, next_backoff_ms, error = 
%e, "Failed to reach quorum; not retrying."); - } - Retry(e) => { - warn!(attempt, next_backoff_ms, error = %e, "Failed to reach quorum; suggesting retry."); - } - } - - handling - }) - .await - } -} - impl
<P> FromStr for RetryingProvider<P>
where P: JsonRpcClient + FromStr, From 551af4c12ec434b64afc71bf65fb9e052bd0fef0 Mon Sep 17 00:00:00 2001 From: Nam Chu Hoai Date: Wed, 23 Nov 2022 20:26:29 -0500 Subject: [PATCH 5/7] Support ICA/IQS functions without Call struct (#1307) * Support ICA/IQS functions without Call struct * Fix --- .../middleware/InterchainAccountRouter.sol | 10 ++++++++++ .../middleware/InterchainQueryRouter.sol | 20 +++++++++++++++++++ .../mock/MockInterchainAccountRouter.sol | 18 +++++++++++++++-- .../interfaces/IInterchainAccountRouter.sol | 6 ++++++ .../interfaces/IInterchainQueryRouter.sol | 7 +++++++ .../src/middleware/accounts.hardhat-test.ts | 4 +++- 6 files changed, 62 insertions(+), 3 deletions(-) diff --git a/solidity/contracts/middleware/InterchainAccountRouter.sol b/solidity/contracts/middleware/InterchainAccountRouter.sol index 448c7c5a3..1e4798d75 100644 --- a/solidity/contracts/middleware/InterchainAccountRouter.sol +++ b/solidity/contracts/middleware/InterchainAccountRouter.sol @@ -44,6 +44,16 @@ contract InterchainAccountRouter is Router, IInterchainAccountRouter { return _dispatch(_destinationDomain, abi.encode(msg.sender, calls)); } + function dispatch( + uint32 _destinationDomain, + address target, + bytes calldata data + ) external returns (uint256) { + Call[] memory calls = new Call[](1); + calls[0] = Call({to: target, data: data}); + return _dispatch(_destinationDomain, abi.encode(msg.sender, calls)); + } + function getInterchainAccount(uint32 _origin, address _sender) public view diff --git a/solidity/contracts/middleware/InterchainQueryRouter.sol b/solidity/contracts/middleware/InterchainQueryRouter.sol index e732719d4..49dabe5c1 100644 --- a/solidity/contracts/middleware/InterchainQueryRouter.sol +++ b/solidity/contracts/middleware/InterchainQueryRouter.sol @@ -43,6 +43,26 @@ contract InterchainQueryRouter is _setInterchainGasPaymaster(_interchainGasPaymaster); } + /** + * @param _destinationDomain Domain of destination chain + * @param target The address of the contract to query on destination chain. + * @param queryData The calldata of the view call to make on the destination chain. + * @param callback Callback function selector on `msg.sender` and optionally abi-encoded prefix arguments. + */ + function query( + uint32 _destinationDomain, + address target, + bytes calldata queryData, + bytes calldata callback + ) external returns (uint256 leafIndex) { + // TODO: fix this ugly arrayification + Call[] memory calls = new Call[](1); + calls[0] = Call({to: target, data: queryData}); + bytes[] memory callbacks = new bytes[](1); + callbacks[0] = callback; + leafIndex = query(_destinationDomain, calls, callbacks); + } + /** * @param _destinationDomain Domain of destination chain * @param call Call (to and data packed struct) to be made on destination chain. 
diff --git a/solidity/contracts/mock/MockInterchainAccountRouter.sol b/solidity/contracts/mock/MockInterchainAccountRouter.sol index 974614a96..dfd475063 100644 --- a/solidity/contracts/mock/MockInterchainAccountRouter.sol +++ b/solidity/contracts/mock/MockInterchainAccountRouter.sol @@ -38,10 +38,24 @@ contract MockInterchainAccountRouter is IInterchainAccountRouter { originDomain = _originDomain; } - function dispatch(uint32, Call[] calldata calls) - external + function dispatch(uint32 _destinationDomain, Call[] calldata calls) + public returns (uint256) { + return _dispatch(_destinationDomain, calls); + } + + function dispatch( + uint32 _destinationDomain, + address target, + bytes calldata data + ) external returns (uint256) { + Call[] memory calls = new Call[](1); + calls[0] = Call({to: target, data: data}); + return _dispatch(_destinationDomain, calls); + } + + function _dispatch(uint32, Call[] memory calls) internal returns (uint256) { pendingCalls[totalCalls] = PendingCall( originDomain, msg.sender, diff --git a/solidity/interfaces/IInterchainAccountRouter.sol b/solidity/interfaces/IInterchainAccountRouter.sol index c162c9be7..da24042bb 100644 --- a/solidity/interfaces/IInterchainAccountRouter.sol +++ b/solidity/interfaces/IInterchainAccountRouter.sol @@ -8,6 +8,12 @@ interface IInterchainAccountRouter { external returns (uint256); + function dispatch( + uint32 _destinationDomain, + address target, + bytes calldata data + ) external returns (uint256); + function getInterchainAccount(uint32 _originDomain, address _sender) external view diff --git a/solidity/interfaces/IInterchainQueryRouter.sol b/solidity/interfaces/IInterchainQueryRouter.sol index 490e23283..8954c815d 100644 --- a/solidity/interfaces/IInterchainQueryRouter.sol +++ b/solidity/interfaces/IInterchainQueryRouter.sol @@ -4,6 +4,13 @@ pragma solidity >=0.6.11; import {Call} from "../contracts/Call.sol"; interface IInterchainQueryRouter { + function query( + uint32 _destinationDomain, + address target, + bytes calldata queryData, + bytes calldata callback + ) external returns (uint256); + function query( uint32 _destinationDomain, Call calldata call, diff --git a/typescript/sdk/src/middleware/accounts.hardhat-test.ts b/typescript/sdk/src/middleware/accounts.hardhat-test.ts index 2726902d4..e3b6a9966 100644 --- a/typescript/sdk/src/middleware/accounts.hardhat-test.ts +++ b/typescript/sdk/src/middleware/accounts.hardhat-test.ts @@ -67,7 +67,9 @@ describe('InterchainAccountRouter', async () => { localDomain, signer.address, ); - await local.dispatch(remoteDomain, [{ to: recipient.address, data }]); + await local['dispatch(uint32,(address,bytes)[])'](remoteDomain, [ + { to: recipient.address, data }, + ]); await coreApp.processMessages(); expect(await recipient.lastCallMessage()).to.eql(fooMessage); expect(await recipient.lastCaller()).to.eql(icaAddress); From b96880c40d48f2510d6b69ae9941e7f0822df340 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Mon, 28 Nov 2022 13:54:02 +0000 Subject: [PATCH 6/7] Correct span name (#1303) --- rust/agents/scraper/src/chain_scraper/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/agents/scraper/src/chain_scraper/sync.rs b/rust/agents/scraper/src/chain_scraper/sync.rs index b42f69ea5..56d9dfb73 100644 --- a/rust/agents/scraper/src/chain_scraper/sync.rs +++ b/rust/agents/scraper/src/chain_scraper/sync.rs @@ -105,7 +105,7 @@ impl Syncer { } /// Sync contract and other blockchain data with the current chain state. 
- #[instrument(skip(self), fields(chain_name = self.chain_name(), chink_size = self.chunk_size))] + #[instrument(skip(self), fields(chain_name = self.chain_name(), chunk_size = self.chunk_size))] pub async fn run(mut self) -> Result<()> { let start_block = self.sync_cursor.current_position(); info!(from = start_block, "Resuming chain sync"); From e6a3373979a90c09027cfd83c4abc6a23e6c267f Mon Sep 17 00:00:00 2001 From: Asa Oines Date: Mon, 28 Nov 2022 14:37:46 -0500 Subject: [PATCH 7/7] Fix build --- rust/abacus-base/src/contract_sync/mailbox.rs | 17 +++++++++++++---- rust/abacus-core/src/traits/mod.rs | 2 -- rust/agents/scraper/src/chain_scraper/sync.rs | 9 +++------ .../middleware/InterchainAccountRouter.sol | 2 +- .../middleware/InterchainQueryRouter.sol | 4 ++-- .../interfaces/IInterchainAccountRouter.sol | 2 +- solidity/interfaces/IInterchainQueryRouter.sol | 2 +- 7 files changed, 21 insertions(+), 17 deletions(-) diff --git a/rust/abacus-base/src/contract_sync/mailbox.rs b/rust/abacus-base/src/contract_sync/mailbox.rs index 4bf0a772e..6f2496b3f 100644 --- a/rust/abacus-base/src/contract_sync/mailbox.rs +++ b/rust/abacus-base/src/contract_sync/mailbox.rs @@ -2,8 +2,7 @@ use tracing::{debug, info, info_span, warn}; use tracing::{instrument::Instrumented, Instrument}; use abacus_core::{ - name_from_domain_id, CommittedMessage, Indexer, ListValidity, OutboxIndexer, - SyncBlockRangeCursor, + name_from_domain_id, Indexer, ListValidity, MailboxIndexer, SyncBlockRangeCursor, }; use crate::contract_sync::last_message::validate_message_continuity; @@ -16,7 +15,9 @@ where I: MailboxIndexer + Clone + 'static, { /// Sync dispatched messages - pub fn sync_dispatched_messages(&self) -> Instrumented>> { + pub fn sync_dispatched_messages( + &self, + ) -> Instrumented>> { let span = info_span!("MessageContractSync"); let db = self.db.clone(); @@ -238,11 +239,13 @@ mod test { use tokio::time::{interval, sleep, timeout}; use abacus_core::{db::AbacusDB, AbacusMessage, LogMeta}; + use abacus_test::mocks::cursor::MockSyncBlockRangeCursor; use abacus_test::mocks::indexer::MockAbacusIndexer; use abacus_test::test_utils; use crate::contract_sync::mailbox::MOCK_CURSOR; use crate::contract_sync::schema::OutboxContractSyncDB; + use crate::contract_sync::IndexSettings; use crate::ContractSync; use crate::{ContractSyncMetrics, CoreMetrics}; @@ -267,6 +270,12 @@ mod test { }; let messages = (0..10).map(message_gen).collect::>(); + let m0 = messages[0].clone(); + let m1 = messages[1].clone(); + let m2 = messages[2].clone(); + let m3 = messages[3].clone(); + let m4 = messages[4].clone(); + let m5 = messages[5].clone(); let meta = || LogMeta { address: Default::default(), @@ -306,7 +315,7 @@ mod test { } macro_rules! 
expect_fetches_range { ($expected_from:literal, $expected_to:literal, $return_messages:expr) => { - let messages: &[&RawCommittedMessage] = $return_messages; + let messages: &[&AbacusMessage] = $return_messages; let messages = messages.iter().map(|&msg| (msg.clone(), meta())).collect(); mock_cursor .expect__next_range() diff --git a/rust/abacus-core/src/traits/mod.rs b/rust/abacus-core/src/traits/mod.rs index d3dc46650..7e0b71cb6 100644 --- a/rust/abacus-core/src/traits/mod.rs +++ b/rust/abacus-core/src/traits/mod.rs @@ -10,7 +10,6 @@ use ethers::{ providers::{Middleware, ProviderError}, }; -pub use common::*; pub use cursor::*; pub use encode::*; pub use indexer::*; @@ -21,7 +20,6 @@ pub use provider::*; use crate::{db::DbError, AbacusError}; -mod common; mod cursor; mod encode; mod indexer; diff --git a/rust/agents/scraper/src/chain_scraper/sync.rs b/rust/agents/scraper/src/chain_scraper/sync.rs index 737238de5..a99412256 100644 --- a/rust/agents/scraper/src/chain_scraper/sync.rs +++ b/rust/agents/scraper/src/chain_scraper/sync.rs @@ -10,9 +10,7 @@ use prometheus::{IntCounter, IntGauge, IntGaugeVec}; use tracing::{debug, info, instrument, warn}; use abacus_base::last_message::validate_message_continuity; -use abacus_core::{ - name_from_domain_id, CommittedMessage, ListValidity, OutboxIndexer, SyncBlockRangeCursor, -}; +use abacus_core::{name_from_domain_id, ListValidity, MailboxIndexer, SyncBlockRangeCursor}; use crate::chain_scraper::{AbacusMessageWithMeta, Delivery, SqlChainScraper, TxnWithIdAndTime}; @@ -32,7 +30,7 @@ pub(super) struct Syncer { stored_deliveries: IntCounter, missed_messages: IntCounter, message_nonce: IntGaugeVec, - sync_cursor: RateLimitedSyncBlockRangeCursor>, + sync_cursor: RateLimitedSyncBlockRangeCursor>, last_valid_range_start_block: u32, last_nonce: u32, @@ -85,7 +83,7 @@ impl Syncer { let last_nonce = scraper.last_message_nonce().await?.unwrap_or(0); let sync_cursor = RateLimitedSyncBlockRangeCursor::new( - scraper.local.indexer.clone(), + scraper.contracts.indexer.clone(), chunk_size, initial_height, ) @@ -114,7 +112,6 @@ impl Syncer { self.indexed_deliveries_height.set(start_block as i64); loop { - debug_assert_eq!(self.local.outbox.local_domain(), self.local_domain()); let start_block = self.sync_cursor.current_position(); let (from, to) = match self.sync_cursor.next_range().await { Ok(range) => range, diff --git a/solidity/contracts/middleware/InterchainAccountRouter.sol b/solidity/contracts/middleware/InterchainAccountRouter.sol index 73f2a3067..f99b3ab32 100644 --- a/solidity/contracts/middleware/InterchainAccountRouter.sol +++ b/solidity/contracts/middleware/InterchainAccountRouter.sol @@ -49,7 +49,7 @@ contract InterchainAccountRouter is Router, IInterchainAccountRouter { uint32 _destinationDomain, address target, bytes calldata data - ) external returns (uint256) { + ) external returns (bytes32) { Call[] memory calls = new Call[](1); calls[0] = Call({to: target, data: data}); return _dispatch(_destinationDomain, abi.encode(msg.sender, calls)); diff --git a/solidity/contracts/middleware/InterchainQueryRouter.sol b/solidity/contracts/middleware/InterchainQueryRouter.sol index c16ea41b4..cbb010c08 100644 --- a/solidity/contracts/middleware/InterchainQueryRouter.sol +++ b/solidity/contracts/middleware/InterchainQueryRouter.sol @@ -55,13 +55,13 @@ contract InterchainQueryRouter is address target, bytes calldata queryData, bytes calldata callback - ) external returns (uint256 leafIndex) { + ) external returns (bytes32 messageId) { // TODO: fix this ugly 
arrayification Call[] memory calls = new Call[](1); calls[0] = Call({to: target, data: queryData}); bytes[] memory callbacks = new bytes[](1); callbacks[0] = callback; - leafIndex = query(_destinationDomain, calls, callbacks); + messageId = query(_destinationDomain, calls, callbacks); } /** diff --git a/solidity/interfaces/IInterchainAccountRouter.sol b/solidity/interfaces/IInterchainAccountRouter.sol index 11fcd3024..db619205b 100644 --- a/solidity/interfaces/IInterchainAccountRouter.sol +++ b/solidity/interfaces/IInterchainAccountRouter.sol @@ -12,7 +12,7 @@ interface IInterchainAccountRouter { uint32 _destinationDomain, address target, bytes calldata data - ) external returns (uint256); + ) external returns (bytes32); function getInterchainAccount(uint32 _originDomain, address _sender) external diff --git a/solidity/interfaces/IInterchainQueryRouter.sol b/solidity/interfaces/IInterchainQueryRouter.sol index b398a716e..315fc85e7 100644 --- a/solidity/interfaces/IInterchainQueryRouter.sol +++ b/solidity/interfaces/IInterchainQueryRouter.sol @@ -9,7 +9,7 @@ interface IInterchainQueryRouter { address target, bytes calldata queryData, bytes calldata callback - ) external returns (uint256); + ) external returns (bytes32); function query( uint32 _destinationDomain,