|
|
|
@ -6,9 +6,9 @@ use serde::{de::DeserializeOwned, Serialize}; |
|
|
|
|
use serde_json::Value; |
|
|
|
|
use thiserror::Error; |
|
|
|
|
use tokio::time::sleep; |
|
|
|
|
use tracing::{debug, error, info, instrument, trace, warn}; |
|
|
|
|
use tracing::{debug, error, instrument, trace, warn}; |
|
|
|
|
|
|
|
|
|
use crate::{HttpClientError, QuorumProvider}; |
|
|
|
|
use crate::HttpClientError; |
|
|
|
|
|
|
|
|
|
const METHODS_TO_NOT_RETRY: &[&str] = &[ |
|
|
|
|
"eth_estimateGas", |
|
|
|
@ -72,7 +72,7 @@ where |
|
|
|
|
/// The retrying provider logic which accepts a matcher function that can
|
|
|
|
|
/// handle specific cases for different underlying provider
|
|
|
|
|
/// implementations.
|
|
|
|
|
#[instrument(level = "error", skip_all, fields(method = %method))] |
|
|
|
|
#[instrument(skip_all, fields(method = %method))] |
|
|
|
|
async fn request_with_retry<T, R>( |
|
|
|
|
&self, |
|
|
|
|
method: &str, |
|
|
|
@ -160,7 +160,7 @@ where |
|
|
|
|
impl JsonRpcClient for RetryingProvider<Http> { |
|
|
|
|
type Error = RetryingProviderError<Http>; |
|
|
|
|
|
|
|
|
|
#[instrument(level = "error", skip(self), fields(provider_host = %self.inner.url().host_str().unwrap_or("unknown")))] |
|
|
|
|
#[instrument(skip(self), fields(provider_host = %self.inner.url().host_str().unwrap_or("unknown")))] |
|
|
|
|
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error> |
|
|
|
|
where |
|
|
|
|
T: Debug + Serialize + Send + Sync, |
|
|
|
@ -169,12 +169,7 @@ impl JsonRpcClient for RetryingProvider<Http> { |
|
|
|
|
self.request_with_retry::<T, R>(method, params, |res, attempt, next_backoff_ms| match res { |
|
|
|
|
Ok(res) => HandleMethod::Accept(res), |
|
|
|
|
Err(HttpClientError::ReqwestError(e)) => { |
|
|
|
|
info!( |
|
|
|
|
next_backoff_ms, |
|
|
|
|
retries_remaining = self.max_requests - attempt, |
|
|
|
|
error = %e, |
|
|
|
|
"ReqwestError in http provider.", |
|
|
|
|
); |
|
|
|
|
warn!(next_backoff_ms, retries_remaining = self.max_requests - attempt, error = %e, "ReqwestError in http provider."); |
|
|
|
|
HandleMethod::Retry(HttpClientError::ReqwestError(e)) |
|
|
|
|
} |
|
|
|
|
Err(HttpClientError::JsonRpcError(e)) => { |
|
|
|
@ -185,12 +180,12 @@ impl JsonRpcClient for RetryingProvider<Http> { |
|
|
|
|
warn!(attempt, next_backoff_ms, error = %e, "JsonRpcError in http provider; not retrying."); |
|
|
|
|
HandleMethod::Halt(HttpClientError::JsonRpcError(e)) |
|
|
|
|
} else { |
|
|
|
|
info!(attempt, next_backoff_ms, error = %e, "JsonRpcError in http provider."); |
|
|
|
|
warn!(attempt, next_backoff_ms, error = %e, "JsonRpcError in http provider."); |
|
|
|
|
HandleMethod::Retry(HttpClientError::JsonRpcError(e)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Err(HttpClientError::SerdeJson { err, text }) => { |
|
|
|
|
info!(attempt, next_backoff_ms, error = %err, text = text, "SerdeJson error in http provider"); |
|
|
|
|
warn!(attempt, next_backoff_ms, error = %err, text = text, "SerdeJson error in http provider"); |
|
|
|
|
HandleMethod::Retry(HttpClientError::SerdeJson { err, text }) |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
@ -198,61 +193,6 @@ impl JsonRpcClient for RetryingProvider<Http> { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[async_trait] |
|
|
|
|
impl<C> JsonRpcClient for RetryingProvider<QuorumProvider<C>> |
|
|
|
|
where |
|
|
|
|
C: JsonRpcClient + 'static, |
|
|
|
|
{ |
|
|
|
|
type Error = RetryingProviderError<QuorumProvider<C>>; |
|
|
|
|
|
|
|
|
|
#[instrument(level = "error", skip_all, fields(method = %method))] |
|
|
|
|
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error> |
|
|
|
|
where |
|
|
|
|
T: Debug + Serialize + Send + Sync, |
|
|
|
|
R: DeserializeOwned, |
|
|
|
|
{ |
|
|
|
|
use HandleMethod::*; |
|
|
|
|
self.request_with_retry::<T, R>(method, params, |res, attempt, next_backoff_ms| { |
|
|
|
|
let handling = match res { |
|
|
|
|
Ok(v) => Accept(v), |
|
|
|
|
Err(ProviderError::CustomError(e)) => Retry(ProviderError::CustomError(e)), |
|
|
|
|
Err(ProviderError::EnsError(e)) => Retry(ProviderError::EnsError(e)), |
|
|
|
|
Err(ProviderError::EnsNotOwned(e)) => Halt(ProviderError::EnsNotOwned(e)), |
|
|
|
|
Err(ProviderError::HTTPError(e)) => Retry(ProviderError::HTTPError(e)), |
|
|
|
|
Err(ProviderError::HexError(e)) => Halt(ProviderError::HexError(e)), |
|
|
|
|
Err(ProviderError::JsonRpcClientError(e)) => { |
|
|
|
|
if METHODS_TO_NOT_RETRY.contains(&method) { |
|
|
|
|
Halt(ProviderError::JsonRpcClientError(e)) |
|
|
|
|
} else { |
|
|
|
|
Retry(ProviderError::JsonRpcClientError(e)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Err(ProviderError::SerdeJson(e)) => Retry(ProviderError::SerdeJson(e)), |
|
|
|
|
Err(ProviderError::SignerUnavailable) => Halt(ProviderError::SignerUnavailable), |
|
|
|
|
Err(ProviderError::UnsupportedNodeClient) => { |
|
|
|
|
Halt(ProviderError::UnsupportedNodeClient) |
|
|
|
|
} |
|
|
|
|
Err(ProviderError::UnsupportedRPC) => Halt(ProviderError::UnsupportedRPC), |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
match &handling { |
|
|
|
|
Accept(_) => { |
|
|
|
|
trace!("Quorum reached successfully."); |
|
|
|
|
} |
|
|
|
|
Halt(e) => { |
|
|
|
|
error!(attempt, next_backoff_ms, error = %e, "Failed to reach quorum; not retrying."); |
|
|
|
|
} |
|
|
|
|
Retry(e) => { |
|
|
|
|
warn!(attempt, next_backoff_ms, error = %e, "Failed to reach quorum; suggesting retry."); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
handling |
|
|
|
|
}) |
|
|
|
|
.await |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl<P> FromStr for RetryingProvider<P> |
|
|
|
|
where |
|
|
|
|
P: JsonRpcClient + FromStr, |
|
|
|
|