Gas enforcement policy requiring payment to meet estimated costs (#1083)

* All compiles

* Rename files fwd request -> sponsored call

* More scaffolding to get the API key in there

* Cleaning up

* Rename task_status_call to task_status

* Finish rename

* rm useForDisabledOriginChains

* Introduce TransactionSubmitterType

* submitter type -> submission type

* Pass around gelato_config instead of the sponsor api key

* Final cleanup

* Nit

* Use default tx submission type

* Wip

* wip

* Getting there

* Getting there

* Refactor of serial submitter

* Some renames

* cargo fmt

* Some testing

* Nits after some testing

* A bit of a refactor of policies

* Adding some chain enums

* Single source of truth for abacus domain IDs

* Add local test chains

* Get coingecko api key via external-secrets

* Move some things around, rm gelato oracle api interaction

* Gonna move to abacus-core

* Move chain stuff to abacus-core

* nit

* comment nit

* Deploy tooling

* MeetsEstimatedCost policy test

* PR comments, still need to merge domain enums

* Single AbacusDomain enum
pull/1094/head
Trevor Porter 2 years ago committed by GitHub
parent 9b66dfebbb
commit 7040e5c8a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 61
      rust/Cargo.lock
  2. 8
      rust/abacus-base/src/contract_sync/outbox.rs
  3. 5
      rust/abacus-core/Cargo.toml
  4. 216
      rust/abacus-core/src/chain.rs
  5. 12
      rust/abacus-core/src/traits/validator_manager.rs
  6. 9
      rust/abacus-core/src/types/mod.rs
  7. 2
      rust/agents/relayer/Cargo.toml
  8. 71
      rust/agents/relayer/src/msg/gas_payment/mod.rs
  9. 382
      rust/agents/relayer/src/msg/gas_payment/policies/meets_estimated_cost.rs
  10. 76
      rust/agents/relayer/src/msg/gas_payment/policies/minimum.rs
  11. 7
      rust/agents/relayer/src/msg/gas_payment/policies/mod.rs
  12. 56
      rust/agents/relayer/src/msg/gas_payment/policies/none.rs
  13. 37
      rust/agents/relayer/src/msg/gas_payment_enforcer.rs
  14. 68
      rust/agents/relayer/src/msg/gelato_submitter/mod.rs
  15. 62
      rust/agents/relayer/src/msg/gelato_submitter/sponsored_call_op.rs
  16. 2
      rust/agents/relayer/src/msg/mod.rs
  17. 122
      rust/agents/relayer/src/msg/serial_submitter.rs
  18. 2
      rust/agents/relayer/src/relayer.rs
  19. 8
      rust/agents/relayer/src/settings/mod.rs
  20. 6
      rust/agents/scraper/migration/src/m20220805_000001_create_table_domain.rs
  21. 94
      rust/chains/abacus-ethereum/src/validator_manager.rs
  22. 2
      rust/gelato/src/types.rs
  23. 8
      rust/helm/abacus-agent/templates/relayer-external-secret.yaml
  24. 1
      typescript/infra/src/agents/index.ts
  25. 22
      typescript/infra/src/config/agent.ts

61
rust/Cargo.lock generated

@ -70,11 +70,14 @@ dependencies = [
"lazy_static",
"maplit",
"num",
"num-derive",
"num-traits",
"rocksdb",
"serde",
"serde_json",
"sha3 0.9.1",
"strum",
"strum_macros",
"thiserror",
"tokio",
"tracing",
@ -758,9 +761,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1"
dependencies = [
"iana-time-zone",
"js-sys",
"num-integer",
"num-traits",
"serde",
"time 0.1.44",
"wasm-bindgen",
"winapi",
]
@ -829,6 +835,18 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "coingecko"
version = "1.0.1"
source = "git+https://github.com/hyperlane-xyz/coingecko-rs?tag=2022-09-14-02#63dfc9d6a8b92516209ee9dc01e154d11b63b0ab"
dependencies = [
"chrono",
"reqwest",
"serde",
"serde_json",
"tokio",
]
[[package]]
name = "coins-bip32"
version = "0.7.0"
@ -1947,7 +1965,7 @@ dependencies = [
"cfg-if",
"js-sys",
"libc",
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasm-bindgen",
]
@ -2561,7 +2579,7 @@ checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf"
dependencies = [
"libc",
"log",
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys",
]
@ -2692,6 +2710,17 @@ dependencies = [
"serde",
]
[[package]]
name = "num-derive"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "num-integer"
version = "0.1.45"
@ -3254,7 +3283,7 @@ dependencies = [
"postgres-protocol",
"serde",
"serde_json",
"time",
"time 0.3.13",
"uuid 1.1.2",
]
@ -3483,6 +3512,7 @@ dependencies = [
"abacus-ethereum",
"abacus-test",
"async-trait",
"coingecko",
"color-eyre",
"config",
"ethers",
@ -3494,6 +3524,7 @@ dependencies = [
"reqwest",
"serde",
"serde_json",
"strum",
"thiserror",
"tokio",
"tokio-test",
@ -3933,7 +3964,7 @@ dependencies = [
"serde",
"serde_json",
"sqlx",
"time",
"time 0.3.13",
"tracing",
"url",
"uuid 1.1.2",
@ -3997,7 +4028,7 @@ dependencies = [
"sea-query-derive",
"sea-query-driver",
"serde_json",
"time",
"time 0.3.13",
"uuid 1.1.2",
]
@ -4412,7 +4443,7 @@ dependencies = [
"sqlx-rt",
"stringprep",
"thiserror",
"time",
"time 0.3.13",
"tokio-stream",
"url",
"uuid 1.1.2",
@ -4604,6 +4635,17 @@ dependencies = [
"threadpool",
]
[[package]]
name = "time"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
[[package]]
name = "time"
version = "0.3.13"
@ -4659,6 +4701,7 @@ dependencies = [
"mio",
"num_cpus",
"once_cell",
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@ -5209,6 +5252,12 @@ dependencies = [
"tracing",
]
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"

@ -5,7 +5,7 @@ use tokio::time::sleep;
use tracing::{debug, info, info_span, warn};
use tracing::{instrument::Instrumented, Instrument};
use abacus_core::{chain_from_domain, CommittedMessage, ListValidity, OutboxIndexer};
use abacus_core::{name_from_domain_id, CommittedMessage, ListValidity, OutboxIndexer};
use crate::{
contract_sync::{last_message::OptLatestLeafIndex, schema::OutboxContractSyncDB},
@ -159,10 +159,10 @@ where
for raw_msg in sorted_messages.iter() {
let dst = CommittedMessage::try_from(raw_msg)
.ok()
.and_then(|msg| chain_from_domain(msg.message.destination))
.unwrap_or("unknown");
.and_then(|msg| name_from_domain_id(msg.message.destination))
.unwrap_or_else(|| "unknown".into());
message_leaf_index
.with_label_values(&["dispatch", &chain_name, dst])
.with_label_values(&["dispatch", &chain_name, &dst])
.set(max_leaf_index_of_batch as i64);
}

@ -17,7 +17,6 @@ sha3 = "0.9.1"
lazy_static = "*"
thiserror = "*"
async-trait = { version = "0.1", default-features = false }
num-traits = "0.2"
maplit = "1.0"
tokio = { version = "1", features = ["rt", "macros"] }
tracing = "0.1"
@ -28,6 +27,10 @@ eyre = "0.6"
rocksdb = "0.18"
bytes = { version = "1", features = ["serde"]}
num = {version="0", features=["serde"]}
num-traits = "0.2"
num-derive = "0.3"
strum = "0.24"
strum_macros = "0.24"
[dev-dependencies]
abacus-base = { path = "../abacus-base" }

@ -1,7 +1,12 @@
#![allow(missing_docs)]
use std::str::FromStr;
use eyre::Result;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use serde::{Deserialize, Serialize};
use strum::{EnumIter, EnumString};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Address(pub bytes::Bytes);
@ -49,53 +54,134 @@ impl From<&'_ Address> for ethers::types::H160 {
}
}
/// Quick single-use macro to prevent typing domain and chain twice and risking
/// inconsistencies.
macro_rules! domain_and_chain {
{$($domain:literal <=> $chain:literal,)*} => {
/// Get the chain name from a domain id. Returns `None` if the `domain` is unknown.
pub fn chain_from_domain(domain: u32) -> Option<&'static str> {
match domain {
$( $domain => Some($chain), )*
_ => None
}
}
/// All domains supported by Abacus.
#[derive(FromPrimitive, EnumString, strum::Display, EnumIter, PartialEq, Eq, Debug)]
#[strum(serialize_all = "lowercase")]
pub enum AbacusDomain {
/// Ethereum mainnet domain ID, decimal ID 6648936
Ethereum = 0x657468,
/// Ethereum testnet Goerli domain ID
Goerli = 5,
/// Ethereum testnet Kovan domain ID
Kovan = 3000,
/// Polygon mainnet domain ID, decimal ID 1886350457
Polygon = 0x706f6c79,
/// Polygon testnet Mumbai domain ID
Mumbai = 80001,
/// Avalanche mainnet domain ID, decimal ID 1635148152
Avalanche = 0x61766178,
/// Avalanche testnet Fuji domain ID
Fuji = 43113,
/// Arbitrum mainnet domain ID, decimal ID 6386274
Arbitrum = 0x617262,
/// Arbitrum testnet ArbitrumRinkeby domain ID, decimal ID 1634872690
ArbitrumRinkeby = 0x61722d72,
/// Optimism mainnet domain ID, decimal ID 28528
Optimism = 0x6f70,
/// Optimism testnet OptimismKovan domain ID, decimal ID 1869622635
OptimismKovan = 0x6f702d6b,
/// BSC mainnet domain ID, decimal ID 6452067
#[strum(serialize = "bsc")]
BinanceSmartChain = 0x627363,
/// BSC testnet, decimal ID 1651715444
#[strum(serialize = "bsctestnet")]
BinanceSmartChainTestnet = 0x62732d74,
/// Celo domain ID, decimal ID 1667591279
Celo = 0x63656c6f,
/// Celo testnet Alfajores domain ID
Alfajores = 1000,
/// Moonbeam testnet MoonbaseAlpha domain ID, decimal ID 1836002657
MoonbaseAlpha = 0x6d6f2d61,
// -- Local test chains --
/// Test1 local chain
Test1 = 13371,
/// Test2 local chain
Test2 = 13372,
/// Test3 local chain
Test3 = 13373,
}
impl From<AbacusDomain> for u32 {
fn from(domain: AbacusDomain) -> Self {
domain as u32
}
}
impl TryFrom<u32> for AbacusDomain {
type Error = eyre::Error;
fn try_from(domain_id: u32) -> Result<Self, Self::Error> {
FromPrimitive::from_u32(domain_id)
.ok_or_else(|| eyre::eyre!("Unknown domain ID {domain_id}"))
}
}
/// Types of Abacus domains.
pub enum AbacusDomainType {
/// A mainnet.
Mainnet,
/// A testnet.
Testnet,
/// A local chain for testing (i.e. Hardhat node).
LocalTestChain,
}
impl AbacusDomain {
pub fn domain_type(&self) -> AbacusDomainType {
match self {
AbacusDomain::Ethereum => AbacusDomainType::Mainnet,
AbacusDomain::Goerli => AbacusDomainType::Testnet,
AbacusDomain::Kovan => AbacusDomainType::Testnet,
AbacusDomain::Polygon => AbacusDomainType::Mainnet,
AbacusDomain::Mumbai => AbacusDomainType::Testnet,
AbacusDomain::Avalanche => AbacusDomainType::Mainnet,
AbacusDomain::Fuji => AbacusDomainType::Testnet,
AbacusDomain::Arbitrum => AbacusDomainType::Mainnet,
AbacusDomain::ArbitrumRinkeby => AbacusDomainType::Testnet,
AbacusDomain::Optimism => AbacusDomainType::Mainnet,
AbacusDomain::OptimismKovan => AbacusDomainType::Testnet,
/// Get the domain id from a chain name. Expects `chain` to be a lowercase str.
/// Returns `None` if the `chain` is unknown.
pub fn domain_from_chain(chain: &str) -> Option<u32> {
match chain {
$( $chain => Some($domain), )*
_ => None
}
AbacusDomain::BinanceSmartChain => AbacusDomainType::Mainnet,
AbacusDomain::BinanceSmartChainTestnet => AbacusDomainType::Testnet,
AbacusDomain::Celo => AbacusDomainType::Mainnet,
AbacusDomain::Alfajores => AbacusDomainType::Testnet,
AbacusDomain::MoonbaseAlpha => AbacusDomainType::Testnet,
AbacusDomain::Test1 => AbacusDomainType::LocalTestChain,
AbacusDomain::Test2 => AbacusDomainType::LocalTestChain,
AbacusDomain::Test3 => AbacusDomainType::LocalTestChain,
}
}
}
// The unit test in this file `tests::json_mappings_match_code_map`
// tries to ensure some stability between the {chain} X {domain}
// mapping below with the agent configuration file.
domain_and_chain! {
0x63656c6f <=> "celo",
0x657468 <=> "ethereum",
0x61766178 <=> "avalanche",
0x706f6c79 <=> "polygon",
1000 <=> "alfajores",
43113 <=> "fuji",
5 <=> "goerli",
3000 <=> "kovan",
80001 <=> "mumbai",
6386274 <=> "arbitrum",
6452067 <=> "bsc",
28528 <=> "optimism",
13371 <=> "test1",
13372 <=> "test2",
13373 <=> "test3",
0x62732d74 <=> "bsctestnet",
0x61722d72 <=> "arbitrumrinkeby",
0x6f702d6b <=> "optimismkovan",
0x61752d74 <=> "auroratestnet",
0x6d6f2d61 <=> "moonbasealpha",
/// Gets the name of the chain from a domain id.
/// Returns None if the domain ID is not recognized.
pub fn name_from_domain_id(domain_id: u32) -> Option<String> {
AbacusDomain::try_from(domain_id)
.ok()
.map(|domain| domain.to_string())
}
/// Gets the domain ID of the chain its name.
/// Returns None if the chain name is not recognized.
pub fn domain_id_from_name(name: &'static str) -> Option<u32> {
AbacusDomain::from_str(name)
.ok()
.map(|domain| domain.into())
}
#[cfg(test)]
@ -106,8 +192,11 @@ mod tests {
use std::collections::BTreeSet;
use std::fs::read_to_string;
use std::path::Path;
use std::str::FromStr;
use walkdir::WalkDir;
use crate::{domain_id_from_name, name_from_domain_id, AbacusDomain};
/// Relative path to the `abacus-monorepo/rust/config/`
/// directory, which is where the agent's config files
/// currently live.
@ -247,12 +336,51 @@ mod tests {
// by the macro `domain_and_chain` is complete
// and in agreement with our on-disk json-based
// configuration data.
for ChainCoordinate { name, domain } in inbox_coords.iter().chain(outbox_coords.iter()) {
assert_eq!(
super::chain_from_domain(domain.to_owned()).unwrap(),
AbacusDomain::try_from(domain.to_owned())
.unwrap()
.to_string(),
name.to_owned()
);
assert_eq!(super::domain_from_chain(name).unwrap(), domain.to_owned());
assert_eq!(
u32::from(AbacusDomain::from_str(name).unwrap()),
domain.to_owned()
);
}
}
#[test]
fn domain_strings() {
assert_eq!(
AbacusDomain::from_str("ethereum").unwrap(),
AbacusDomain::Ethereum,
);
assert_eq!(AbacusDomain::Ethereum.to_string(), "ethereum".to_string(),);
}
#[test]
fn domain_ids() {
assert_eq!(
AbacusDomain::try_from(0x657468u32).unwrap(),
AbacusDomain::Ethereum,
);
assert_eq!(u32::from(AbacusDomain::Ethereum), 0x657468u32,);
}
#[test]
fn test_name_from_domain_id() {
assert_eq!(name_from_domain_id(0x657468u32), Some("ethereum".into()),);
assert_eq!(name_from_domain_id(0xf00u32), None,);
}
#[test]
fn test_domain_id_from_name() {
assert_eq!(domain_id_from_name("ethereum"), Some(0x657468u32),);
assert_eq!(domain_id_from_name("foo"), None,);
}
}

@ -2,12 +2,13 @@ use std::fmt::Debug;
use async_trait::async_trait;
use auto_impl::auto_impl;
use ethers::types::U256;
use eyre::Result;
use crate::{
accumulator::merkle::Proof,
traits::{ChainCommunicationError, TxOutcome},
AbacusMessage, Address, MultisigSignedCheckpoint,
AbacusMessage, Address, MultisigSignedCheckpoint, TxCostEstimate,
};
/// Interface for an InboxValidatorManager
@ -20,8 +21,17 @@ pub trait InboxValidatorManager: Send + Sync + Debug {
multisig_signed_checkpoint: &MultisigSignedCheckpoint,
message: &AbacusMessage,
proof: &Proof,
tx_gas_limit: Option<U256>,
) -> Result<TxOutcome, ChainCommunicationError>;
/// Estimate transaction costs to process a message.
async fn process_estimate_costs(
&self,
multisig_signed_checkpoint: &MultisigSignedCheckpoint,
message: &AbacusMessage,
proof: &Proof,
) -> Result<TxCostEstimate>;
/// Get the calldata for a transaction to process a message with a proof
/// against the provided signed checkpoint
fn process_calldata(

@ -63,3 +63,12 @@ pub struct InterchainGasPaymentWithMeta {
/// Metadata for the payment
pub meta: InterchainGasPaymentMeta,
}
/// A cost estimate for a transaction.
#[derive(Clone, Debug)]
pub struct TxCostEstimate {
/// The gas limit for the transaction.
pub gas_limit: U256,
/// The gas price for the transaction.
pub gas_price: U256,
}

@ -5,6 +5,7 @@ edition = "2021"
[dependencies]
tokio = { version = "1", features = ["rt", "macros"] }
coingecko = { git = "https://github.com/hyperlane-xyz/coingecko-rs", tag = "2022-09-14-02" }
config = "0.13"
color-eyre = { version = "0.6", optional = true }
serde = {version = "1.0", features = ["derive"]}
@ -16,6 +17,7 @@ async-trait = { version = "0.1", default-features = false }
futures-util = "0.3"
eyre = "0.6"
reqwest = { version = "0", features = ["json"]}
strum = "0.24"
tracing = "0.1"
tracing-futures = "0.2"
tracing-subscriber = "0.3"

@ -0,0 +1,71 @@
use std::fmt::Debug;
use abacus_core::{
db::{AbacusDB, DbError},
CommittedMessage, TxCostEstimate,
};
use async_trait::async_trait;
use ethers::types::U256;
use eyre::Result;
use crate::settings::GasPaymentEnforcementPolicy;
use self::policies::{
GasPaymentPolicyMeetsEstimatedCost, GasPaymentPolicyMinimum, GasPaymentPolicyNone,
};
mod policies;
#[async_trait]
pub trait GasPaymentPolicy: Debug + Send + Sync {
async fn message_meets_gas_payment_requirement(
&self,
message: &CommittedMessage,
current_payment: &U256,
tx_cost_estimate: &TxCostEstimate,
) -> Result<bool>;
}
#[derive(Debug)]
pub struct GasPaymentEnforcer {
policy: Box<dyn GasPaymentPolicy>,
db: AbacusDB,
}
impl GasPaymentEnforcer {
pub fn new(policy_config: GasPaymentEnforcementPolicy, db: AbacusDB) -> Self {
let policy: Box<dyn GasPaymentPolicy> = match policy_config {
GasPaymentEnforcementPolicy::None => Box::new(GasPaymentPolicyNone::new()),
GasPaymentEnforcementPolicy::Minimum { payment } => {
Box::new(GasPaymentPolicyMinimum::new(payment))
}
GasPaymentEnforcementPolicy::MeetsEstimatedCost { coingeckoapikey } => {
Box::new(GasPaymentPolicyMeetsEstimatedCost::new(coingeckoapikey))
}
};
Self { policy, db }
}
}
impl GasPaymentEnforcer {
/// Returns (gas payment requirement met, current payment according to the DB)
pub async fn message_meets_gas_payment_requirement(
&self,
message: &CommittedMessage,
tx_cost_estimate: &TxCostEstimate,
) -> Result<(bool, U256)> {
let current_payment = self.get_message_gas_payment(message.leaf_index)?;
let meets_requirement = self
.policy
.message_meets_gas_payment_requirement(message, &current_payment, tx_cost_estimate)
.await?;
Ok((meets_requirement, current_payment))
}
fn get_message_gas_payment(&self, msg_leaf_index: u32) -> Result<U256, DbError> {
self.db.retrieve_gas_payment_for_leaf(msg_leaf_index)
}
}

@ -0,0 +1,382 @@
use std::{
collections::HashMap,
time::{Duration, Instant},
};
use abacus_core::{AbacusDomain, CommittedMessage, TxCostEstimate};
use async_trait::async_trait;
use coingecko::CoinGeckoClient;
use ethers::types::U256;
use eyre::{eyre, Result};
use tokio::sync::RwLock;
use crate::msg::gas_payment::GasPaymentPolicy;
const CACHE_TTL_SECONDS: u64 = 60;
/// 1 / 100th of a cent
const FIXED_POINT_PRECISION: usize = 1000;
#[derive(Debug)]
struct CachedValue<T> {
created_at: Instant,
value: T,
}
impl<T> From<T> for CachedValue<T> {
fn from(value: T) -> Self {
Self {
created_at: Instant::now(),
value,
}
}
}
/// Given a domain, gets the CoinGecko ID for the native token.
/// If the domain isn't a mainnet (and therefore doesn't have a native
/// token with a CoinGecko ID), an Err is returned.
fn abacus_domain_id_to_native_token_coingecko_id(domain_id: u32) -> Result<&'static str> {
let abacus_domain = AbacusDomain::try_from(domain_id)?;
Ok(match abacus_domain {
AbacusDomain::Ethereum => "ethereum",
AbacusDomain::Polygon => "matic-network",
AbacusDomain::Avalanche => "avalanche-2",
// Arbitrum's native token is Ethereum
AbacusDomain::Arbitrum => "ethereum",
// Optimism's native token is Ethereum
AbacusDomain::Optimism => "ethereum",
AbacusDomain::BinanceSmartChain => "binancecoin",
AbacusDomain::Celo => "celo",
_ => eyre::bail!("No CoinGecko ID for domain {abacus_domain}"),
})
}
/// Gets prices from CoinGecko quoted in USD, caching them with a TTL.
#[derive(Default)]
struct CoinGeckoCachingPriceGetter {
coingecko: CoinGeckoClient,
cache_ttl: Duration,
/// Keyed by CoinGecko API ID. RwLock to be thread-safe.
cached_usd_prices: RwLock<HashMap<&'static str, CachedValue<f64>>>,
}
impl std::fmt::Debug for CoinGeckoCachingPriceGetter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CoinGeckoCachingPriceGetter {{ .. }}",)
}
}
impl CoinGeckoCachingPriceGetter {
pub fn new(cache_ttl: Duration, coingecko_api_key: Option<String>) -> Self {
let coingecko = if let Some(api_key) = coingecko_api_key {
CoinGeckoClient::new_with_key("https://pro-api.coingecko.com/api/v3".into(), api_key)
} else {
CoinGeckoClient::new("https://api.coingecko.com/api/v3".into())
};
Self {
cache_ttl,
coingecko,
cached_usd_prices: RwLock::default(),
}
}
async fn get_cached_usd_price(&self, coingecko_id: &'static str) -> Option<f64> {
let cached_usd_prices = self.cached_usd_prices.read().await;
if let Some(cached_value) = cached_usd_prices.get(coingecko_id) {
if cached_value.created_at.elapsed() <= self.cache_ttl {
return Some(cached_value.value);
}
}
None
}
async fn set_cached_usd_price(&self, coingecko_id: &'static str, usd_price: f64) {
let mut cached_usd_prices = self.cached_usd_prices.write().await;
cached_usd_prices.insert(coingecko_id, usd_price.into());
}
async fn get_usd_price(&self, coingecko_id: &'static str) -> Result<f64> {
if let Some(usd_price) = self.get_cached_usd_price(coingecko_id).await {
return Ok(usd_price);
}
// Returns a HashMap keyed by coingecko IDs
let api_response = self
.coingecko
.price(&[coingecko_id], &["usd"], false, false, false, false)
.await?;
let usd_price = api_response
.get(coingecko_id)
.and_then(|p| p.usd)
.ok_or_else(|| {
eyre!(
"Unable to get USD price for {} from CoinGecko API response",
coingecko_id
)
})?;
self.set_cached_usd_price(coingecko_id, usd_price).await;
Ok(usd_price)
}
}
#[derive(Debug)]
pub struct GasPaymentPolicyMeetsEstimatedCost {
coingecko_price_getter: CoinGeckoCachingPriceGetter,
}
impl GasPaymentPolicyMeetsEstimatedCost {
pub fn new(coingecko_api_key: Option<String>) -> Self {
Self {
coingecko_price_getter: CoinGeckoCachingPriceGetter::new(
Duration::from_secs(CACHE_TTL_SECONDS),
coingecko_api_key,
),
}
}
async fn get_native_token_usd_price(&self, domain: u32) -> Result<f64> {
let coingecko_id = abacus_domain_id_to_native_token_coingecko_id(domain)?;
self.coingecko_price_getter
.get_usd_price(coingecko_id)
.await
}
async fn convert_native_tokens(
&self,
amount: U256,
from_domain: u32,
to_domain: u32,
) -> Result<U256> {
convert_tokens(
amount,
self.get_native_token_usd_price(from_domain).await?,
self.get_native_token_usd_price(to_domain).await?,
)
.ok_or_else(|| {
eyre!(
"Unable to convert {} native tokens from {} to {}",
amount,
from_domain,
to_domain
)
})
}
}
#[async_trait]
impl GasPaymentPolicy for GasPaymentPolicyMeetsEstimatedCost {
/// Returns (gas payment requirement met, current payment according to the DB)
async fn message_meets_gas_payment_requirement(
&self,
message: &CommittedMessage,
current_payment: &U256,
tx_cost_estimate: &TxCostEstimate,
) -> Result<bool> {
// Estimated cost of the process tx, quoted in destination native tokens
let destination_token_tx_cost = tx_cost_estimate.gas_limit * tx_cost_estimate.gas_price;
// Convert the destination token tx cost into origin tokens
let origin_token_tx_cost = self
.convert_native_tokens(
destination_token_tx_cost,
message.message.destination,
message.message.origin,
)
.await?;
let meets_requirement = *current_payment >= origin_token_tx_cost;
tracing::info!(
message_leaf_index=?message.leaf_index,
tx_cost_estimate=?tx_cost_estimate,
destination_token_tx_cost=?destination_token_tx_cost,
origin_token_tx_cost=?origin_token_tx_cost,
current_payment=?current_payment,
meets_requirement=?meets_requirement,
"Evaluated whether message gas payment meets estimated cost",
);
Ok(meets_requirement)
}
}
fn f64_to_fixed_point(f: f64, precision: usize) -> U256 {
U256::from_f64_lossy(f * precision as f64)
}
fn convert_tokens(amount: U256, from_price: f64, to_price: f64) -> Option<U256> {
let from_price = f64_to_fixed_point(from_price, FIXED_POINT_PRECISION);
let to_price = f64_to_fixed_point(to_price, FIXED_POINT_PRECISION);
amount
.checked_mul(from_price)
.and_then(|n| n.checked_div(to_price))
}
#[tokio::test]
async fn test_gas_payment_policy_meets_estimated_cost() {
use abacus_core::AbacusMessage;
use ethers::types::H256;
// Using a fake message from Celo -> Polygon, based off
// hardcoded tx cost estimates and prices, assert that a payment
// that doesn't meet the expected costs returns false, and a payment
// that does returns true.
let celo_price = 5.5f64;
let polygon_price = 11.0f64;
let celo_domain_id = u32::from(AbacusDomain::Celo);
let polygon_domain_id = u32::from(AbacusDomain::Polygon);
// Take advantage of the coingecko_price_getter caching already-stored values
// by just writing to them directly.
// This is a little sketchy because if the cache TTL does elapse, an API
// request could be made. Because this TTL is 60 seconds, this isn't reasonable.
let policy = GasPaymentPolicyMeetsEstimatedCost::new(None);
{
let mut usd_prices = policy
.coingecko_price_getter
.cached_usd_prices
.write()
.await;
let celo_coingecko_id =
abacus_domain_id_to_native_token_coingecko_id(celo_domain_id).unwrap();
let polygon_coingecko_id =
abacus_domain_id_to_native_token_coingecko_id(polygon_domain_id).unwrap();
usd_prices.insert(celo_coingecko_id, celo_price.into());
usd_prices.insert(polygon_coingecko_id, polygon_price.into());
}
let message = CommittedMessage {
leaf_index: 10u32,
message: AbacusMessage {
origin: celo_domain_id,
destination: polygon_domain_id,
sender: H256::zero(),
recipient: H256::zero(),
body: vec![],
},
};
let tx_cost_estimate = TxCostEstimate {
// 1M gas
gas_limit: U256::from(1000000u32),
// 15 gwei
gas_price: ethers::utils::parse_units("15", "gwei").unwrap(),
};
// Expected polygon fee: 1M * 15 gwei = 0.015 MATIC
// Converted into Celo, 0.015 MATIC * ($11 / $5.5) = 0.03 CELO
let required_celo_payment = ethers::utils::parse_ether("0.03").unwrap();
// Any less than 0.03 CELO as payment, return false.
assert_eq!(
policy
.message_meets_gas_payment_requirement(
&message,
&(required_celo_payment - U256::one()),
&tx_cost_estimate,
)
.await
.unwrap(),
false,
);
// If the payment is at least 0.03 CELO, return true.
assert_eq!(
policy
.message_meets_gas_payment_requirement(
&message,
&required_celo_payment,
&tx_cost_estimate,
)
.await
.unwrap(),
true,
);
}
#[test]
fn test_convert_tokens() {
// A lowish number
// Converting to a less valuable token
assert_eq!(
convert_tokens(
// 1M
U256::from(1000000),
20000.0f64,
2000.0f64,
),
// 10M
Some(U256::from(10000000)),
);
// Converting to a more valuable token
assert_eq!(
convert_tokens(
// 10M
U256::from(10000000),
2000.0f64,
20000.0f64,
),
// 1M
Some(U256::from(1000000)),
);
// A higher number
// Converting to a less valuable token
assert_eq!(
convert_tokens(
// 100 ether
ethers::utils::parse_ether(100u32).unwrap(),
20000.0f64,
200.0f64,
),
// 10000 ether
Some(ethers::utils::parse_ether(10000u32).unwrap()),
);
// Converting to a more valuable token
assert_eq!(
convert_tokens(
// 10000 ether
ethers::utils::parse_ether(10000u32).unwrap(),
200.0f64,
20000.0f64,
),
// 100 ether
Some(ethers::utils::parse_ether(100u32).unwrap()),
);
// If the to_price is 0
assert_eq!(
convert_tokens(
// 1M
U256::from(1000000),
20000.0f64,
0f64,
),
None,
)
}
#[test]
fn test_abacus_domain_id_to_native_token_coingecko_id() {
use abacus_core::AbacusDomainType;
use strum::IntoEnumIterator;
// Iterate through all AbacusDomains, ensuring all mainnet domains
// are included in abacus_domain_id_to_native_token_coingecko_id.
for abacus_domain in AbacusDomain::iter() {
if let AbacusDomainType::Mainnet = abacus_domain.domain_type() {
assert!(
abacus_domain_id_to_native_token_coingecko_id(u32::from(abacus_domain)).is_ok()
);
}
}
}

@ -0,0 +1,76 @@
use abacus_core::{CommittedMessage, TxCostEstimate};
use async_trait::async_trait;
use ethers::types::U256;
use eyre::Result;
use crate::msg::gas_payment::GasPaymentPolicy;
#[derive(Debug)]
pub struct GasPaymentPolicyMinimum {
minimum_payment: U256,
}
impl GasPaymentPolicyMinimum {
pub fn new(minimum_payment: U256) -> Self {
Self { minimum_payment }
}
}
#[async_trait]
impl GasPaymentPolicy for GasPaymentPolicyMinimum {
/// Returns (gas payment requirement met, current payment according to the DB)
async fn message_meets_gas_payment_requirement(
&self,
_message: &CommittedMessage,
current_payment: &U256,
_tx_cost_estimate: &TxCostEstimate,
) -> Result<bool> {
Ok(*current_payment >= self.minimum_payment)
}
}
#[tokio::test]
async fn test_gas_payment_policy_none() {
use abacus_core::AbacusMessage;
let min = U256::from(1000u32);
let policy = GasPaymentPolicyMinimum::new(min);
let message = CommittedMessage {
leaf_index: 100,
message: AbacusMessage::default(),
};
// If the payment is less than the minimum, returns false
assert_eq!(
policy
.message_meets_gas_payment_requirement(
&message,
&U256::from(999u32),
&TxCostEstimate {
gas_limit: U256::from(100000u32),
gas_price: U256::from(100000u32),
},
)
.await
.unwrap(),
false,
);
// If the payment is at least the minimum, returns false
assert_eq!(
policy
.message_meets_gas_payment_requirement(
&message,
&U256::from(1000u32),
&TxCostEstimate {
gas_limit: U256::from(100000u32),
gas_price: U256::from(100000u32),
},
)
.await
.unwrap(),
true,
);
}

@ -0,0 +1,7 @@
mod meets_estimated_cost;
mod minimum;
mod none;
pub(crate) use meets_estimated_cost::GasPaymentPolicyMeetsEstimatedCost;
pub(crate) use minimum::GasPaymentPolicyMinimum;
pub(crate) use none::GasPaymentPolicyNone;

@ -0,0 +1,56 @@
use abacus_core::{CommittedMessage, TxCostEstimate};
use async_trait::async_trait;
use ethers::types::U256;
use eyre::Result;
use crate::msg::gas_payment::GasPaymentPolicy;
#[derive(Debug)]
pub struct GasPaymentPolicyNone {}
impl GasPaymentPolicyNone {
pub fn new() -> Self {
Self {}
}
}
#[async_trait]
impl GasPaymentPolicy for GasPaymentPolicyNone {
/// Returns (gas payment requirement met, current payment according to the DB)
async fn message_meets_gas_payment_requirement(
&self,
_message: &CommittedMessage,
_current_payment: &U256,
_tx_cost_estimate: &TxCostEstimate,
) -> Result<bool> {
Ok(true)
}
}
#[tokio::test]
async fn test_gas_payment_policy_none() {
use abacus_core::AbacusMessage;
let policy = GasPaymentPolicyNone::new();
let message = CommittedMessage {
leaf_index: 100,
message: AbacusMessage::default(),
};
// Always returns true
assert_eq!(
policy
.message_meets_gas_payment_requirement(
&message,
&U256::zero(),
&TxCostEstimate {
gas_limit: U256::from(100000u32),
gas_price: U256::from(100000u32),
},
)
.await
.unwrap(),
true,
);
}

@ -1,37 +0,0 @@
use abacus_core::db::{AbacusDB, DbError};
use ethers::types::U256;
use crate::settings::GasPaymentEnforcementPolicy;
#[derive(Debug)]
pub struct GasPaymentEnforcer {
policy: GasPaymentEnforcementPolicy,
db: AbacusDB,
}
impl GasPaymentEnforcer {
pub fn new(policy: GasPaymentEnforcementPolicy, db: AbacusDB) -> Self {
Self { policy, db }
}
/// Returns (gas payment requirement met, current payment according to the DB)
pub fn message_meets_gas_payment_requirement(
&self,
msg_leaf_index: u32,
) -> Result<(bool, U256), DbError> {
let current_payment = self.get_message_gas_payment(msg_leaf_index)?;
let meets_requirement = match self.policy {
GasPaymentEnforcementPolicy::None => true,
GasPaymentEnforcementPolicy::Minimum {
payment: min_payment,
} => current_payment >= min_payment,
};
Ok((meets_requirement, current_payment))
}
fn get_message_gas_payment(&self, msg_leaf_index: u32) -> Result<U256, DbError> {
self.db.retrieve_gas_payment_for_leaf(msg_leaf_index)
}
}

@ -3,7 +3,7 @@ use std::sync::Arc;
use abacus_base::chains::GelatoConf;
use abacus_base::{CoreMetrics, InboxContracts};
use abacus_core::db::AbacusDB;
use abacus_core::AbacusCommon;
use abacus_core::{AbacusCommon, AbacusDomain};
use eyre::{bail, Result};
use gelato::types::Chain;
use prometheus::{Histogram, IntCounter, IntGauge};
@ -16,7 +16,7 @@ use crate::msg::gelato_submitter::sponsored_call_op::{
SponsoredCallOp, SponsoredCallOpArgs, SponsoredCallOptions,
};
use super::gas_payment_enforcer::GasPaymentEnforcer;
use super::gas_payment::GasPaymentEnforcer;
use super::SubmitMessageArgs;
mod sponsored_call_op;
@ -58,8 +58,10 @@ impl GelatoSubmitter {
mpsc::unbounded_channel::<SubmitMessageArgs>();
Self {
message_receiver,
inbox_gelato_chain: abacus_domain_to_gelato_chain(inbox_contracts.inbox.local_domain())
.unwrap(),
inbox_gelato_chain: abacus_domain_id_to_gelato_chain(
inbox_contracts.inbox.local_domain(),
)
.unwrap(),
inbox_contracts,
db: abacus_db,
gelato_config,
@ -194,33 +196,51 @@ impl GelatoSubmitterMetrics {
}
}
fn abacus_domain_to_gelato_chain(domain: u32) -> Result<Chain> {
Ok(match domain {
6648936 => Chain::Ethereum,
1634872690 => Chain::Rinkeby,
3000 => Chain::Kovan,
5 => Chain::Goerli,
// While this may be more ergonomic as an Into / From impl,
// it feels a bit awkward to have abacus-base (where AbacusDomain)
// is implemented to be aware of the gelato crate or vice versa.
pub fn abacus_domain_id_to_gelato_chain(domain: u32) -> Result<Chain> {
let abacus_domain = AbacusDomain::try_from(domain)?;
1886350457 => Chain::Polygon,
80001 => Chain::Mumbai,
Ok(match abacus_domain {
AbacusDomain::Ethereum => Chain::Ethereum,
AbacusDomain::Kovan => Chain::Kovan,
AbacusDomain::Goerli => Chain::Goerli,
1635148152 => Chain::Avalanche,
43113 => Chain::Fuji,
AbacusDomain::Polygon => Chain::Polygon,
AbacusDomain::Mumbai => Chain::Mumbai,
6386274 => Chain::Arbitrum,
421611 => Chain::ArbitrumRinkeby,
AbacusDomain::Avalanche => Chain::Avalanche,
AbacusDomain::Fuji => Chain::Fuji,
28528 => Chain::Optimism,
1869622635 => Chain::OptimismKovan,
AbacusDomain::Arbitrum => Chain::Arbitrum,
AbacusDomain::ArbitrumRinkeby => Chain::ArbitrumRinkeby,
6452067 => Chain::BinanceSmartChain,
1651715444 => Chain::BinanceSmartChainTestnet,
AbacusDomain::Optimism => Chain::Optimism,
AbacusDomain::OptimismKovan => Chain::OptimismKovan,
1667591279 => Chain::Celo,
1000 => Chain::Alfajores,
AbacusDomain::BinanceSmartChain => Chain::BinanceSmartChain,
AbacusDomain::BinanceSmartChainTestnet => Chain::BinanceSmartChainTestnet,
1836002657 => Chain::MoonbaseAlpha,
AbacusDomain::Celo => Chain::Celo,
AbacusDomain::Alfajores => Chain::Alfajores,
_ => bail!("Unknown domain {}", domain),
AbacusDomain::MoonbaseAlpha => Chain::MoonbaseAlpha,
_ => bail!("No Gelato Chain for domain {abacus_domain}"),
})
}
#[test]
fn test_abacus_domain_id_to_gelato_chain() {
use abacus_core::AbacusDomainType;
use strum::IntoEnumIterator;
// Iterate through all AbacusDomains, ensuring all mainnet and testnet domains
// are included in abacus_domain_id_to_gelato_chain.
for abacus_domain in AbacusDomain::iter() {
if let AbacusDomainType::Mainnet | AbacusDomainType::Testnet = abacus_domain.domain_type() {
assert!(abacus_domain_id_to_gelato_chain(u32::from(abacus_domain)).is_ok());
}
}
}

@ -1,4 +1,8 @@
use std::{ops::Deref, sync::Arc, time::Duration};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,
};
use abacus_base::InboxContracts;
use abacus_core::{ChainCommunicationError, Inbox, InboxValidatorManager, MessageStatus};
@ -14,15 +18,11 @@ use tokio::{
};
use tracing::instrument;
use crate::msg::{gas_payment_enforcer::GasPaymentEnforcer, SubmitMessageArgs};
use crate::msg::{gas_payment::GasPaymentEnforcer, SubmitMessageArgs};
// The number of seconds after a tick to sleep before attempting the next tick.
const TICK_SLEEP_DURATION_SECONDS: u64 = 30;
/// The period to sleep after observing the message's gas payment
/// as insufficient, in secs.
const INSUFFICIENT_GAS_PAYMENT_SLEEP_PERIOD_SECS: u64 = 15;
#[derive(Debug, Clone)]
pub struct SponsoredCallOpArgs {
pub opts: SponsoredCallOptions,
@ -50,12 +50,18 @@ impl Deref for SponsoredCallOp {
}
}
impl DerefMut for SponsoredCallOp {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl SponsoredCallOp {
pub fn new(args: SponsoredCallOpArgs) -> Self {
Self(args)
}
#[instrument(skip(self), fields(msg_leaf_index=self.0.message.leaf_index))]
#[instrument(skip(self), fields(msg_leaf_index=self.message.leaf_index))]
pub async fn run(&mut self) {
loop {
match self.tick().await {
@ -79,7 +85,7 @@ impl SponsoredCallOp {
_ => {}
}
self.0.message.num_retries += 1;
self.message.num_retries += 1;
sleep(Duration::from_secs(TICK_SLEEP_DURATION_SECONDS)).await;
}
}
@ -92,31 +98,41 @@ impl SponsoredCallOp {
return Ok(MessageStatus::Processed);
}
let tx_estimated_cost = self
.inbox_contracts
.validator_manager
.process_estimate_costs(
&self.message.checkpoint,
&self.message.committed_message.message,
&self.message.proof,
)
.await?;
// If the gas payment requirement hasn't been met, sleep briefly and wait for the next tick.
let (meets_gas_requirement, gas_payment) = self
.gas_payment_enforcer
.message_meets_gas_payment_requirement(self.0.message.leaf_index)?;
.message_meets_gas_payment_requirement(
&self.message.committed_message,
&tx_estimated_cost,
)
.await?;
if !meets_gas_requirement {
tracing::info!(gas_payment=?gas_payment, "Gas payment requirement not met yet");
sleep(Duration::from_secs(
INSUFFICIENT_GAS_PAYMENT_SLEEP_PERIOD_SECS,
))
.await;
return Ok(MessageStatus::None);
}
// Send the sponsored call.
let sponsored_call_result = self.send_sponsored_call_api_call().await?;
tracing::info!(
msg=?self.0.message,
msg=?self.message,
task_id=sponsored_call_result.task_id,
"Sent sponsored call",
);
// Wait for a terminal state, timing out according to the retry_submit_interval.
match timeout(
self.0.opts.retry_submit_interval,
self.opts.retry_submit_interval,
self.poll_for_terminal_state(sponsored_call_result.task_id.clone()),
)
.await
@ -138,7 +154,7 @@ impl SponsoredCallOp {
// by Gelato.
async fn poll_for_terminal_state(&self, task_id: String) -> Result<MessageStatus> {
loop {
sleep(self.0.opts.poll_interval).await;
sleep(self.opts.poll_interval).await;
// Check if the message has been processed. Checking with the Inbox directly
// is the best source of truth, and is the only way in which a message can be
@ -151,7 +167,7 @@ impl SponsoredCallOp {
// If the task was cancelled for some reason by Gelato, stop waiting.
let task_status_api_call = TaskStatusApiCall {
http: self.0.http.clone(),
http: self.http.clone(),
args: TaskStatusApiCallArgs {
task_id: task_id.clone(),
},
@ -183,7 +199,7 @@ impl SponsoredCallOp {
let sponsored_call_api_call = SponsoredCallApiCall {
args: &args,
http: self.0.http.clone(),
http: self.http.clone(),
sponsor_api_key: &self.sponsor_api_key,
};
@ -191,13 +207,13 @@ impl SponsoredCallOp {
}
fn create_sponsored_call_args(&self) -> SponsoredCallArgs {
let calldata = self.0.inbox_contracts.validator_manager.process_calldata(
&self.0.message.checkpoint,
&self.0.message.committed_message.message,
&self.0.message.proof,
let calldata = self.inbox_contracts.validator_manager.process_calldata(
&self.message.checkpoint,
&self.message.committed_message.message,
&self.message.proof,
);
SponsoredCallArgs {
chain_id: self.0.destination_chain,
chain_id: self.destination_chain,
target: self
.inbox_contracts
.validator_manager

@ -4,7 +4,7 @@ use abacus_core::{accumulator::merkle::Proof, CommittedMessage, MultisigSignedCh
use tokio::time::Instant;
pub mod gas_payment_enforcer;
pub mod gas_payment;
pub mod gelato_submitter;
pub mod processor;
pub mod serial_submitter;

@ -16,9 +16,10 @@ use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::debug;
use tracing::instrument;
use tracing::warn;
use tracing::{info, info_span, instrument::Instrumented, Instrument};
use super::gas_payment_enforcer::GasPaymentEnforcer;
use super::gas_payment::GasPaymentEnforcer;
use super::SubmitMessageArgs;
/// SerialSubmitter accepts undelivered messages over a channel from a MessageProcessor. It is
@ -210,6 +211,37 @@ impl SerialSubmitter {
None => return Ok(()),
};
match self.process_message(&msg).await {
Ok(MessageStatus::Processed) => {
info!(msg=?msg, msg_leaf_index=msg.leaf_index, "Message processed");
self.record_message_process_success(&msg)?;
return Ok(());
}
Ok(MessageStatus::None) => {
info!(msg=?msg, msg_leaf_index=msg.leaf_index, "Message not processed");
}
// We expect this branch to be hit when there is unexpected behavior -
// defined behavior like gas estimation failing will not hit this branch.
Err(err) => {
warn!(msg=?msg, msg_leaf_index=msg.leaf_index, error=?err, "Error occurred when attempting to process message");
}
}
// The message was not processed, so increment the # of retries and add
// it back to the run_queue so it will be processed again at some point.
msg.num_retries += 1;
self.run_queue.push_back(msg);
Ok(())
}
/// Returns the message's status. If the message is processed, either by a transaction
/// in this fn or by a view call to the Inbox contract discovering the message has already
/// been processed, Ok(MessageStatus::Processed) is returned. If this message is unable to
/// be processed, either due to failed gas estimation or an insufficient gas payment,
/// Ok(MessageStatus::None) is returned.
#[instrument(skip(self, msg), fields(msg_leaf_index=msg.leaf_index))]
async fn process_message(&self, msg: &SubmitMessageArgs) -> Result<MessageStatus> {
// If the message has already been processed according to message_status call on
// inbox, e.g. due to another relayer having already processed, then mark it as
// already-processed, and move on to the next tick.
@ -220,60 +252,74 @@ impl SerialSubmitter {
.message_status(msg.committed_message.to_leaf())
.await?
{
info!(
msg_leaf_index=msg.leaf_index,
msg=?msg,
"Message already processed",
);
self.record_message_process_success(&msg)?;
return Ok(());
info!("Message already processed");
return Ok(MessageStatus::Processed);
}
// Estimate transaction costs for the process call. If there are issues, it's likely
// that gas estimation has failed because the message is reverting. This is defined behavior,
// so we just log the error and move onto the next tick.
let tx_cost_estimate = match self
.inbox_contracts
.validator_manager
.process_estimate_costs(&msg.checkpoint, &msg.committed_message.message, &msg.proof)
.await
{
Ok(tx_cost_estimate) => tx_cost_estimate,
Err(err) => {
info!(msg=?msg, error=?err, "Error estimating process costs");
return Ok(MessageStatus::None);
}
};
// If the gas payment requirement hasn't been met, move to the next tick.
let (meets_gas_requirement, gas_payment) = self
.gas_payment_enforcer
.message_meets_gas_payment_requirement(msg.leaf_index)?;
.message_meets_gas_payment_requirement(&msg.committed_message, &tx_cost_estimate)
.await?;
if !meets_gas_requirement {
tracing::info!(msg_leaf_index=msg.leaf_index, gas_payment=?gas_payment, "Gas payment requirement not met yet");
return Ok(());
tracing::info!(gas_payment=?gas_payment, "Gas payment requirement not met yet");
return Ok(MessageStatus::None);
}
// Go ahead and attempt processing of message to destination chain.
debug!(gas_payment=?gas_payment, msg=?msg, "Ready to process message");
// TODO: consider differentiating types of processing errors, and pushing to the front of the
// run queue for intermittent types of errors that can occur even if a message's processing isn't
// reverting, e.g. timeouts or txs being dropped from the mempool. To avoid consistently retrying
// only these messages, the number of retries could be considered.
match self.process_message(&msg).await {
Ok(()) => {
info!(msg=?msg, "Message processed");
}
Err(e) => {
info!(msg=?msg, leaf_index=msg.leaf_index, error=?e, "Message processing failed");
msg.num_retries += 1;
self.run_queue.push_back(msg);
}
}
Ok(())
}
// TODO(webbhorn): Move the process() call below into a function defined over SubmitMessageArgs
// or wrapped Schedulable(SubmitMessageArgs) so that we can fake submit in test.
// TODO(webbhorn): Instead of immediately marking as processed, move to a verification
// queue, which will wait for finality and indexing by the inbox indexer and then mark
// as processed (or eventually retry if no confirmation is ever seen).
async fn process_message(&mut self, msg: &SubmitMessageArgs) -> Result<()> {
let result = self
// We use the estimated gas limit from the prior call to `process_estimate_costs` to
// avoid a second gas estimation.
let process_result = self
.inbox_contracts
.validator_manager
.process(&msg.checkpoint, &msg.committed_message.message, &msg.proof)
.await?;
self.record_message_process_success(msg)?;
info!(leaf_index=?msg.leaf_index, hash=?result.txid,
wq_sz=?self.wait_queue.len(), rq_sz=?self.run_queue.len(),
"Message successfully processed");
Ok(())
.process(
&msg.checkpoint,
&msg.committed_message.message,
&msg.proof,
Some(tx_cost_estimate.gas_limit),
)
.await;
match process_result {
// TODO(trevor): Instead of immediately marking as processed, move to a verification
// queue, which will wait for finality and indexing by the inbox indexer and then mark
// as processed (or eventually retry if no confirmation is ever seen).
// Only mark the message as processed if the transaction didn't revert.
Ok(outcome) if outcome.executed => {
info!(hash=?outcome.txid,
wq_sz=?self.wait_queue.len(), rq_sz=?self.run_queue.len(),
"Message successfully processed by transaction");
Ok(MessageStatus::Processed)
}
Ok(outcome) => {
info!(hash=?outcome.txid, "Transaction attempting to process transaction reverted");
Ok(MessageStatus::None)
}
Err(e) => Err(e.into()),
}
}
/// Record in AbacusDB and various metrics that this process has observed the successful

@ -14,7 +14,7 @@ use abacus_base::{
};
use abacus_core::{AbacusContract, MultisigSignedCheckpoint, Signers};
use crate::msg::gas_payment_enforcer::GasPaymentEnforcer;
use crate::msg::gas_payment::GasPaymentEnforcer;
use crate::msg::gelato_submitter::{GelatoSubmitter, GelatoSubmitterMetrics};
use crate::msg::processor::{MessageProcessor, MessageProcessorMetrics};
use crate::msg::serial_submitter::SerialSubmitter;

@ -12,7 +12,13 @@ pub enum GasPaymentEnforcementPolicy {
/// No requirement - all messages are processed regardless of gas payment
None,
/// Messages that have paid a minimum amount will be processed
Minimum { payment: U256 },
Minimum {
payment: U256,
},
MeetsEstimatedCost {
coingeckoapikey: Option<String>,
},
}
decl_settings!(Relayer {

@ -1,11 +1,10 @@
use std::time;
use std::time::UNIX_EPOCH;
// use abacus_base::chain::domain_id_from_name;
use sea_orm::prelude::DateTime;
use sea_orm_migration::prelude::*;
use abacus_core::domain_from_chain;
#[derive(DeriveMigrationName)]
pub struct Migration;
@ -68,7 +67,8 @@ impl MigrationTrait for Migration {
};
EntityTrait::insert(domain::ActiveModel {
id: Set(domain_from_chain(domain.0).expect("Unknown chain name")),
// id: Set(domain_id_from_name(domain.0).expect("Unknown chain name")),
id: Set(1u32),
time_created: Set(now),
time_updated: Set(now),
name: Set(domain.0.to_owned()),

@ -4,10 +4,12 @@
use std::fmt::Display;
use std::sync::Arc;
use abacus_core::TxCostEstimate;
use async_trait::async_trait;
use ethers::abi::AbiEncode;
use ethers::prelude::*;
use eyre::Result;
use ethers_contract::builders::ContractCall;
use eyre::{eyre, Result};
use abacus_core::{
accumulator::merkle::Proof, AbacusMessage, ChainCommunicationError, ContractLocator, Encode,
@ -98,32 +100,37 @@ where
multisig_signed_checkpoint: &MultisigSignedCheckpoint,
message: &AbacusMessage,
proof: &Proof,
tx_gas_limit: Option<U256>,
) -> Result<TxOutcome, ChainCommunicationError> {
let mut sol_proof: [[u8; 32]; 32] = Default::default();
sol_proof
.iter_mut()
.enumerate()
.for_each(|(i, elem)| *elem = proof.path[i].to_fixed_bytes());
let tx = self.contract.process(
self.inbox_address,
multisig_signed_checkpoint.checkpoint.root.to_fixed_bytes(),
multisig_signed_checkpoint.checkpoint.index.into(),
multisig_signed_checkpoint
.signatures
.iter()
.map(|s| s.to_vec().into())
.collect(),
message.to_vec().into(),
sol_proof,
proof.index.into(),
);
let gas = tx.estimate_gas().await?.saturating_add(U256::from(100000));
let gassed = tx.gas(gas);
let receipt = report_tx(gassed).await?;
let contract_call = self
.process_contract_call(multisig_signed_checkpoint, message, proof, tx_gas_limit)
.await?;
let receipt = report_tx(contract_call).await?;
Ok(receipt.into())
}
async fn process_estimate_costs(
&self,
multisig_signed_checkpoint: &MultisigSignedCheckpoint,
message: &AbacusMessage,
proof: &Proof,
) -> Result<TxCostEstimate> {
let contract_call = self
.process_contract_call(multisig_signed_checkpoint, message, proof, None)
.await?;
let gas_limit = contract_call
.tx
.gas()
.ok_or_else(|| eyre!("Expected gas limit for process contract call"))?;
let gas_price = self.provider.get_gas_price().await?;
Ok(TxCostEstimate {
gas_limit: *gas_limit,
gas_price,
})
}
fn process_calldata(
&self,
multisig_signed_checkpoint: &MultisigSignedCheckpoint,
@ -157,3 +164,44 @@ where
self.contract.address().into()
}
}
impl<M> EthereumInboxValidatorManager<M>
where
M: Middleware + 'static,
{
/// Returns a ContractCall that processes the provided message.
/// If the provided tx_gas_limit is None, gas estimation occurs.
async fn process_contract_call(
&self,
multisig_signed_checkpoint: &MultisigSignedCheckpoint,
message: &AbacusMessage,
proof: &Proof,
tx_gas_limit: Option<U256>,
) -> Result<ContractCall<M, ()>, ChainCommunicationError> {
let mut sol_proof: [[u8; 32]; 32] = Default::default();
sol_proof
.iter_mut()
.enumerate()
.for_each(|(i, elem)| *elem = proof.path[i].to_fixed_bytes());
let tx = self.contract.process(
self.inbox_address,
multisig_signed_checkpoint.checkpoint.root.to_fixed_bytes(),
multisig_signed_checkpoint.checkpoint.index.into(),
multisig_signed_checkpoint
.signatures
.iter()
.map(|s| s.to_vec().into())
.collect(),
message.to_vec().into(),
sol_proof,
proof.index.into(),
);
let gas_limit = if let Some(gas_limit) = tx_gas_limit {
gas_limit
} else {
tx.estimate_gas().await?.saturating_add(U256::from(100000))
};
Ok(tx.gas(gas_limit))
}
}

@ -6,7 +6,7 @@ use serde_repr::Serialize_repr;
use std::fmt;
// Each chain and chain ID supported by Abacus
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize_repr)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize_repr, Hash)]
#[repr(u64)]
pub enum Chain {
Ethereum = 1,

@ -30,6 +30,9 @@ spec:
AWS_ACCESS_KEY_ID: {{ print "'{{ .aws_access_key_id | toString }}'" }}
AWS_SECRET_ACCESS_KEY: {{ print "'{{ .aws_secret_access_key | toString }}'" }}
{{- end }}
{{- if eq .Values.abacus.relayer.config.gasPaymentEnforcementPolicy.type "meetsEstimatedCost" }}
ABC_RELAYER_GASPAYMENTENFORCEMENTPOLICY_COINGECKOAPIKEY: {{ print "'{{ .coingecko_api_key | toString }}'" }}
{{- end }}
data:
{{- range .Values.abacus.relayer.signers }}
{{- if eq .keyConfig.type "hexKey" }}
@ -47,4 +50,9 @@ spec:
remoteRef:
key: {{ printf "%s-%s-%s-relayer-aws-secret-access-key" .Values.abacus.context .Values.abacus.runEnv .Values.abacus.outboxChain.name }}
{{- end }}
{{- if eq .Values.abacus.relayer.config.gasPaymentEnforcementPolicy.type "meetsEstimatedCost" }}
- secretKey: coingecko_api_key
remoteRef:
key: {{ printf "%s-coingecko-api-key" .Values.abacus.runEnv }}
{{- end }}
{{- end }}

@ -26,6 +26,7 @@ async function helmValuesForChain<Chain extends ChainName>(
const chainAgentConfig = new ChainAgentConfig(agentConfig, chainName);
const gelatoApiKeyRequired =
await chainAgentConfig.ensureGelatoApiKeySecretExistsIfRequired();
await chainAgentConfig.ensureCoingeckoApiKeySecretExistsIfRequired();
// By default, if a context only enables a subset of chains, the
// connection url (or urls, when HttpQuorum is used) are not fetched

@ -100,6 +100,7 @@ interface MatchingListElement {
export enum GasPaymentEnforcementPolicyType {
None = 'none',
Minimum = 'minimum',
MeetsEstimatedCost = 'meetsEstimatedCost',
}
export type GasPaymentEnforcementPolicy =
@ -109,6 +110,9 @@ export type GasPaymentEnforcementPolicy =
| {
type: GasPaymentEnforcementPolicyType.Minimum;
payment: string | number;
}
| {
type: GasPaymentEnforcementPolicyType.MeetsEstimatedCost;
};
// Incomplete basic relayer agent config
@ -477,6 +481,24 @@ export class ChainAgentConfig<Chain extends ChainName> {
return true;
}
async ensureCoingeckoApiKeySecretExistsIfRequired() {
// The CoinGecko API Key is only needed when using the "MeetsEstimatedCost" policy.
if (
this.relayerConfig?.gasPaymentEnforcementPolicy.type !==
GasPaymentEnforcementPolicyType.MeetsEstimatedCost
) {
return;
}
// Check to see if the Gelato API key exists in GCP secret manager - throw if it doesn't
const secretName = `${this.agentConfig.runEnv}-coingecko-api-key`;
const secretExists = await gcpSecretExists(secretName);
if (!secretExists) {
throw Error(
`Expected CoinGecko API Key GCP Secret named ${secretName} to exist, have you created it?`,
);
}
}
transactionSubmissionType(chain: Chain): TransactionSubmissionType {
if (this.agentConfig.gelato?.enabledChains.includes(chain)) {
return TransactionSubmissionType.Gelato;

Loading…
Cancel
Save