From 08b4edecd8b7032ac32544ddbaaa35294dd7bd11 Mon Sep 17 00:00:00 2001 From: Trevor Porter Date: Mon, 26 Sep 2022 14:53:55 +0100 Subject: [PATCH] Add timeout to inner HttpQuorum providers & all API requests, correctly record # of gas payment events processed (#1115) * Add timeout to inner HttpQuorum providers, correctly record # of gas payment events processed * Add timeouts to S3 requests * Add timeout everywhere --- .../src/contract_sync/interchain_gas.rs | 9 +++-- rust/abacus-base/src/types/s3_storage.rs | 22 +++++++++++-- rust/abacus-core/src/db/abacus_db.rs | 9 +++-- .../policies/meets_estimated_cost.rs | 33 +++++++++++-------- .../relayer/src/msg/gelato_submitter/mod.rs | 8 ++++- .../abacus-ethereum/src/trait_builder.rs | 8 +++-- 6 files changed, 64 insertions(+), 25 deletions(-) diff --git a/rust/abacus-base/src/contract_sync/interchain_gas.rs b/rust/abacus-base/src/contract_sync/interchain_gas.rs index ec591b450..4e204fccf 100644 --- a/rust/abacus-base/src/contract_sync/interchain_gas.rs +++ b/rust/abacus-base/src/contract_sync/interchain_gas.rs @@ -72,11 +72,16 @@ where "[GasPayments]: indexed block range" ); + let mut new_payments_processed: i64 = 0; for gas_payment in gas_payments.iter() { - db.process_gas_payment(gas_payment)?; + // Attempt to process the gas payment, incrementing new_payments_processed + // if it was processed for the first time. + if db.process_gas_payment(gas_payment)? { + new_payments_processed += 1; + } } - stored_messages.add(gas_payments.len().try_into()?); + stored_messages.add(new_payments_processed); db.store_latest_indexed_gas_payment_block(to)?; from = to + 1; diff --git a/rust/abacus-base/src/types/s3_storage.rs b/rust/abacus-base/src/types/s3_storage.rs index f96f425b0..8e8ebd6ba 100644 --- a/rust/abacus-base/src/types/s3_storage.rs +++ b/rust/abacus-base/src/types/s3_storage.rs @@ -1,4 +1,4 @@ -use std::fmt; +use std::{fmt, time::Duration}; use abacus_core::SignedCheckpoint; use async_trait::async_trait; @@ -11,9 +11,15 @@ use rusoto_core::{ HttpClient, Region, RusotoError, }; use rusoto_s3::{GetObjectError, GetObjectRequest, PutObjectRequest, S3Client, S3}; +use tokio::time::timeout; use crate::CheckpointSyncer; +/// The timeout for S3 requests. Rusoto doesn't offer timeout configuration +/// out of the box, so S3 requests must be wrapped with a timeout. +/// See https://github.com/rusoto/rusoto/issues/1795. +const S3_REQUEST_TIMEOUT_SECONDS: u64 = 30; + #[derive(Clone)] /// Type for reading/writing to S3 pub struct S3Storage { @@ -58,7 +64,11 @@ impl S3Storage { content_type: Some("application/json".to_owned()), ..Default::default() }; - self.authenticated_client().put_object(req).await?; + timeout( + Duration::from_secs(S3_REQUEST_TIMEOUT_SECONDS), + self.authenticated_client().put_object(req), + ) + .await??; Ok(()) } @@ -69,7 +79,13 @@ impl S3Storage { bucket: self.bucket.clone(), ..Default::default() }; - match self.anonymous_client().get_object(req).await { + let get_object_result = timeout( + Duration::from_secs(S3_REQUEST_TIMEOUT_SECONDS), + self.anonymous_client().get_object(req), + ) + .await?; + + match get_object_result { Ok(res) => match res.body { Some(body) => Ok(Some(body.map_ok(|b| b.to_vec()).try_concat().await?)), None => Ok(None), diff --git a/rust/abacus-core/src/db/abacus_db.rs b/rust/abacus-core/src/db/abacus_db.rs index 2f1268972..c369ab67c 100644 --- a/rust/abacus-core/src/db/abacus_db.rs +++ b/rust/abacus-core/src/db/abacus_db.rs @@ -236,15 +236,17 @@ impl AbacusDB { /// If the provided gas payment, identified by its metadata, has not been processed, /// processes the gas payment and records it as processed. + /// Returns whether the gas payment was processed for the first time. pub fn process_gas_payment( &self, gas_payment_with_meta: &InterchainGasPaymentWithMeta, - ) -> Result<(), DbError> { + ) -> Result { let meta = &gas_payment_with_meta.meta; // If the gas payment has already been processed, do nothing if self.retrieve_gas_payment_meta_processed(meta)? { trace!(gas_payment_with_meta=?gas_payment_with_meta, "Attempted to process an already-processed gas payment"); - return Ok(()); + // Return false to indicate the gas payment was already processed + return Ok(false); } // Set the gas payment as processed self.store_gas_payment_meta_processed(meta)?; @@ -252,7 +254,8 @@ impl AbacusDB { // Update the total gas payment for the leaf to include the payment self.update_gas_payment_for_leaf(&gas_payment_with_meta.payment)?; - Ok(()) + // Return true to indicate the gas payment was processed for the first time + Ok(true) } /// Record a gas payment, identified by its metadata, as processed diff --git a/rust/agents/relayer/src/msg/gas_payment/policies/meets_estimated_cost.rs b/rust/agents/relayer/src/msg/gas_payment/policies/meets_estimated_cost.rs index 6a39dc145..a1a0edd8f 100644 --- a/rust/agents/relayer/src/msg/gas_payment/policies/meets_estimated_cost.rs +++ b/rust/agents/relayer/src/msg/gas_payment/policies/meets_estimated_cost.rs @@ -8,10 +8,11 @@ use async_trait::async_trait; use coingecko::CoinGeckoClient; use ethers::types::U256; use eyre::{eyre, Result}; -use tokio::sync::RwLock; +use tokio::{sync::RwLock, time::timeout}; use crate::msg::gas_payment::GasPaymentPolicy; +const COINGECKO_API_HTTP_TIMEOUT_SECONDS: u64 = 30; const CACHE_TTL_SECONDS: u64 = 60; /// 1 / 100th of a cent const FIXED_POINT_PRECISION: usize = 1000; @@ -98,17 +99,16 @@ impl CoinGeckoCachingPriceGetter { cached_usd_prices.insert(coingecko_id, usd_price.into()); } - async fn get_usd_price(&self, coingecko_id: &'static str) -> Result { - if let Some(usd_price) = self.get_cached_usd_price(coingecko_id).await { - return Ok(usd_price); - } - - // Returns a HashMap keyed by coingecko IDs - let api_response = self - .coingecko - .price(&[coingecko_id], &["usd"], false, false, false, false) - .await?; - let usd_price = api_response + async fn get_coingecko_usd_price(&self, coingecko_id: &'static str) -> Result { + // Make the API request with a timeout, which can't be configured in the library we're using. + // Returns a HashMap keyed by coingecko IDs. + let api_response = timeout( + Duration::from_secs(COINGECKO_API_HTTP_TIMEOUT_SECONDS), + self.coingecko + .price(&[coingecko_id], &["usd"], false, false, false, false), + ) + .await??; + api_response .get(coingecko_id) .and_then(|p| p.usd) .ok_or_else(|| { @@ -116,8 +116,15 @@ impl CoinGeckoCachingPriceGetter { "Unable to get USD price for {} from CoinGecko API response", coingecko_id ) - })?; + }) + } + + async fn get_usd_price(&self, coingecko_id: &'static str) -> Result { + if let Some(usd_price) = self.get_cached_usd_price(coingecko_id).await { + return Ok(usd_price); + } + let usd_price = self.get_coingecko_usd_price(coingecko_id).await?; self.set_cached_usd_price(coingecko_id, usd_price).await; Ok(usd_price) diff --git a/rust/agents/relayer/src/msg/gelato_submitter/mod.rs b/rust/agents/relayer/src/msg/gelato_submitter/mod.rs index af91ce2f5..214f397bf 100644 --- a/rust/agents/relayer/src/msg/gelato_submitter/mod.rs +++ b/rust/agents/relayer/src/msg/gelato_submitter/mod.rs @@ -21,6 +21,8 @@ use super::SubmitMessageArgs; mod sponsored_call_op; +const HTTP_CLIENT_REQUEST_SECONDS: u64 = 30; + #[derive(Debug)] pub(crate) struct GelatoSubmitter { /// The Gelato config. @@ -56,6 +58,10 @@ impl GelatoSubmitter { ) -> Self { let (message_processed_sender, message_processed_receiver) = mpsc::unbounded_channel::(); + let http_client = reqwest::ClientBuilder::new() + .timeout(Duration::from_secs(HTTP_CLIENT_REQUEST_SECONDS)) + .build() + .unwrap(); Self { message_receiver, inbox_gelato_chain: abacus_domain_id_to_gelato_chain( @@ -65,7 +71,7 @@ impl GelatoSubmitter { inbox_contracts, db: abacus_db, gelato_config, - http_client: reqwest::Client::new(), + http_client, metrics, message_processed_sender, message_processed_receiver, diff --git a/rust/chains/abacus-ethereum/src/trait_builder.rs b/rust/chains/abacus-ethereum/src/trait_builder.rs index eb209d874..a0fbf7236 100644 --- a/rust/chains/abacus-ethereum/src/trait_builder.rs +++ b/rust/chains/abacus-ethereum/src/trait_builder.rs @@ -45,8 +45,10 @@ pub trait MakeableWithProvider { Connection::HttpQuorum { urls } => { let rpc_metrics = rpc_metrics.map(|f| f()); let mut builder = QuorumProvider::builder().quorum(Quorum::Majority); + let http_client = Client::builder().timeout(HTTP_CLIENT_TIMEOUT).build()?; for url in urls.split(',') { - let http_provider: Http = url.parse()?; + let http_provider = + Http::new_with_client(url.parse::()?, http_client.clone()); // Wrap the inner providers as RetryingProviders rather than the QuorumProvider. // We've observed issues where the QuorumProvider will first get the latest block // number and then submit an RPC at that block height, sometimes resulting in the @@ -70,8 +72,8 @@ pub trait MakeableWithProvider { .await? } Connection::Http { url } => { - let client = Client::builder().timeout(HTTP_CLIENT_TIMEOUT).build()?; - let http_provider = Http::new_with_client(url.parse::()?, client); + let http_client = Client::builder().timeout(HTTP_CLIENT_TIMEOUT).build()?; + let http_provider = Http::new_with_client(url.parse::()?, http_client); let retrying_http_provider: RetryingProvider = RetryingProvider::new(http_provider, None, None); self.wrap_with_metrics(retrying_http_provider, locator, signer, middleware_metrics)