Add timeout to inner HttpQuorum providers & all API requests, correctly record # of gas payment events processed (#1115)

* Add timeout to inner HttpQuorum providers, correctly record # of gas payment events processed

* Add timeouts to S3 requests

* Add timeout everywhere
pull/1118/head
Trevor Porter 2 years ago committed by GitHub
parent 19e6691ffb
commit 08b4edecd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      rust/abacus-base/src/contract_sync/interchain_gas.rs
  2. 22
      rust/abacus-base/src/types/s3_storage.rs
  3. 9
      rust/abacus-core/src/db/abacus_db.rs
  4. 33
      rust/agents/relayer/src/msg/gas_payment/policies/meets_estimated_cost.rs
  5. 8
      rust/agents/relayer/src/msg/gelato_submitter/mod.rs
  6. 8
      rust/chains/abacus-ethereum/src/trait_builder.rs

@ -72,11 +72,16 @@ where
"[GasPayments]: indexed block range"
);
let mut new_payments_processed: i64 = 0;
for gas_payment in gas_payments.iter() {
db.process_gas_payment(gas_payment)?;
// Attempt to process the gas payment, incrementing new_payments_processed
// if it was processed for the first time.
if db.process_gas_payment(gas_payment)? {
new_payments_processed += 1;
}
}
stored_messages.add(gas_payments.len().try_into()?);
stored_messages.add(new_payments_processed);
db.store_latest_indexed_gas_payment_block(to)?;
from = to + 1;

