Quorum metrics (#1039)

* Refactor ethers-prometheus in preparation

* Rename metrics struct

* Add JsonRpcMetrics wrapper

* Plumbing new metrics in

* Rename label for clarity
pull/1045/head
Mattie Conover 2 years ago committed by GitHub
parent 44361545f8
commit f19110f4de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      rust/Cargo.lock
  2. 23
      rust/abacus-base/src/metrics/core.rs
  3. 26
      rust/abacus-base/src/metrics/json_rpc_client.rs
  4. 1
      rust/abacus-base/src/metrics/mod.rs
  5. 6
      rust/abacus-base/src/metrics/provider.rs
  6. 21
      rust/abacus-base/src/settings/chains.rs
  7. 50
      rust/abacus-base/src/settings/mod.rs
  8. 7
      rust/chains/abacus-ethereum/src/retrying.rs
  9. 68
      rust/chains/abacus-ethereum/src/trait_builder.rs
  10. 174
      rust/ethers-prometheus/src/json_rpc_client.rs
  11. 549
      rust/ethers-prometheus/src/lib.rs
  12. 7
      rust/ethers-prometheus/src/middleware/error.rs
  13. 564
      rust/ethers-prometheus/src/middleware/mod.rs
  14. 4
      typescript/infra/config/environments/testnet2/agent.ts

5
rust/Cargo.lock generated

@ -5212,13 +5212,12 @@ dependencies = [
[[package]]
name = "url"
version = "2.2.2"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c"
checksum = "22fe195a4f217c25b25cb5058ced57059824a678474874038dc88d211bf508d3"
dependencies = [
"form_urlencoded",
"idna",
"matches",
"percent-encoding",
]

@ -12,8 +12,10 @@ use prometheus::{
};
use tokio::task::JoinHandle;
use ethers_prometheus::ProviderMetrics;
use ethers_prometheus::json_rpc_client::JsonRpcClientMetrics;
use ethers_prometheus::middleware::MiddlewareMetrics;
use crate::metrics::json_rpc_client::create_json_rpc_client_metrics;
use crate::metrics::provider::create_provider_metrics;
use super::NAMESPACE;
@ -51,8 +53,12 @@ pub struct CoreMetrics {
outbox_state: IntGaugeVec,
latest_checkpoint: IntGaugeVec,
/// Set of metrics that tightly wrap the JsonRpcClient for use with the
/// quorum provider.
json_rpc_client_metrics: OnceCell<JsonRpcClientMetrics>,
/// Set of provider-specific metrics. These only need to get created once.
provider_metrics: OnceCell<ProviderMetrics>,
provider_metrics: OnceCell<MiddlewareMetrics>,
}
impl CoreMetrics {
@ -191,12 +197,13 @@ impl CoreMetrics {
outbox_state,
latest_checkpoint,
json_rpc_client_metrics: OnceCell::new(),
provider_metrics: OnceCell::new(),
})
}
/// Create the provider metrics attached to this core metrics instance.
pub fn provider_metrics(&self) -> ProviderMetrics {
pub fn provider_metrics(&self) -> MiddlewareMetrics {
self.provider_metrics
.get_or_init(|| {
create_provider_metrics(self).expect("Failed to create provider metrics!")
@ -204,6 +211,16 @@ impl CoreMetrics {
.clone()
}
/// Create the json rpc provider metrics attached to this core metrics
/// instance.
pub fn json_rpc_client_metrics(&self) -> JsonRpcClientMetrics {
self.json_rpc_client_metrics
.get_or_init(|| {
create_json_rpc_client_metrics(self).expect("Failed to create rpc client metrics!")
})
.clone()
}
/// Create and register a new int gauge.
pub fn new_int_gauge(
&self,

@ -0,0 +1,26 @@
use ethers_prometheus::json_rpc_client::*;
use eyre::Result;
use crate::CoreMetrics;
pub(crate) fn create_json_rpc_client_metrics(
metrics: &CoreMetrics,
) -> Result<JsonRpcClientMetrics> {
Ok(JsonRpcClientMetricsBuilder::default()
.request_count(metrics.new_int_counter(
"request_count",
REQUEST_COUNT_HELP,
REQUEST_COUNT_LABELS,
)?)
.request_failure_count(metrics.new_int_counter(
"request_failure_count",
REQUEST_FAILURE_COUNT_HELP,
REQUEST_FAILURE_COUNT_LABELS,
)?)
.request_duration_seconds(metrics.new_counter(
"request_duration_seconds",
REQUEST_DURATION_SECONDS_HELP,
REQUEST_DURATION_SECONDS_LABELS,
)?)
.build()?)
}

@ -6,4 +6,5 @@ pub const NAMESPACE: &str = "abacus";
mod core;
pub use self::core::*;
mod json_rpc_client;
mod provider;

@ -1,11 +1,11 @@
use eyre::Result;
use ethers_prometheus::*;
use ethers_prometheus::middleware::*;
use crate::{CoreMetrics, NETWORK_HISTOGRAM_BUCKETS};
pub(crate) fn create_provider_metrics(metrics: &CoreMetrics) -> Result<ProviderMetrics> {
Ok(ProviderMetricsBuilder::default()
pub(crate) fn create_provider_metrics(metrics: &CoreMetrics) -> Result<MiddlewareMetrics> {
Ok(MiddlewareMetricsBuilder::default()
.block_height(metrics.new_int_gauge(
"block_height",
BLOCK_HEIGHT_HELP,

@ -8,7 +8,9 @@ use abacus_ethereum::{
InboxBuilder, InboxValidatorManagerBuilder, InterchainGasPaymasterBuilder,
MakeableWithProvider, OutboxBuilder,
};
use ethers_prometheus::{ChainInfo, ContractInfo, PrometheusMiddlewareConf, WalletInfo};
use ethers_prometheus::middleware::{
ChainInfo, ContractInfo, PrometheusMiddlewareConf, WalletInfo,
};
use crate::{
CoreMetrics, InboxValidatorManagerVariants, InboxValidatorManagers, InboxVariants, Inboxes,
@ -35,7 +37,8 @@ impl Default for ChainConf {
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GelatoConf {
/// Whether to use the Gelato Relay service for transactions submitted to the chain.
/// Whether to use the Gelato Relay service for transactions submitted to
/// the chain.
pub enabled: String,
}
@ -80,8 +83,8 @@ pub struct ChainSetup<T> {
/// Set this key to disable the inbox. Does nothing for outboxes.
#[serde(default)]
pub disabled: Option<String>,
/// Configure chain-specific metrics information. This will automatically add all contract
/// addresses but will not override any set explicitly.
/// Configure chain-specific metrics information. This will automatically
/// add all contract addresses but will not override any set explicitly.
/// Use `metrics_conf()` to get the metrics.
#[serde(default)]
pub metrics_conf: PrometheusMiddlewareConf,
@ -118,6 +121,7 @@ impl ChainSetup<OutboxAddresses> {
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.metrics_conf())),
)
.await?,
@ -151,6 +155,7 @@ impl ChainSetup<OutboxAddresses> {
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.metrics_conf())),
)
.await?,
@ -160,7 +165,8 @@ impl ChainSetup<OutboxAddresses> {
}
}
/// Get a clone of the metrics conf with correctly configured contract information.
/// Get a clone of the metrics conf with correctly configured contract
/// information.
pub fn metrics_conf(&self) -> PrometheusMiddlewareConf {
let mut cfg = self.metrics_conf.clone();
@ -211,6 +217,7 @@ impl ChainSetup<InboxAddresses> {
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), metrics_conf)),
)
.await?,
@ -242,6 +249,7 @@ impl ChainSetup<InboxAddresses> {
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), metrics_conf)),
)
.await?,
@ -250,7 +258,8 @@ impl ChainSetup<InboxAddresses> {
}
}
/// Get a clone of the metrics conf with correctly configured contract information.
/// Get a clone of the metrics conf with correctly configured contract
/// information.
pub fn metrics_conf(
&self,
agent_name: &str,

@ -17,21 +17,22 @@
//!
//! Agents read settings from the config files and/or env.
//!
//! Config files are loaded from `rust/config/default` unless specified otherwise,
//! i.e. via $RUN_ENV and $BASE_CONFIG (see the definition of `decl_settings` in
//! `rust/abacus-base/src/macros.rs`).
//! Config files are loaded from `rust/config/default` unless specified
//! otherwise, i.e. via $RUN_ENV and $BASE_CONFIG (see the definition of
//! `decl_settings` in `rust/abacus-base/src/macros.rs`).
//!
//! #### N.B.: Environment variable names correspond 1:1 with cfg file's JSON object hierarchy.
//!
//! In particular, note that any environment variables whose names are prefixed with:
//! In particular, note that any environment variables whose names are prefixed
//! with:
//!
//! * `ABC_BASE`
//! * `ABC_BASE`
//!
//! * `ABC_[agentname]`, where `[agentmame]` is agent-specific, e.g. `ABC_VALIDATOR` or
//! `ABC_RELAYER`.
//! * `ABC_[agentname]`, where `[agentmame]` is agent-specific, e.g.
//! `ABC_VALIDATOR` or `ABC_RELAYER`.
//!
//! will be read as an override to be applied against the hierarchical structure of
//! the configuration provided by the json config file at
//! will be read as an override to be applied against the hierarchical structure
//! of the configuration provided by the json config file at
//! `./config/$RUN_ENV/$BASE_CONFIG`.
//!
//! For example, if the config file `example_config.json` is:
@ -50,10 +51,12 @@
//! }
//! ```
//!
//! and an environment variable is supplied which defines `ABC_BASE_INBOXES_TEST3_DOMAIN=1`, then
//! the `decl_settings` macro in `rust/abacus-base/src/macros.rs` will directly override the
//! 'domain' field found in the json config to be `1`, since the fields in the environment variable
//! name describe the path traversal to arrive at this field in the JSON config object.
//! and an environment variable is supplied which defines
//! `ABC_BASE_INBOXES_TEST3_DOMAIN=1`, then the `decl_settings` macro in
//! `rust/abacus-base/src/macros.rs` will directly override the 'domain' field
//! found in the json config to be `1`, since the fields in the environment
//! variable name describe the path traversal to arrive at this field in the
//! JSON config object.
//!
//! ### Configuration value precedence
//!
@ -166,7 +169,8 @@ impl SignerConf {
pub struct IndexSettings {
/// The height at which to start indexing the Outbox contract
pub from: Option<String>,
/// The number of blocks to query at once at which to start indexing the Outbox contract
/// The number of blocks to query at once at which to start indexing the
/// Outbox contract
pub chunk: Option<String>,
}
@ -233,7 +237,8 @@ pub struct Settings {
}
impl Settings {
/// Private to preserve linearity of AgentCore::from_settings -- creating an agent consumes the settings.
/// Private to preserve linearity of AgentCore::from_settings -- creating an
/// agent consumes the settings.
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
@ -353,8 +358,6 @@ impl Settings {
&self,
metrics: &CoreMetrics,
) -> Result<OutboxIndexers, Report> {
let signer = self.get_signer(&self.outbox.name).await;
let metrics = Some((metrics.provider_metrics(), self.outbox.metrics_conf()));
match &self.outbox.chain {
ChainConf::Ethereum(conn) => Ok(OutboxIndexers::Ethereum(
OutboxIndexerBuilder {
@ -374,8 +377,9 @@ impl Settings {
.parse::<ethers::types::Address>()?
.into(),
},
signer,
metrics,
self.get_signer(&self.outbox.name).await,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.outbox.metrics_conf())),
)
.await?,
)),
@ -390,9 +394,6 @@ impl Settings {
&self,
metrics: &CoreMetrics,
) -> Result<InterchainGasPaymasterIndexers, Report> {
let signer = self.get_signer(&self.outbox.name).await;
let metrics = Some((metrics.provider_metrics(), self.outbox.metrics_conf()));
match &self.outbox.chain {
ChainConf::Ethereum(conn) => Ok(InterchainGasPaymasterIndexers::Ethereum(
InterchainGasPaymasterIndexerBuilder {
@ -419,8 +420,9 @@ impl Settings {
.parse::<ethers::types::Address>()?
.into(),
},
signer,
metrics,
self.get_signer(&self.outbox.name).await,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.outbox.metrics_conf())),
)
.await?,
)),

@ -199,8 +199,11 @@ impl JsonRpcClient for RetryingProvider<Http> {
}
#[async_trait]
impl JsonRpcClient for RetryingProvider<QuorumProvider<Http>> {
type Error = RetryingProviderError<QuorumProvider<Http>>;
impl<C> JsonRpcClient for RetryingProvider<QuorumProvider<C>>
where
C: JsonRpcClient + 'static,
{
type Error = RetryingProviderError<QuorumProvider<C>>;
#[instrument(level = "error", skip_all, fields(method = %method))]
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>

@ -1,3 +1,4 @@
use std::fmt::Write;
use std::sync::Arc;
use std::time::Duration;
@ -6,10 +7,16 @@ use ethers::prelude::{
Http, JsonRpcClient, Middleware, NonceManagerMiddleware, Provider, Quorum, QuorumProvider,
SignerMiddleware, WeightedProvider, Ws,
};
use reqwest::{Client, Url};
use abacus_core::{ContractLocator, Signers};
use ethers_prometheus::{PrometheusMiddleware, PrometheusMiddlewareConf, ProviderMetrics};
use reqwest::{Client, Url};
use ethers_prometheus::json_rpc_client::{
JsonRpcClientMetrics, JsonRpcClientMetricsBuilder, NodeInfo, PrometheusJsonRpcClient,
PrometheusJsonRpcClientConfig,
};
use ethers_prometheus::middleware::{
MiddlewareMetrics, PrometheusMiddleware, PrometheusMiddlewareConf,
};
use crate::{Connection, RetryingProvider};
@ -31,19 +38,27 @@ pub trait MakeableWithProvider {
conn: Connection,
locator: &ContractLocator,
signer: Option<Signers>,
metrics: Option<(ProviderMetrics, PrometheusMiddlewareConf)>,
rpc_metrics: Option<impl FnOnce() -> JsonRpcClientMetrics + Send>,
middleware_metrics: Option<(MiddlewareMetrics, PrometheusMiddlewareConf)>,
) -> eyre::Result<Self::Output> {
Ok(match conn {
Connection::HttpQuorum { urls } => {
let rpc_metrics = rpc_metrics.map(|f| f());
let mut builder = QuorumProvider::builder().quorum(Quorum::Majority);
for url in urls.split(',') {
let http_provider: Http = url.parse()?;
let weighted_provider = WeightedProvider::new(http_provider);
let metrics_provider = self.wrap_rpc_with_metrics(
http_provider,
Url::parse(url)?,
&rpc_metrics,
&middleware_metrics,
);
let weighted_provider = WeightedProvider::new(metrics_provider);
builder = builder.add_provider(weighted_provider);
}
let quorum_provider = builder.build();
let retrying = RetryingProvider::new(quorum_provider, Some(3), Some(1000));
self.wrap_with_metrics(retrying, locator, signer, metrics)
self.wrap_with_metrics(retrying, locator, signer, middleware_metrics)
.await?
}
Connection::Http { url } => {
@ -51,16 +66,53 @@ pub trait MakeableWithProvider {
let http_provider = Http::new_with_client(url.parse::<Url>()?, client);
let retrying_http_provider: RetryingProvider<Http> =
RetryingProvider::new(http_provider, None, None);
self.wrap_with_metrics(retrying_http_provider, locator, signer, metrics)
self.wrap_with_metrics(retrying_http_provider, locator, signer, middleware_metrics)
.await?
}
Connection::Ws { url } => {
let ws = Ws::connect(url).await?;
self.wrap_with_metrics(ws, locator, signer, metrics).await?
self.wrap_with_metrics(ws, locator, signer, middleware_metrics)
.await?
}
})
}
/// Wrap a JsonRpcClient with metrics for use with a quorum provider.
fn wrap_rpc_with_metrics<C>(
&self,
client: C,
url: Url,
rpc_metrics: &Option<JsonRpcClientMetrics>,
middleware_metrics: &Option<(MiddlewareMetrics, PrometheusMiddlewareConf)>,
) -> PrometheusJsonRpcClient<C> {
PrometheusJsonRpcClient::new(
client,
rpc_metrics
.clone()
.unwrap_or_else(|| JsonRpcClientMetricsBuilder::default().build().unwrap()),
PrometheusJsonRpcClientConfig {
node: Some(NodeInfo {
host: {
let mut s = String::new();
if let Some(host) = url.host_str() {
s.push_str(host);
if let Some(port) = url.port() {
write!(&mut s, ":{port}").unwrap();
}
Some(s)
} else {
None
}
},
}),
// steal the chain info from the middleware conf
chain: middleware_metrics
.as_ref()
.and_then(|(_, v)| v.chain.clone()),
},
)
}
/// Wrap the provider creation with metrics if provided; this is the second
/// step
async fn wrap_with_metrics<P>(
@ -68,7 +120,7 @@ pub trait MakeableWithProvider {
client: P,
locator: &ContractLocator,
signer: Option<Signers>,
metrics: Option<(ProviderMetrics, PrometheusMiddlewareConf)>,
metrics: Option<(MiddlewareMetrics, PrometheusMiddlewareConf)>,
) -> eyre::Result<Self::Output>
where
P: JsonRpcClient + 'static,

@ -0,0 +1,174 @@
//! A wrapper around a JsonRpcClient to give insight at the request level. This
//! was designed specifically for use with the quorum provider.
use std::fmt::{Debug, Formatter};
use std::time::Instant;
use async_trait::async_trait;
use derive_builder::Builder;
use ethers::prelude::JsonRpcClient;
use maplit::hashmap;
use prometheus::{CounterVec, IntCounterVec};
use serde::de::DeserializeOwned;
use serde::Serialize;
pub use crate::ChainInfo;
/// Some basic information about a node.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct NodeInfo {
/// The host of the node, e.g. `alchemy.com`, `quicknode.pro`, or
/// `localhost:8545`.
pub host: Option<String>,
}
/// Container for all the relevant rpc client metrics.
#[derive(Clone, Builder)]
pub struct JsonRpcClientMetrics {
/// Total number of requests made to this client.
/// - `provider_node`: node this is connecting to, e.g. `alchemy.com`,
/// `quicknode.pro`, or `localhost:8545`.
/// - `chain`: chain name (or chain id if the name is unknown) of the chain
/// the request was made on.
/// - `method`: request method string.
#[builder(setter(into, strip_option), default)]
request_count: Option<IntCounterVec>,
/// Total number of requests made which resulted in an error from the inner
/// client.
/// - `provider_node`: node this is connecting to, e.g. `alchemy.com`,
/// `quicknode.pro`, or `localhost:8545`.
/// - `chain`: chain name (or chain id if the name is unknown) of the chain
/// the request was made on.
/// - `method`: request method string.
#[builder(setter(into, strip_option), default)]
request_failure_count: Option<IntCounterVec>,
/// Total number of seconds spent making requests.
/// - `provider_node`: node this is connecting to, e.g. `alchemy.com`,
/// `quicknode.pro`, or `localhost:8545`.
/// - `chain`: chain name (or chain id if the name is unknown) of the chain
/// the request was made on.
/// - `method`: request method string.
#[builder(setter(into, strip_option), default)]
request_duration_seconds: Option<CounterVec>,
}
/// Expected label names for the metric.
pub const REQUEST_COUNT_LABELS: &[&str] = &["provider_node", "chain", "method"];
/// Help string for the metric.
pub const REQUEST_COUNT_HELP: &str = "Total number of requests made to this client";
/// Expected label names for the metric.
pub const REQUEST_FAILURE_COUNT_LABELS: &[&str] = &["provider_node", "chain", "method"];
/// Help string for the metric.
pub const REQUEST_FAILURE_COUNT_HELP: &str =
"Total number of requests made which resulted in an error from the inner client";
/// Expected label names for the metric.
pub const REQUEST_DURATION_SECONDS_LABELS: &[&str] = &["provider_node", "chain", "method"];
/// Help string for the metric.
pub const REQUEST_DURATION_SECONDS_HELP: &str = "Total number of seconds spent making requests";
/// Configuration for the prometheus JsonRpcClioent. This can be loaded via
/// serde.
#[derive(Default, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct PrometheusJsonRpcClientConfig {
/// Information about what node this client is connecting to.
pub node: Option<NodeInfo>,
/// Information about the chain this client is for.
pub chain: Option<ChainInfo>,
}
impl PrometheusJsonRpcClientConfig {
fn node_host(&self) -> &str {
self.node
.as_ref()
.and_then(|n| n.host.as_ref())
.map(|h| h.as_str())
.unwrap_or("unknown")
}
fn chain_name(&self) -> &str {
self.chain
.as_ref()
.and_then(|c| c.name.as_ref())
.map(|n| n.as_str())
.unwrap_or("unknown")
}
}
/// An ethers-rs JsonRpcClient wrapper that instruments requests with prometheus
/// metrics. To make this as flexible as possible, the metric vecs need to be
/// created and named externally, they should follow the naming convention here
/// and must include the described labels.
pub struct PrometheusJsonRpcClient<C> {
inner: C,
metrics: JsonRpcClientMetrics,
config: PrometheusJsonRpcClientConfig,
}
impl<C> PrometheusJsonRpcClient<C> {
/// Wrap a JsonRpcClient with metrics.
pub fn new(
client: C,
metrics: JsonRpcClientMetrics,
config: PrometheusJsonRpcClientConfig,
) -> Self {
Self {
inner: client,
metrics,
config,
}
}
}
impl<C> Debug for PrometheusJsonRpcClient<C>
where
C: JsonRpcClient,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "PrometheusJsonRpcClient({:?})", self.inner)
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<C> JsonRpcClient for PrometheusJsonRpcClient<C>
where
C: JsonRpcClient,
{
type Error = C::Error;
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned,
{
let labels = hashmap! {
"provider_node" => self.config.node_host(),
"chain" => self.config.chain_name(),
"method" => method
};
let start = Instant::now();
let res = self.inner.request(method, params).await;
if let Some(counter) = &self.metrics.request_failure_count {
if res.is_err() {
counter.with(&labels).inc()
}
}
if let Some(counter) = &self.metrics.request_count {
counter.with(&labels).inc()
}
if let Some(counter) = &self.metrics.request_duration_seconds {
counter
.with(&labels)
.inc_by((Instant::now() - start).as_secs_f64())
};
res
}
}

@ -3,562 +3,23 @@
#![forbid(unsafe_code)]
#![warn(missing_docs)]
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use derive_builder::Builder;
use ethers::prelude::*;
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::utils::hex::ToHex;
use log::{debug, trace, warn};
use maplit::hashmap;
use prometheus::{CounterVec, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
use static_assertions::assert_impl_all;
use tokio::sync::RwLock;
use tokio::time::MissedTickBehavior;
use contracts::erc_20::Erc20;
pub use error::PrometheusMiddlewareError;
use ethers::prelude::U256;
mod contracts;
mod error;
/// Some basic information about a token.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct TokenInfo {
/// Full name of the token. E.g. Ether.
pub name: String,
/// Token symbol. E.g. ETH.
pub symbol: String,
/// Number of
pub decimals: u8,
}
impl Default for TokenInfo {
fn default() -> Self {
Self {
name: "Unknown".into(),
symbol: "".into(),
decimals: 18,
}
}
}
/// Some basic information about a wallet.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct WalletInfo {
/// A human-friendly name for the wallet. This should be a short string like "relayer".
pub name: Option<String>,
}
/// Some basic information about a contract.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct ContractInfo {
/// A human-friendly name for the contract. This should be a short string like "inbox".
pub name: Option<String>,
/// Mapping from function selectors to human readable names.
pub functions: HashMap<Selector, String>,
}
pub mod json_rpc_client;
pub mod middleware;
/// Some basic information about a chain.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct ChainInfo {
/// A human-friendly name for the chain. This should be a short string like "kovan".
/// A human-friendly name for the chain. This should be a short string like
/// "kovan".
pub name: Option<String>,
}
/// Expected label names for the `block_height` metric.
pub const BLOCK_HEIGHT_LABELS: &[&str] = &["chain"];
/// Help string for the metric.
pub const BLOCK_HEIGHT_HELP: &str = "Tracks the current block height of the chain";
/// Expected label names for the `gas_price_gwei` metric.
pub const GAS_PRICE_GWEI_LABELS: &[&str] = &["chain"];
/// Help string for the metric.
pub const GAS_PRICE_GWEI_HELP: &str = "Tracks the current gas price of the chain";
/// Expected label names for the `contract_call_duration_seconds` metric.
pub const CONTRACT_CALL_DURATION_SECONDS_LABELS: &[&str] = &[
"chain",
"contract_name",
"contract_address",
"function_name",
"function_selector",
];
/// Help string for the metric.
pub const CONTRACT_CALL_DURATION_SECONDS_HELP: &str =
"Contract call durations by contract and function";
/// Expected label names for the `contract_call_count` metric.
pub const CONTRACT_CALL_COUNT_LABELS: &[&str] = &[
"chain",
"contract_name",
"contract_address",
"function_name",
"function_selector",
];
/// Help string for the metric.
pub const CONTRACT_CALL_COUNT_HELP: &str = "Contract invocations by contract and function";
/// Expected label names for the `transaction_send_duration_seconds` metric.
pub const TRANSACTION_SEND_DURATION_SECONDS_LABELS: &[&str] = &["chain", "address_from"];
/// Help string for the metric.
pub const TRANSACTION_SEND_DURATION_SECONDS_HELP: &str =
"Time taken to submit the transaction (not counting time for it to be included)";
/// Expected label names for the `transaction_send_total` metric.
pub const TRANSACTION_SEND_TOTAL_LABELS: &[&str] =
&["chain", "address_from", "address_to", "txn_status"];
/// Help string for the metric.
pub const TRANSACTION_SEND_TOTAL_HELP: &str = "Number of transactions sent";
/// Expected label names for the `wallet_balance` metric.
pub const WALLET_BALANCE_LABELS: &[&str] = &[
"chain",
"wallet_address",
"wallet_name",
"token_address",
"token_symbol",
"token_name",
];
/// Help string for the metric.
pub const WALLET_BALANCE_HELP: &str = "Current balance of eth and other tokens in the `tokens` map for the wallet addresses in the `wallets` set";
/// Container for all the relevant middleware metrics.
#[derive(Clone, Builder)]
pub struct ProviderMetrics {
/// Tracks the current block height of the chain.
/// - `chain`: the chain name (or ID if the name is unknown) of the chain the block number refers to.
#[builder(setter(into, strip_option), default)]
block_height: Option<IntGaugeVec>,
/// Tracks the current gas price of the chain. Uses the base_fee_per_gas if available or else
/// the median of the transactions.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the chain the gas price refers to.
#[builder(setter(into, strip_option), default)]
gas_price_gwei: Option<GaugeVec>,
/// Contract call durations by contract and function
/// - `chain`: the chain name (or chain ID if the name is unknown) of the chain the tx occurred on.
/// - `contract_name`: contract name.
/// - `contract_address`: contract address (hex).
/// - `function_name`: contract function name.
/// - `function_selector`: contract function hash (hex).
#[builder(setter(into, strip_option), default)]
contract_call_duration_seconds: Option<CounterVec>,
/// Contract invocations by contract and function.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the chain the tx occurred on.
/// - `contract_name`: contract name.
/// - `contract_address`: contract address (hex).
/// - `function_name`: contract function name.
/// - `function_selector`: contract function hash (hex).
#[builder(setter(into, strip_option), default)]
contract_call_count: Option<IntCounterVec>,
/// Time taken to submit the transaction (not counting time for it to be included).
/// - `chain`: the chain name (or chain ID if the name is unknown) of the chain the tx occurred on.
/// - `address_from`: source address of the transaction.
#[builder(setter(into, strip_option), default)]
transaction_send_duration_seconds: Option<HistogramVec>,
/// Number of transactions sent.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the chain the tx occurred on.
/// - `address_from`: source address of the transaction.
/// - `address_to`: destination address of the transaction.
#[builder(setter(into, strip_option), default)]
transaction_send_total: Option<IntCounterVec>,
// /// Gas spent on completed transactions.
// /// - `chain`: the chain name (or ID if the name is unknown) of the chain the tx occurred on.
// /// - `address_from`: source address of the transaction.
// /// - `address_to`: destination address of the transaction.
// #[builder(setter(into, strip_option), default)]
// transaction_send_gas_eth_total: Option<CounterVec>,
/// Current balance of eth and other tokens in the `tokens` map for the wallet addresses in the
/// `wallets` set.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the chain the tx occurred on.
/// - `wallet_address`: Address of the wallet holding the funds.
/// - `wallet_name`: Name of the address holding the funds.
/// - `token_address`: Address of the token.
/// - `token_symbol`: Symbol of the token.
/// - `token_name`: Full name of the token.
#[builder(setter(into, strip_option), default)]
wallet_balance: Option<GaugeVec>,
}
/// An ethers-rs middleware that instruments calls with prometheus metrics. To make this is flexible
/// as possible, the metric vecs need to be created and named externally, they should follow the
/// naming convention here and must include the described labels.
pub struct PrometheusMiddleware<M> {
inner: Arc<M>,
metrics: ProviderMetrics,
conf: Arc<RwLock<PrometheusMiddlewareConf>>,
}
/// Configuration for the prometheus middleware. This can be loaded via serde.
#[derive(Default, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct PrometheusMiddlewareConf {
/// The tokens to track and identifying info
#[cfg_attr(feature = "serde", serde(default))]
pub tokens: HashMap<Address, TokenInfo>,
/// The wallets to track and identifying info
#[cfg_attr(feature = "serde", serde(default))]
pub wallets: HashMap<Address, WalletInfo>,
/// Contract info for more useful metrics
#[cfg_attr(feature = "serde", serde(default))]
pub contracts: HashMap<Address, ContractInfo>,
/// Information about the chain this provider is for.
pub chain: Option<ChainInfo>,
}
assert_impl_all!(PrometheusMiddlewareConf: Send, Sync);
assert_impl_all!(tokio::sync::RwLockReadGuard<PrometheusMiddlewareConf>: Send);
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<M: Middleware> Middleware for PrometheusMiddleware<M> {
type Error = PrometheusMiddlewareError<M::Error>;
type Provider = M::Provider;
type Inner = M;
fn inner(&self) -> &Self::Inner {
&self.inner
}
async fn send_transaction<T: Into<TypedTransaction> + Send + Sync>(
&self,
tx: T,
block: Option<BlockId>,
) -> Result<PendingTransaction<'_, Self::Provider>, Self::Error> {
let start = Instant::now();
let tx: TypedTransaction = tx.into();
let chain = {
let data = self.conf.read().await;
chain_name(&data.chain).to_owned()
};
let addr_from: String = tx
.from()
.map(|v| v.encode_hex())
.unwrap_or_else(|| "none".into());
let addr_to = tx
.to()
.map(|v| match v {
NameOrAddress::Name(v) => v.clone(),
NameOrAddress::Address(v) => v.encode_hex(),
})
.unwrap_or_else(|| "none".into());
if let Some(m) = &self.metrics.transaction_send_total {
m.with(&hashmap! {
"chain" => chain.as_str(),
"address_from" => addr_from.as_str(),
"address_to" => addr_to.as_str(),
"txn_status" => "dispatched"
})
.inc()
}
let result = self.inner.send_transaction(tx, block).await;
if let Some(m) = &self.metrics.transaction_send_duration_seconds {
let duration = (Instant::now() - start).as_secs_f64();
m.with(&hashmap! {
"chain" => chain.as_str(),
"address_from" => addr_from.as_str(),
})
.observe(duration);
}
if let Some(m) = &self.metrics.transaction_send_total {
m.with(&hashmap! {
"chain" => chain.as_str(),
"address_from" => addr_from.as_str(),
"address_to" => addr_to.as_str(),
"txn_status" => if result.is_ok() { "completed" } else { "failed" }
})
.inc()
}
Ok(result?)
}
async fn call(
&self,
tx: &TypedTransaction,
block: Option<BlockId>,
) -> Result<Bytes, Self::Error> {
let start = Instant::now();
let result = self.inner.call(tx, block).await;
if self.metrics.contract_call_duration_seconds.is_some()
|| self.metrics.contract_call_count.is_some()
{
let data = self.conf.read().await;
let chain = chain_name(&data.chain);
let empty_hm = HashMap::default();
let (contract_addr, contract_name, contract_fns) = tx
.to()
.and_then(|addr| match addr {
NameOrAddress::Name(n) => {
// not supporting ENS names for lookups by address right now
Some((n.clone(), n.clone(), &empty_hm))
}
NameOrAddress::Address(a) => data
.contracts
.get(a)
.map(|c| (c.name.as_deref().unwrap_or("unknown"), &c.functions))
.map(|(n, m)| (a.encode_hex(), n.into(), m)),
})
.unwrap_or_else(|| ("".into(), "unknown".into(), &empty_hm));
let fn_selector: Option<Selector> = tx
.data()
.filter(|data| data.0.len() >= 4)
.map(|data| [data.0[0], data.0[1], data.0[2], data.0[3]]);
let fn_name: &str = fn_selector
.and_then(|s| contract_fns.get(&s))
.map(|s| s.as_str())
.unwrap_or("unknown");
let fn_selector: String = fn_selector
.map(|s| format!("{:02x}{:02x}{:02x}{:02x}", s[0], s[1], s[2], s[3]))
.unwrap_or_else(|| "unknown".into());
let labels = hashmap! {
"chain" => chain,
"contract_name" => contract_name.as_str(),
"contract_address" => contract_addr.as_str(),
"function_name" => fn_name,
"function_selector" => &fn_selector,
};
if let Some(m) = &self.metrics.contract_call_count {
m.with(&labels).inc();
}
if let Some(m) = &self.metrics.contract_call_duration_seconds {
m.with(&labels)
.inc_by((Instant::now() - start).as_secs_f64());
}
}
Ok(result?)
}
}
impl<M> PrometheusMiddleware<M> {
/// Create a new prometheus middleware instance.
/// - `inner`: The wrapped middleware.
/// - `metrics`: Metrics objects we will report to.
/// - `tokens`: Tokens to watch the balances of.
/// - `wallets`: Wallets to watch the balances of.
pub fn new(inner: M, metrics: ProviderMetrics, conf: PrometheusMiddlewareConf) -> Self {
Self {
inner: Arc::new(inner),
metrics,
conf: Arc::new(RwLock::new(conf)),
}
}
/// Start tracking metrics for a new token.
pub async fn track_new_token(&self, addr: Address, info: TokenInfo) {
self.track_new_tokens([(addr, info)]).await;
}
/// Start tacking metrics for new tokens.
pub async fn track_new_tokens(&self, iter: impl IntoIterator<Item = (Address, TokenInfo)>) {
let mut data = self.conf.write().await;
for (addr, info) in iter {
data.tokens.insert(addr, info);
}
}
/// Start tracking metrics for a new wallet.
pub async fn track_new_wallet(&self, addr: Address, info: WalletInfo) {
self.track_new_wallets([(addr, info)]).await;
}
/// Start tracking metrics for new wallets.
pub async fn track_new_wallets(&self, iter: impl IntoIterator<Item = (Address, WalletInfo)>) {
let mut data = self.conf.write().await;
for (addr, info) in iter {
data.wallets.insert(addr, info);
}
}
}
impl<M: Middleware> PrometheusMiddleware<M> {
/// Start the update cycle using tokio. This must be called if you want
/// some metrics to be updated automatically. Alternatively you could call update yourself.
pub fn start_updating_on_interval(
self: &Arc<Self>,
period: Duration,
) -> impl Future<Output = ()> + Send {
let zelf = Arc::downgrade(self);
async move {
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
if let Some(zelf) = zelf.upgrade() {
zelf.update().await;
} else {
return;
}
interval.tick().await;
}
}
}
}
impl<M: Middleware + Send + Sync> PrometheusMiddleware<M> {
/// Update gauges. You should submit this on a schedule to your runtime to be collected once
/// on a regular interval that ideally aligns with the prometheus scrape interval.
pub fn update(&self) -> impl Future<Output = ()> {
// all metrics are Arcs internally so just clone the ones we want to report for.
let wallet_balance = self.metrics.wallet_balance.clone();
let block_height = self.metrics.block_height.clone();
let gas_price_gwei = self.metrics.gas_price_gwei.clone();
let data_ref = self.conf.clone();
let client = self.inner.clone();
async move {
let data = data_ref.read().await;
let chain = chain_name(&data.chain);
debug!("Updating metrics for chain ({chain})");
if block_height.is_some() || gas_price_gwei.is_some() {
Self::update_block_details(&*client, chain, block_height, gas_price_gwei).await;
}
if let Some(wallet_balance) = wallet_balance {
Self::update_wallet_balances(client.clone(), &*data, chain, wallet_balance).await;
}
// more metrics to come...
}
}
async fn update_block_details(
client: &M,
chain: &str,
block_height: Option<IntGaugeVec>,
gas_price_gwei: Option<GaugeVec>,
) {
let current_block = if let Ok(Some(b)) = client.get_block(BlockNumber::Latest).await {
b
} else {
return;
};
if let Some(block_height) = block_height {
let height = current_block
.number
.expect("Block number should always be Some for included blocks.")
.as_u64() as i64;
trace!("Block height for chain {chain} is {height}");
block_height
.with(&hashmap! { "chain" => chain })
.set(height);
}
if let Some(gas_price_gwei) = gas_price_gwei {
if let Some(london_fee) = current_block.base_fee_per_gas {
let gas = u256_as_scaled_f64(london_fee, 18) * 1e9;
trace!("Gas price for chain {chain} is {gas:.1}gwei");
gas_price_gwei.with(&hashmap! { "chain" => chain }).set(gas);
} else {
trace!("Gas price for chain {chain} unknown, chain is pre-london");
}
}
}
async fn update_wallet_balances(
client: Arc<M>,
data: &PrometheusMiddlewareConf,
chain: &str,
wallet_balance_metric: GaugeVec,
) {
for (wallet_addr, wallet_info) in data.wallets.iter() {
let wallet_addr_str: String = wallet_addr.encode_hex();
let wallet_name = wallet_info.name.as_deref().unwrap_or("none");
match client.get_balance(*wallet_addr, None).await {
Ok(balance) => {
// Okay, so the native type is not a token, but whatever, close enough.
// Note: This is ETH for many chains, but not all so that is why we use `N` and `Native`
// TODO: can we get away with scaling as 18 in all cases here? I am guessing not.
let balance = u256_as_scaled_f64(balance, 18);
trace!("Wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance is {balance} of the native currency");
wallet_balance_metric
.with(&hashmap! {
"chain" => chain,
"wallet_address" => wallet_addr_str.as_str(),
"wallet_name" => wallet_name,
"token_address" => "none",
"token_symbol" => "Native",
"token_name" => "Native"
}).set(balance)
},
Err(e) => warn!("Metric update failed for wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance for native currency; {e}")
}
for (token_addr, token) in data.tokens.iter() {
let token_addr_str: String = token_addr.encode_hex();
let balance = match Erc20::new(*token_addr, client.clone())
.balance_of(*wallet_addr)
.call()
.await
{
Ok(b) => u256_as_scaled_f64(b, token.decimals),
Err(e) => {
warn!("Metric update failed for wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance for {name}; {e}", name=token.name);
continue;
}
};
trace!("Wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance is {balance}{}", token.symbol);
wallet_balance_metric
.with(&hashmap! {
"chain" => chain,
"wallet_address" => wallet_addr_str.as_str(),
"wallet_name" => wallet_name,
"token_address" => token_addr_str.as_str(),
"token_symbol" => token.symbol.as_str(),
"token_name" => token.symbol.as_str()
})
.set(balance);
}
}
}
}
impl<M: Middleware> Debug for PrometheusMiddleware<M> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "PrometheusMiddleware({:?})", self.inner)
}
}
/// Uniform way to name the chain.
fn chain_name(chain: &Option<ChainInfo>) -> &str {
chain
.as_ref()
.and_then(|c| c.name.as_deref())
.unwrap_or("unknown")
}
/// Convert a u256 scaled integer value into the corresponding f64 value.
fn u256_as_scaled_f64(value: U256, decimals: u8) -> f64 {
value.to_f64_lossy() / (10u64.pow(decimals as u32) as f64)

@ -1,9 +1,10 @@
use crate::FromErr;
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
/// For now this is just a thin wrapper around the underlying error type. Might want to extend this
/// later.
use ethers::prelude::FromErr;
/// For now this is just a thin wrapper around the underlying error type. Might
/// want to extend this later.
pub struct PrometheusMiddlewareError<E>(E);
impl<E: Debug> Debug for PrometheusMiddlewareError<E> {

@ -0,0 +1,564 @@
//! A middleware layer which collects metrics about operations made with a
//! provider.
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use derive_builder::Builder;
use ethers::prelude::*;
use ethers::types::transaction::eip2718::TypedTransaction;
use ethers::utils::hex::ToHex;
use log::{debug, trace, warn};
use maplit::hashmap;
use prometheus::{CounterVec, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
use static_assertions::assert_impl_all;
use tokio::sync::RwLock;
use tokio::time::MissedTickBehavior;
pub use error::PrometheusMiddlewareError;
use crate::contracts::erc_20::Erc20;
use crate::u256_as_scaled_f64;
pub use crate::ChainInfo;
mod error;
/// Some basic information about a token.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct TokenInfo {
/// Full name of the token. E.g. Ether.
pub name: String,
/// Token symbol. E.g. ETH.
pub symbol: String,
/// Number of
pub decimals: u8,
}
impl Default for TokenInfo {
fn default() -> Self {
Self {
name: "Unknown".into(),
symbol: "".into(),
decimals: 18,
}
}
}
/// Some basic information about a wallet.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct WalletInfo {
/// A human-friendly name for the wallet. This should be a short string like
/// "relayer".
pub name: Option<String>,
}
/// Some basic information about a contract.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct ContractInfo {
/// A human-friendly name for the contract. This should be a short string
/// like "inbox".
pub name: Option<String>,
/// Mapping from function selectors to human readable names.
pub functions: HashMap<Selector, String>,
}
/// Expected label names for the `block_height` metric.
pub const BLOCK_HEIGHT_LABELS: &[&str] = &["chain"];
/// Help string for the metric.
pub const BLOCK_HEIGHT_HELP: &str = "Tracks the current block height of the chain";
/// Expected label names for the `gas_price_gwei` metric.
pub const GAS_PRICE_GWEI_LABELS: &[&str] = &["chain"];
/// Help string for the metric.
pub const GAS_PRICE_GWEI_HELP: &str = "Tracks the current gas price of the chain";
/// Expected label names for the `contract_call_duration_seconds` metric.
pub const CONTRACT_CALL_DURATION_SECONDS_LABELS: &[&str] = &[
"chain",
"contract_name",
"contract_address",
"function_name",
"function_selector",
];
/// Help string for the metric.
pub const CONTRACT_CALL_DURATION_SECONDS_HELP: &str =
"Contract call durations by contract and function";
/// Expected label names for the `contract_call_count` metric.
pub const CONTRACT_CALL_COUNT_LABELS: &[&str] = &[
"chain",
"contract_name",
"contract_address",
"function_name",
"function_selector",
];
/// Help string for the metric.
pub const CONTRACT_CALL_COUNT_HELP: &str = "Contract invocations by contract and function";
/// Expected label names for the `transaction_send_duration_seconds` metric.
pub const TRANSACTION_SEND_DURATION_SECONDS_LABELS: &[&str] = &["chain", "address_from"];
/// Help string for the metric.
pub const TRANSACTION_SEND_DURATION_SECONDS_HELP: &str =
"Time taken to submit the transaction (not counting time for it to be included)";
/// Expected label names for the `transaction_send_total` metric.
pub const TRANSACTION_SEND_TOTAL_LABELS: &[&str] =
&["chain", "address_from", "address_to", "txn_status"];
/// Help string for the metric.
pub const TRANSACTION_SEND_TOTAL_HELP: &str = "Number of transactions sent";
/// Expected label names for the `wallet_balance` metric.
pub const WALLET_BALANCE_LABELS: &[&str] = &[
"chain",
"wallet_address",
"wallet_name",
"token_address",
"token_symbol",
"token_name",
];
/// Help string for the metric.
pub const WALLET_BALANCE_HELP: &str = "Current balance of eth and other tokens in the `tokens` map for the wallet addresses in the `wallets` set";
/// Container for all the relevant middleware metrics.
#[derive(Clone, Builder)]
pub struct MiddlewareMetrics {
/// Tracks the current block height of the chain.
/// - `chain`: the chain name (or ID if the name is unknown) of the chain
/// the block number refers to.
#[builder(setter(into, strip_option), default)]
block_height: Option<IntGaugeVec>,
/// Tracks the current gas price of the chain. Uses the base_fee_per_gas if
/// available or else the median of the transactions.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the gas price refers to.
#[builder(setter(into, strip_option), default)]
gas_price_gwei: Option<GaugeVec>,
/// Contract call durations by contract and function
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the tx occurred on.
/// - `contract_name`: contract name.
/// - `contract_address`: contract address (hex).
/// - `function_name`: contract function name.
/// - `function_selector`: contract function hash (hex).
#[builder(setter(into, strip_option), default)]
contract_call_duration_seconds: Option<CounterVec>,
/// Contract invocations by contract and function.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the tx occurred on.
/// - `contract_name`: contract name.
/// - `contract_address`: contract address (hex).
/// - `function_name`: contract function name.
/// - `function_selector`: contract function hash (hex).
#[builder(setter(into, strip_option), default)]
contract_call_count: Option<IntCounterVec>,
/// Time taken to submit the transaction (not counting time for it to be
/// included).
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the tx occurred on.
/// - `address_from`: source address of the transaction.
#[builder(setter(into, strip_option), default)]
transaction_send_duration_seconds: Option<HistogramVec>,
/// Number of transactions sent.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the tx occurred on.
/// - `address_from`: source address of the transaction.
/// - `address_to`: destination address of the transaction.
#[builder(setter(into, strip_option), default)]
transaction_send_total: Option<IntCounterVec>,
// /// Gas spent on completed transactions.
// /// - `chain`: the chain name (or ID if the name is unknown) of the chain the tx occurred
// on. /// - `address_from`: source address of the transaction.
// /// - `address_to`: destination address of the transaction.
// #[builder(setter(into, strip_option), default)]
// transaction_send_gas_eth_total: Option<CounterVec>,
/// Current balance of eth and other tokens in the `tokens` map for the
/// wallet addresses in the `wallets` set.
/// - `chain`: the chain name (or chain ID if the name is unknown) of the
/// chain the tx occurred on.
/// - `wallet_address`: Address of the wallet holding the funds.
/// - `wallet_name`: Name of the address holding the funds.
/// - `token_address`: Address of the token.
/// - `token_symbol`: Symbol of the token.
/// - `token_name`: Full name of the token.
#[builder(setter(into, strip_option), default)]
wallet_balance: Option<GaugeVec>,
}
/// An ethers-rs middleware that instruments calls with prometheus metrics. To
/// make this as flexible as possible, the metric vecs need to be created and
/// named externally, they should follow the naming convention here and must
/// include the described labels.
pub struct PrometheusMiddleware<M> {
inner: Arc<M>,
metrics: MiddlewareMetrics,
conf: Arc<RwLock<PrometheusMiddlewareConf>>,
}
/// Configuration for the prometheus middleware. This can be loaded via serde.
#[derive(Default, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))]
pub struct PrometheusMiddlewareConf {
/// The tokens to track and identifying info
#[cfg_attr(feature = "serde", serde(default))]
pub tokens: HashMap<Address, TokenInfo>,
/// The wallets to track and identifying info
#[cfg_attr(feature = "serde", serde(default))]
pub wallets: HashMap<Address, WalletInfo>,
/// Contract info for more useful metrics
#[cfg_attr(feature = "serde", serde(default))]
pub contracts: HashMap<Address, ContractInfo>,
/// Information about the chain this provider is for.
pub chain: Option<ChainInfo>,
}
assert_impl_all!(PrometheusMiddlewareConf: Send, Sync);
assert_impl_all!(tokio::sync::RwLockReadGuard<PrometheusMiddlewareConf>: Send);
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl<M: Middleware> Middleware for PrometheusMiddleware<M> {
type Error = PrometheusMiddlewareError<M::Error>;
type Provider = M::Provider;
type Inner = M;
fn inner(&self) -> &Self::Inner {
&self.inner
}
async fn send_transaction<T: Into<TypedTransaction> + Send + Sync>(
&self,
tx: T,
block: Option<BlockId>,
) -> Result<PendingTransaction<'_, Self::Provider>, Self::Error> {
let start = Instant::now();
let tx: TypedTransaction = tx.into();
let chain = {
let data = self.conf.read().await;
chain_name(&data.chain).to_owned()
};
let addr_from: String = tx
.from()
.map(|v| v.encode_hex())
.unwrap_or_else(|| "none".into());
let addr_to = tx
.to()
.map(|v| match v {
NameOrAddress::Name(v) => v.clone(),
NameOrAddress::Address(v) => v.encode_hex(),
})
.unwrap_or_else(|| "none".into());
if let Some(m) = &self.metrics.transaction_send_total {
m.with(&hashmap! {
"chain" => chain.as_str(),
"address_from" => addr_from.as_str(),
"address_to" => addr_to.as_str(),
"txn_status" => "dispatched"
})
.inc()
}
let result = self.inner.send_transaction(tx, block).await;
if let Some(m) = &self.metrics.transaction_send_duration_seconds {
let duration = (Instant::now() - start).as_secs_f64();
m.with(&hashmap! {
"chain" => chain.as_str(),
"address_from" => addr_from.as_str(),
})
.observe(duration);
}
if let Some(m) = &self.metrics.transaction_send_total {
m.with(&hashmap! {
"chain" => chain.as_str(),
"address_from" => addr_from.as_str(),
"address_to" => addr_to.as_str(),
"txn_status" => if result.is_ok() { "completed" } else { "failed" }
})
.inc()
}
Ok(result?)
}
async fn call(
&self,
tx: &TypedTransaction,
block: Option<BlockId>,
) -> Result<Bytes, Self::Error> {
let start = Instant::now();
let result = self.inner.call(tx, block).await;
if self.metrics.contract_call_duration_seconds.is_some()
|| self.metrics.contract_call_count.is_some()
{
let data = self.conf.read().await;
let chain = chain_name(&data.chain);
let empty_hm = HashMap::default();
let (contract_addr, contract_name, contract_fns) = tx
.to()
.and_then(|addr| match addr {
NameOrAddress::Name(n) => {
// not supporting ENS names for lookups by address right now
Some((n.clone(), n.clone(), &empty_hm))
}
NameOrAddress::Address(a) => data
.contracts
.get(a)
.map(|c| (c.name.as_deref().unwrap_or("unknown"), &c.functions))
.map(|(n, m)| (a.encode_hex(), n.into(), m)),
})
.unwrap_or_else(|| ("".into(), "unknown".into(), &empty_hm));
let fn_selector: Option<Selector> = tx
.data()
.filter(|data| data.0.len() >= 4)
.map(|data| [data.0[0], data.0[1], data.0[2], data.0[3]]);
let fn_name: &str = fn_selector
.and_then(|s| contract_fns.get(&s))
.map(|s| s.as_str())
.unwrap_or("unknown");
let fn_selector: String = fn_selector
.map(|s| format!("{:02x}{:02x}{:02x}{:02x}", s[0], s[1], s[2], s[3]))
.unwrap_or_else(|| "unknown".into());
let labels = hashmap! {
"chain" => chain,
"contract_name" => contract_name.as_str(),
"contract_address" => contract_addr.as_str(),
"function_name" => fn_name,
"function_selector" => &fn_selector,
};
if let Some(m) = &self.metrics.contract_call_count {
m.with(&labels).inc();
}
if let Some(m) = &self.metrics.contract_call_duration_seconds {
m.with(&labels)
.inc_by((Instant::now() - start).as_secs_f64());
}
}
Ok(result?)
}
}
impl<M> PrometheusMiddleware<M> {
/// Create a new prometheus middleware instance.
/// - `inner`: The wrapped middleware.
/// - `metrics`: Metrics objects we will report to.
/// - `tokens`: Tokens to watch the balances of.
/// - `wallets`: Wallets to watch the balances of.
pub fn new(inner: M, metrics: MiddlewareMetrics, conf: PrometheusMiddlewareConf) -> Self {
Self {
inner: Arc::new(inner),
metrics,
conf: Arc::new(RwLock::new(conf)),
}
}
/// Start tracking metrics for a new token.
pub async fn track_new_token(&self, addr: Address, info: TokenInfo) {
self.track_new_tokens([(addr, info)]).await;
}
/// Start tacking metrics for new tokens.
pub async fn track_new_tokens(&self, iter: impl IntoIterator<Item = (Address, TokenInfo)>) {
let mut data = self.conf.write().await;
for (addr, info) in iter {
data.tokens.insert(addr, info);
}
}
/// Start tracking metrics for a new wallet.
pub async fn track_new_wallet(&self, addr: Address, info: WalletInfo) {
self.track_new_wallets([(addr, info)]).await;
}
/// Start tracking metrics for new wallets.
pub async fn track_new_wallets(&self, iter: impl IntoIterator<Item = (Address, WalletInfo)>) {
let mut data = self.conf.write().await;
for (addr, info) in iter {
data.wallets.insert(addr, info);
}
}
}
impl<M: Middleware> PrometheusMiddleware<M> {
/// Start the update cycle using tokio. This must be called if you want
/// some metrics to be updated automatically. Alternatively you could call
/// update yourself.
pub fn start_updating_on_interval(
self: &Arc<Self>,
period: Duration,
) -> impl Future<Output = ()> + Send {
let zelf = Arc::downgrade(self);
async move {
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
if let Some(zelf) = zelf.upgrade() {
zelf.update().await;
} else {
return;
}
interval.tick().await;
}
}
}
}
impl<M: Middleware + Send + Sync> PrometheusMiddleware<M> {
/// Update gauges. You should submit this on a schedule to your runtime to
/// be collected once on a regular interval that ideally aligns with the
/// prometheus scrape interval.
pub fn update(&self) -> impl Future<Output = ()> {
// all metrics are Arcs internally so just clone the ones we want to report for.
let wallet_balance = self.metrics.wallet_balance.clone();
let block_height = self.metrics.block_height.clone();
let gas_price_gwei = self.metrics.gas_price_gwei.clone();
let data_ref = self.conf.clone();
let client = self.inner.clone();
async move {
let data = data_ref.read().await;
let chain = chain_name(&data.chain);
debug!("Updating metrics for chain ({chain})");
if block_height.is_some() || gas_price_gwei.is_some() {
Self::update_block_details(&*client, chain, block_height, gas_price_gwei).await;
}
if let Some(wallet_balance) = wallet_balance {
Self::update_wallet_balances(client.clone(), &*data, chain, wallet_balance).await;
}
// more metrics to come...
}
}
async fn update_block_details(
client: &M,
chain: &str,
block_height: Option<IntGaugeVec>,
gas_price_gwei: Option<GaugeVec>,
) {
let current_block = if let Ok(Some(b)) = client.get_block(BlockNumber::Latest).await {
b
} else {
return;
};
if let Some(block_height) = block_height {
let height = current_block
.number
.expect("Block number should always be Some for included blocks.")
.as_u64() as i64;
trace!("Block height for chain {chain} is {height}");
block_height
.with(&hashmap! { "chain" => chain })
.set(height);
}
if let Some(gas_price_gwei) = gas_price_gwei {
if let Some(london_fee) = current_block.base_fee_per_gas {
let gas = u256_as_scaled_f64(london_fee, 18) * 1e9;
trace!("Gas price for chain {chain} is {gas:.1}gwei");
gas_price_gwei.with(&hashmap! { "chain" => chain }).set(gas);
} else {
trace!("Gas price for chain {chain} unknown, chain is pre-london");
}
}
}
async fn update_wallet_balances(
client: Arc<M>,
data: &PrometheusMiddlewareConf,
chain: &str,
wallet_balance_metric: GaugeVec,
) {
for (wallet_addr, wallet_info) in data.wallets.iter() {
let wallet_addr_str: String = wallet_addr.encode_hex();
let wallet_name = wallet_info.name.as_deref().unwrap_or("none");
match client.get_balance(*wallet_addr, None).await {
Ok(balance) => {
// Okay, so the native type is not a token, but whatever, close enough.
// Note: This is ETH for many chains, but not all so that is why we use `N` and `Native`
// TODO: can we get away with scaling as 18 in all cases here? I am guessing not.
let balance = u256_as_scaled_f64(balance, 18);
trace!("Wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance is {balance} of the native currency");
wallet_balance_metric
.with(&hashmap! {
"chain" => chain,
"wallet_address" => wallet_addr_str.as_str(),
"wallet_name" => wallet_name,
"token_address" => "none",
"token_symbol" => "Native",
"token_name" => "Native"
}).set(balance)
},
Err(e) => warn!("Metric update failed for wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance for native currency; {e}")
}
for (token_addr, token) in data.tokens.iter() {
let token_addr_str: String = token_addr.encode_hex();
let balance = match Erc20::new(*token_addr, client.clone())
.balance_of(*wallet_addr)
.call()
.await
{
Ok(b) => u256_as_scaled_f64(b, token.decimals),
Err(e) => {
warn!("Metric update failed for wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance for {name}; {e}", name=token.name);
continue;
}
};
trace!("Wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance is {balance}{}", token.symbol);
wallet_balance_metric
.with(&hashmap! {
"chain" => chain,
"wallet_address" => wallet_addr_str.as_str(),
"wallet_name" => wallet_name,
"token_address" => token_addr_str.as_str(),
"token_symbol" => token.symbol.as_str(),
"token_name" => token.symbol.as_str()
})
.set(balance);
}
}
}
}
impl<M: Middleware> Debug for PrometheusMiddleware<M> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "PrometheusMiddleware({:?})", self.inner)
}
}
/// Uniform way to name the chain.
fn chain_name(chain: &Option<ChainInfo>) -> &str {
chain
.as_ref()
.and_then(|c| c.name.as_deref())
.unwrap_or("unknown")
}

@ -105,7 +105,7 @@ export const releaseCandidate: AgentConfig<TestnetChains> = {
context: Contexts.ReleaseCandidate,
docker: {
repo: 'gcr.io/abacus-labs-dev/abacus-agent',
tag: 'sha-2d9f729',
tag: 'sha-02bb2d8',
},
aws: {
region: 'us-east-1',
@ -117,7 +117,7 @@ export const releaseCandidate: AgentConfig<TestnetChains> = {
useForDisabledOriginChains: true,
},
validatorSets: validators,
connectionType: ConnectionType.Http,
connectionType: ConnectionType.HttpQuorum,
relayer: {
default: {
signedCheckpointPollingInterval: 5,

Loading…
Cancel
Save