Ported http retrying logic from nomad (#390)

* Ported retrying logic from nomad

* remove unused import
pull/396/head
Mattie Conover 3 years ago committed by GitHub
parent 1d075d11e5
commit 50172104fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      rust/Cargo.lock
  2. 1
      rust/chains/abacus-ethereum/Cargo.toml
  3. 5
      rust/chains/abacus-ethereum/src/lib.rs
  4. 4
      rust/chains/abacus-ethereum/src/macros.rs
  5. 125
      rust/chains/abacus-ethereum/src/retrying.rs

1
rust/Cargo.lock generated

@ -107,6 +107,7 @@ dependencies = [
"rocksdb",
"serde 1.0.130",
"serde_json",
"thiserror",
"tokio",
"tracing",
"tracing-futures",

@ -14,6 +14,7 @@ ethers = {git = "https://github.com/gakonst/ethers-rs", branch = "master", featu
ethers-signers = {git = "https://github.com/gakonst/ethers-rs", branch = "master", features = ["aws"]}
ethers-contract = { git = "https://github.com/gakonst/ethers-rs", branch = "master", features=["legacy"] }
async-trait = { version = "0.1.42", default-features = false }
thiserror = { version = "1.0.22", default-features = false }
tracing = "0.1.22"
color-eyre = "0.5.0"
anyhow = "1"

@ -9,7 +9,6 @@ use color_eyre::eyre::Result;
use ethers::providers::Middleware;
use ethers::types::{Address, BlockId, BlockNumber, NameOrAddress, H160};
use num::Num;
use std::convert::TryFrom;
use std::sync::Arc;
#[macro_use]
@ -27,6 +26,10 @@ mod inbox;
#[cfg(not(doctest))]
mod validator_manager;
/// Retrying Provider
mod retrying;
pub use retrying::{RetryingProvider, RetryingProviderError};
/// Ethereum connection configuration
#[derive(Debug, serde::Deserialize, Clone)]
#[serde(tag = "type", rename_all = "camelCase")]

@ -96,8 +96,8 @@ macro_rules! boxed_trait {
boxed_trait!(@finish provider, $($tail)*)
}};
(@http $url:expr, $($tail:tt)*) => {{
let provider =
Arc::new(ethers::providers::Provider::<ethers::providers::Http>::try_from($url)?);
let provider: crate::RetryingProvider<ethers::providers::Http> = $url.parse()?;
let provider = Arc::new(ethers::providers::Provider::new(provider));
boxed_trait!(@finish provider, $($tail)*)
}};
($name:ident, $abi:ident, $trait:ident, $($n:ident:$t:ty),*) => {

@ -0,0 +1,125 @@
use std::{fmt::Debug, str::FromStr, time::Duration};
use async_trait::async_trait;
use ethers::providers::{JsonRpcClient, ProviderError};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
use thiserror::Error;
use tokio::time::sleep;
use tracing::{debug, instrument, warn};
/// An HTTP Provider with a simple naive exponential backoff built-in
#[derive(Debug, Clone)]
pub struct RetryingProvider<P> {
inner: P,
max_requests: usize,
}
impl<P> RetryingProvider<P> {
/// Instantiate a RetryingProvider
pub fn new(inner: P, max_requests: usize) -> Self {
Self {
inner,
max_requests,
}
}
/// Set the max_requests (and by extension the total time a request can take)
pub fn set_max_requests(&mut self, max_requests: usize) {
self.max_requests = max_requests;
}
/// Get the max_requests
pub fn get_max_requests(&self) -> usize {
self.max_requests
}
}
/// Error type for the RetryingProvider
#[derive(Error, Debug)]
pub enum RetryingProviderError<P>
where
P: JsonRpcClient,
{
// /// An internal error in the JSON RPC Client
// #[error(transparent)]
// JsonRpcClientError(#[from] <P as JsonRpcClient>::Error),
/// Hit max requests
#[error("Hit max requests")]
MaxRequests(Vec<P::Error>),
}
impl<P> From<RetryingProviderError<P>> for ProviderError
where
P: JsonRpcClient + 'static,
<P as JsonRpcClient>::Error: Send + Sync,
{
fn from(src: RetryingProviderError<P>) -> Self {
ProviderError::JsonRpcClientError(Box::new(src))
}
}
#[async_trait]
impl<P> JsonRpcClient for RetryingProvider<P>
where
P: JsonRpcClient + 'static,
<P as JsonRpcClient>::Error: Send + Sync,
{
type Error = RetryingProviderError<P>;
#[instrument(
level = "debug",
err,
skip(params),
fields(params = % serde_json::to_string(& params).unwrap()))
]
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned,
{
let mut errors = vec![];
let params = serde_json::to_value(params).expect("valid");
for i in 0..self.max_requests {
let backoff_seconds = 2u64.pow(i as u32);
{
debug!(attempt = i, "Dispatching request");
let fut = match params {
Value::Null => self.inner.request(method, ()),
_ => self.inner.request(method, &params),
};
match fut.await {
Ok(res) => return Ok(res),
Err(e) => {
warn!(
backoff_seconds,
retries_remaining = self.max_requests - i - 1,
error = %e,
method = %method,
"Error in retrying provider",
);
errors.push(e);
}
}
}
sleep(Duration::from_secs(backoff_seconds)).await;
}
return Err(RetryingProviderError::MaxRequests(errors));
}
}
impl<P> FromStr for RetryingProvider<P>
where
P: JsonRpcClient + FromStr,
{
type Err = <P as FromStr>::Err;
fn from_str(src: &str) -> Result<Self, Self::Err> {
Ok(Self::new(src.parse()?, 6))
}
}
Loading…
Cancel
Save