Ported fix provider order (#1437)

* Fix provider wrapping order (#1436)

* Fix provider wrapping order

* Better RPC provider metrics

* Clean after cherry-pick

* Cleanup
pull/1676/head
Mattie Conover 2 years ago committed by GitHub
parent 84224b98be
commit ba5f04a46b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      rust/chains/hyperlane-ethereum/src/retrying.rs
  2. 41
      rust/chains/hyperlane-ethereum/src/trait_builder.rs
  3. 55
      rust/ethers-prometheus/src/json_rpc_client.rs
  4. 5
      rust/hyperlane-base/src/metrics/json_rpc_client.rs
  5. 2
      rust/hyperlane-base/src/settings/chains.rs

@ -3,6 +3,7 @@ use std::{fmt::Debug, str::FromStr, time::Duration};
use async_trait::async_trait;
use ethers::prelude::HttpClientError;
use ethers::providers::{Http, JsonRpcClient, ProviderError};
use ethers_prometheus::json_rpc_client::PrometheusJsonRpcClient;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
use thiserror::Error;
@ -156,10 +157,10 @@ where
}
#[async_trait]
impl JsonRpcClient for RetryingProvider<Http> {
type Error = RetryingProviderError<Http>;
impl JsonRpcClient for RetryingProvider<PrometheusJsonRpcClient<Http>> {
type Error = RetryingProviderError<PrometheusJsonRpcClient<Http>>;
#[instrument(skip(self), fields(provider_host = %self.inner.url().host_str().unwrap_or("unknown")))]
#[instrument(skip(self), fields(provider_host = %self.inner.node_host(), chain_name = %self.inner.chain_name()))]
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,

@ -59,24 +59,22 @@ pub trait BuildableWithProvider {
conn: &ConnectionConf,
locator: &ContractLocator,
signer: Option<Signers>,
rpc_metrics: Option<impl FnOnce() -> JsonRpcClientMetrics + Send>,
rpc_metrics: Option<JsonRpcClientMetrics>,
middleware_metrics: Option<(MiddlewareMetrics, PrometheusMiddlewareConf)>,
) -> ChainResult<Self::Output> {
Ok(match conn {
ConnectionConf::HttpQuorum { urls } => {
let rpc_metrics = rpc_metrics.map(|f| f());
let mut builder = QuorumProvider::builder().quorum(Quorum::Majority);
let http_client = Client::builder()
.timeout(HTTP_CLIENT_TIMEOUT)
.build()
.map_err(EthereumProviderConnectionError::from)?;
for url in urls.split(',') {
let http_provider = Http::new_with_client(
url.parse::<Url>().map_err(|e| {
let parsed_url = url.parse::<Url>().map_err(|e| {
EthereumProviderConnectionError::InvalidUrl(e, url.to_owned())
})?,
http_client.clone(),
);
})?;
let http_provider =
Http::new_with_client(parsed_url.clone(), http_client.clone());
// Wrap the inner providers as RetryingProviders rather than the QuorumProvider.
// We've observed issues where the QuorumProvider will first get the latest
// block number and then submit an RPC at that block height,
@ -86,17 +84,15 @@ pub trait BuildableWithProvider {
// RPCs being retried, while retrying at the inner provider
// level will result in only the second RPC being retried
// (the one with the error), which is the desired behavior.
let retrying_provider =
RetryingProvider::new(http_provider, Some(5), Some(1000));
let metrics_provider = self.wrap_rpc_with_metrics(
retrying_provider,
Url::parse(url).map_err(|e| {
EthereumProviderConnectionError::InvalidUrl(e, url.to_owned())
})?,
http_provider,
parsed_url,
&rpc_metrics,
&middleware_metrics,
);
let weighted_provider = WeightedProvider::new(metrics_provider);
let retrying_provider =
RetryingProvider::new(metrics_provider, Some(5), Some(1000));
let weighted_provider = WeightedProvider::new(retrying_provider);
builder = builder.add_provider(weighted_provider);
}
let quorum_provider = builder.build();
@ -104,7 +100,6 @@ pub trait BuildableWithProvider {
.await?
}
ConnectionConf::HttpFallback { urls } => {
let rpc_metrics = rpc_metrics.map(|f| f());
let mut builder = FallbackProvider::builder();
let http_client = Client::builder()
.timeout(HTTP_CLIENT_TIMEOUT)
@ -136,13 +131,17 @@ pub trait BuildableWithProvider {
.timeout(HTTP_CLIENT_TIMEOUT)
.build()
.map_err(EthereumProviderConnectionError::from)?;
let http_provider = Http::new_with_client(
url.parse::<Url>()
.map_err(|e| EthereumProviderConnectionError::InvalidUrl(e, url.clone()))?,
http_client,
let parsed_url = url
.parse::<Url>()
.map_err(|e| EthereumProviderConnectionError::InvalidUrl(e, url.clone()))?;
let http_provider = Http::new_with_client(parsed_url.clone(), http_client);
let metrics_provider = self.wrap_rpc_with_metrics(
http_provider,
parsed_url,
&rpc_metrics,
&middleware_metrics,
);
let retrying_http_provider: RetryingProvider<Http> =
RetryingProvider::new(http_provider, None, None);
let retrying_http_provider = RetryingProvider::new(metrics_provider, None, None);
self.wrap_with_metrics(retrying_http_provider, locator, signer, middleware_metrics)
.await?
}

@ -33,42 +33,31 @@ pub struct JsonRpcClientMetrics {
/// - `chain`: chain name (or chain id if the name is unknown) of the chain
/// the request was made on.
/// - `method`: request method string.
/// - `status`: `success` or `failure` depending on the response. A `success`
/// might still be an "error" but not one with the transport layer.
#[builder(setter(into, strip_option), default)]
request_count: Option<IntCounterVec>,
/// Total number of requests made which resulted in an error from the inner
/// client.
/// - `provider_node`: node this is connecting to, e.g. `alchemy.com`,
/// `quicknode.pro`, or `localhost:8545`.
/// - `chain`: chain name (or chain id if the name is unknown) of the chain
/// the request was made on.
/// - `method`: request method string.
#[builder(setter(into, strip_option), default)]
request_failure_count: Option<IntCounterVec>,
/// Total number of seconds spent making requests.
/// - `provider_node`: node this is connecting to, e.g. `alchemy.com`,
/// `quicknode.pro`, or `localhost:8545`.
/// - `chain`: chain name (or chain id if the name is unknown) of the chain
/// the request was made on.
/// - `method`: request method string.
/// - `status`: `success` or `failure` depending on the response. A `success`
/// might still be an "error" but not one with the transport layer.
#[builder(setter(into, strip_option), default)]
request_duration_seconds: Option<CounterVec>,
}
/// Expected label names for the metric.
pub const REQUEST_COUNT_LABELS: &[&str] = &["provider_node", "chain", "method"];
pub const REQUEST_COUNT_LABELS: &[&str] = &["provider_node", "chain", "method", "status"];
/// Help string for the metric.
pub const REQUEST_COUNT_HELP: &str = "Total number of requests made to this client";
/// Expected label names for the metric.
pub const REQUEST_FAILURE_COUNT_LABELS: &[&str] = &["provider_node", "chain", "method"];
/// Help string for the metric.
pub const REQUEST_FAILURE_COUNT_HELP: &str =
"Total number of requests made which resulted in an error from the inner client";
/// Expected label names for the metric.
pub const REQUEST_DURATION_SECONDS_LABELS: &[&str] = &["provider_node", "chain", "method"];
pub const REQUEST_DURATION_SECONDS_LABELS: &[&str] =
&["provider_node", "chain", "method", "status"];
/// Help string for the metric.
pub const REQUEST_DURATION_SECONDS_HELP: &str = "Total number of seconds spent making requests";
@ -136,6 +125,24 @@ where
}
}
impl<C> PrometheusJsonRpcClient<C> {
/// The "host" part of the URL this node is connecting to. E.g.
/// `avalanche.api.onfinality.io`.
pub fn node_host(&self) -> &str {
self.config.node_host()
}
/// Chain name this RPC client is connected to.
pub fn chain_name(&self) -> &str {
self.config.chain_name()
}
/// The inner RpcClient implementation
pub fn inner(&self) -> &C {
&self.inner
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<C> JsonRpcClient for PrometheusJsonRpcClient<C>
@ -149,21 +156,17 @@ where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned,
{
let start = Instant::now();
let res = self.inner.request(method, params).await;
let labels = hashmap! {
"provider_node" => self.config.node_host(),
"chain" => self.config.chain_name(),
"method" => method
"method" => method,
"status" => if res.is_ok() { "success" } else { "failure" }
};
if let Some(counter) = &self.metrics.request_count {
counter.with(&labels).inc()
}
let start = Instant::now();
let res = self.inner.request(method, params).await;
if let Some(counter) = &self.metrics.request_failure_count {
if res.is_err() {
counter.with(&labels).inc()
}
}
if let Some(counter) = &self.metrics.request_duration_seconds {
counter
.with(&labels)

@ -12,11 +12,6 @@ pub(crate) fn create_json_rpc_client_metrics(
REQUEST_COUNT_HELP,
REQUEST_COUNT_LABELS,
)?)
.request_failure_count(metrics.new_int_counter(
"request_failure_count",
REQUEST_FAILURE_COUNT_HELP,
REQUEST_FAILURE_COUNT_LABELS,
)?)
.request_duration_seconds(metrics.new_counter(
"request_duration_seconds",
REQUEST_DURATION_SECONDS_HELP,

@ -375,7 +375,7 @@ impl ChainSetup {
{
let signer = self.ethereum_signer().await?;
let metrics_conf = self.metrics_conf(metrics.agent_name(), &signer);
let rpc_metrics = Some(|| metrics.json_rpc_client_metrics());
let rpc_metrics = Some(metrics.json_rpc_client_metrics());
let middleware_metrics = Some((metrics.provider_metrics(), metrics_conf));
let res = builder
.build_with_connection_conf(conf, locator, signer, rpc_metrics, middleware_metrics)

Loading…
Cancel
Save