@ -1,4 +1,4 @@
use std::fmt;
use std::{fmt, time::Duration};
use abacus_core::SignedCheckpoint;
use async_trait::async_trait;
@ -11,9 +11,15 @@ use rusoto_core::{
HttpClient, Region, RusotoError,
};
use rusoto_s3::{GetObjectError, GetObjectRequest, PutObjectRequest, S3Client, S3};
use tokio::time::timeout;
use crate::CheckpointSyncer;
/// The timeout for S3 requests. Rusoto doesn't offer timeout configuration
/// out of the box, so S3 requests must be wrapped with a timeout.
/// See https://github.com/rusoto/rusoto/issues/1795.
const S3_REQUEST_TIMEOUT_SECONDS: u64 = 30;
#[derive(Clone)]
/// Type for reading/writing to S3
pub struct S3Storage {
@ -58,7 +64,11 @@ impl S3Storage {
content_type: Some("application/json".to_owned()),
..Default::default()
};
self.authenticated_client().put_object(req).await?;
timeout(
Duration::from_secs(S3_REQUEST_TIMEOUT_SECONDS),
self.authenticated_client().put_object(req),
)
.await??;
Ok(())
}
@ -69,7 +79,13 @@ impl S3Storage {
bucket: self.bucket.clone(),
..Default::default()
};
match self.anonymous_client().get_object(req).await {
let get_object_result = timeout(
Duration::from_secs(S3_REQUEST_TIMEOUT_SECONDS),
self.anonymous_client().get_object(req),
)
.await?;
match get_object_result {
Ok(res) => match res.body {
Some(body) => Ok(Some(body.map_ok(|b| b.to_vec()).try_concat().await?)),
None => Ok(None),

@ -236,15 +236,17 @@ impl AbacusDB {
/// If the provided gas payment, identified by its metadata, has not been processed,
/// processes the gas payment and records it as processed.
/// Returns whether the gas payment was processed for the first time.
pub fn process_gas_payment(
&self,
gas_payment_with_meta: &InterchainGasPaymentWithMeta,
) -> Result<(), DbError> {
) -> Result<bool, DbError> {
let meta = &gas_payment_with_meta.meta;
// If the gas payment has already been processed, do nothing
if self.retrieve_gas_payment_meta_processed(meta)? {
trace!(gas_payment_with_meta=?gas_payment_with_meta, "Attempted to process an already-processed gas payment");
return Ok(());
// Return false to indicate the gas payment was already processed
return Ok(false);
}
// Set the gas payment as processed
self.store_gas_payment_meta_processed(meta)?;
@ -252,7 +254,8 @@ impl AbacusDB {
// Update the total gas payment for the leaf to include the payment
self.update_gas_payment_for_leaf(&gas_payment_with_meta.payment)?;
Ok(())
// Return true to indicate the gas payment was processed for the first time
Ok(true)
}
/// Record a gas payment, identified by its metadata, as processed

@ -8,10 +8,11 @@ use async_trait::async_trait;
use coingecko::CoinGeckoClient;
use ethers::types::U256;
use eyre::{eyre, Result};
use tokio::sync::RwLock;
use tokio::{sync::RwLock, time::timeout};
use crate::msg::gas_payment::GasPaymentPolicy;
const COINGECKO_API_HTTP_TIMEOUT_SECONDS: u64 = 30;
const CACHE_TTL_SECONDS: u64 = 60;
/// 1 / 100th of a cent
const FIXED_POINT_PRECISION: usize = 1000;
@ -98,17 +99,16 @@ impl CoinGeckoCachingPriceGetter {
cached_usd_prices.insert(coingecko_id, usd_price.into());
}
async fn get_usd_price(&self, coingecko_id: &'static str) -> Result<f64> {
if let Some(usd_price) = self.get_cached_usd_price(coingecko_id).await {
return Ok(usd_price);
}
// Returns a HashMap keyed by coingecko IDs
let api_response = self
.coingecko
.price(&[coingecko_id], &["usd"], false, false, false, false)
.await?;
let usd_price = api_response
async fn get_coingecko_usd_price(&self, coingecko_id: &'static str) -> Result<f64> {
// Make the API request with a timeout, which can't be configured in the library we're using.
// Returns a HashMap keyed by coingecko IDs.
let api_response = timeout(
Duration::from_secs(COINGECKO_API_HTTP_TIMEOUT_SECONDS),
self.coingecko
.price(&[coingecko_id], &["usd"], false, false, false, false),
)
.await??;
api_response
.get(coingecko_id)
.and_then(|p| p.usd)
.ok_or_else(|| {
@ -116,8 +116,15 @@ impl CoinGeckoCachingPriceGetter {
"Unable to get USD price for {} from CoinGecko API response",
coingecko_id
)
})?;
})
}
async fn get_usd_price(&self, coingecko_id: &'static str) -> Result<f64> {
if let Some(usd_price) = self.get_cached_usd_price(coingecko_id).await {
return Ok(usd_price);
}
let usd_price = self.get_coingecko_usd_price(coingecko_id).await?;
self.set_cached_usd_price(coingecko_id, usd_price).await;
Ok(usd_price)

@ -21,6 +21,8 @@ use super::SubmitMessageArgs;
mod sponsored_call_op;
const HTTP_CLIENT_REQUEST_SECONDS: u64 = 30;
#[derive(Debug)]
pub(crate) struct GelatoSubmitter {
/// The Gelato config.
@ -56,6 +58,10 @@ impl GelatoSubmitter {
) -> Self {
let (message_processed_sender, message_processed_receiver) =
mpsc::unbounded_channel::<SubmitMessageArgs>();
let http_client = reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(HTTP_CLIENT_REQUEST_SECONDS))
.build()
.unwrap();
Self {
message_receiver,
inbox_gelato_chain: abacus_domain_id_to_gelato_chain(
@ -65,7 +71,7 @@ impl GelatoSubmitter {
inbox_contracts,
db: abacus_db,
gelato_config,
http_client: reqwest::Client::new(),
http_client,
metrics,
message_processed_sender,
message_processed_receiver,

@ -45,8 +45,10 @@ pub trait MakeableWithProvider {
Connection::HttpQuorum { urls } => {
let rpc_metrics = rpc_metrics.map(|f| f());
let mut builder = QuorumProvider::builder().quorum(Quorum::Majority);
let http_client = Client::builder().timeout(HTTP_CLIENT_TIMEOUT).build()?;
for url in urls.split(',') {
let http_provider: Http = url.parse()?;
let http_provider =
Http::new_with_client(url.parse::<Url>()?, http_client.clone());
// Wrap the inner providers as RetryingProviders rather than the QuorumProvider.
// We've observed issues where the QuorumProvider will first get the latest block
// number and then submit an RPC at that block height, sometimes resulting in the
@ -70,8 +72,8 @@ pub trait MakeableWithProvider {
.await?
}
Connection::Http { url } => {
let client = Client::builder().timeout(HTTP_CLIENT_TIMEOUT).build()?;
let http_provider = Http::new_with_client(url.parse::<Url>()?, client);
let http_client = Client::builder().timeout(HTTP_CLIENT_TIMEOUT).build()?;
let http_provider = Http::new_with_client(url.parse::<Url>()?, http_client);
let retrying_http_provider: RetryingProvider<Http> =
RetryingProvider::new(http_provider, None, None);
self.wrap_with_metrics(retrying_http_provider, locator, signer, middleware_metrics)

Loading…
Cancel
Save