diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 6a62ddee8..689772c54 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -70,11 +70,14 @@ dependencies = [ "lazy_static", "maplit", "num", + "num-derive", "num-traits", "rocksdb", "serde", "serde_json", "sha3 0.9.1", + "strum", + "strum_macros", "thiserror", "tokio", "tracing", @@ -758,9 +761,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" dependencies = [ "iana-time-zone", + "js-sys", "num-integer", "num-traits", "serde", + "time 0.1.44", + "wasm-bindgen", "winapi", ] @@ -829,6 +835,18 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "coingecko" +version = "1.0.1" +source = "git+https://github.com/hyperlane-xyz/coingecko-rs?tag=2022-09-14-02#63dfc9d6a8b92516209ee9dc01e154d11b63b0ab" +dependencies = [ + "chrono", + "reqwest", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "coins-bip32" version = "0.7.0" @@ -1947,7 +1965,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] @@ -2561,7 +2579,7 @@ checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -2692,6 +2710,17 @@ dependencies = [ "serde", ] +[[package]] +name = "num-derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -3254,7 +3283,7 @@ dependencies = [ "postgres-protocol", "serde", "serde_json", - "time", + "time 0.3.13", "uuid 1.1.2", ] @@ -3483,6 +3512,7 @@ dependencies = [ "abacus-ethereum", "abacus-test", "async-trait", + "coingecko", "color-eyre", "config", "ethers", @@ -3494,6 +3524,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "strum", "thiserror", "tokio", "tokio-test", @@ -3933,7 +3964,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "time", + "time 0.3.13", "tracing", "url", "uuid 1.1.2", @@ -3997,7 +4028,7 @@ dependencies = [ "sea-query-derive", "sea-query-driver", "serde_json", - "time", + "time 0.3.13", "uuid 1.1.2", ] @@ -4412,7 +4443,7 @@ dependencies = [ "sqlx-rt", "stringprep", "thiserror", - "time", + "time 0.3.13", "tokio-stream", "url", "uuid 1.1.2", @@ -4604,6 +4635,17 @@ dependencies = [ "threadpool", ] +[[package]] +name = "time" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" +dependencies = [ + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", + "winapi", +] + [[package]] name = "time" version = "0.3.13" @@ -4659,6 +4701,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2", @@ -5209,6 +5252,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/rust/abacus-base/src/contract_sync/outbox.rs b/rust/abacus-base/src/contract_sync/outbox.rs index afd26ba04..3990c5a3d 100644 --- a/rust/abacus-base/src/contract_sync/outbox.rs +++ b/rust/abacus-base/src/contract_sync/outbox.rs @@ -5,7 +5,7 @@ use tokio::time::sleep; use tracing::{debug, info, info_span, warn}; use tracing::{instrument::Instrumented, Instrument}; -use abacus_core::{chain_from_domain, CommittedMessage, ListValidity, OutboxIndexer}; +use abacus_core::{name_from_domain_id, CommittedMessage, ListValidity, OutboxIndexer}; use crate::{ contract_sync::{last_message::OptLatestLeafIndex, schema::OutboxContractSyncDB}, @@ -159,10 +159,10 @@ where for raw_msg in sorted_messages.iter() { let dst = CommittedMessage::try_from(raw_msg) .ok() - .and_then(|msg| chain_from_domain(msg.message.destination)) - .unwrap_or("unknown"); + .and_then(|msg| name_from_domain_id(msg.message.destination)) + .unwrap_or_else(|| "unknown".into()); message_leaf_index - .with_label_values(&["dispatch", &chain_name, dst]) + .with_label_values(&["dispatch", &chain_name, &dst]) .set(max_leaf_index_of_batch as i64); } diff --git a/rust/abacus-core/Cargo.toml b/rust/abacus-core/Cargo.toml index 83a4d33a8..1a95d2a0d 100644 --- a/rust/abacus-core/Cargo.toml +++ b/rust/abacus-core/Cargo.toml @@ -17,7 +17,6 @@ sha3 = "0.9.1" lazy_static = "*" thiserror = "*" async-trait = { version = "0.1", default-features = false } -num-traits = "0.2" maplit = "1.0" tokio = { version = "1", features = ["rt", "macros"] } tracing = "0.1" @@ -28,6 +27,10 @@ eyre = "0.6" rocksdb = "0.18" bytes = { version = "1", features = ["serde"]} num = {version="0", features=["serde"]} +num-traits = "0.2" +num-derive = "0.3" +strum = "0.24" +strum_macros = "0.24" [dev-dependencies] abacus-base = { path = "../abacus-base" } diff --git a/rust/abacus-core/src/chain.rs b/rust/abacus-core/src/chain.rs index adb22eed0..046578b7c 100644 --- a/rust/abacus-core/src/chain.rs +++ b/rust/abacus-core/src/chain.rs @@ -1,7 +1,12 @@ #![allow(missing_docs)] +use std::str::FromStr; + use eyre::Result; +use num_derive::FromPrimitive; +use num_traits::FromPrimitive; use serde::{Deserialize, Serialize}; +use strum::{EnumIter, EnumString}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Address(pub bytes::Bytes); @@ -49,53 +54,134 @@ impl From<&'_ Address> for ethers::types::H160 { } } -/// Quick single-use macro to prevent typing domain and chain twice and risking -/// inconsistencies. -macro_rules! domain_and_chain { - {$($domain:literal <=> $chain:literal,)*} => { - /// Get the chain name from a domain id. Returns `None` if the `domain` is unknown. - pub fn chain_from_domain(domain: u32) -> Option<&'static str> { - match domain { - $( $domain => Some($chain), )* - _ => None - } - } +/// All domains supported by Abacus. +#[derive(FromPrimitive, EnumString, strum::Display, EnumIter, PartialEq, Eq, Debug)] +#[strum(serialize_all = "lowercase")] +pub enum AbacusDomain { + /// Ethereum mainnet domain ID, decimal ID 6648936 + Ethereum = 0x657468, + /// Ethereum testnet Goerli domain ID + Goerli = 5, + /// Ethereum testnet Kovan domain ID + Kovan = 3000, + + /// Polygon mainnet domain ID, decimal ID 1886350457 + Polygon = 0x706f6c79, + /// Polygon testnet Mumbai domain ID + Mumbai = 80001, + + /// Avalanche mainnet domain ID, decimal ID 1635148152 + Avalanche = 0x61766178, + /// Avalanche testnet Fuji domain ID + Fuji = 43113, + + /// Arbitrum mainnet domain ID, decimal ID 6386274 + Arbitrum = 0x617262, + /// Arbitrum testnet ArbitrumRinkeby domain ID, decimal ID 1634872690 + ArbitrumRinkeby = 0x61722d72, + + /// Optimism mainnet domain ID, decimal ID 28528 + Optimism = 0x6f70, + /// Optimism testnet OptimismKovan domain ID, decimal ID 1869622635 + OptimismKovan = 0x6f702d6b, + + /// BSC mainnet domain ID, decimal ID 6452067 + #[strum(serialize = "bsc")] + BinanceSmartChain = 0x627363, + /// BSC testnet, decimal ID 1651715444 + #[strum(serialize = "bsctestnet")] + BinanceSmartChainTestnet = 0x62732d74, + + /// Celo domain ID, decimal ID 1667591279 + Celo = 0x63656c6f, + /// Celo testnet Alfajores domain ID + Alfajores = 1000, + + /// Moonbeam testnet MoonbaseAlpha domain ID, decimal ID 1836002657 + MoonbaseAlpha = 0x6d6f2d61, + + // -- Local test chains -- + /// Test1 local chain + Test1 = 13371, + /// Test2 local chain + Test2 = 13372, + /// Test3 local chain + Test3 = 13373, +} + +impl From for u32 { + fn from(domain: AbacusDomain) -> Self { + domain as u32 + } +} + +impl TryFrom for AbacusDomain { + type Error = eyre::Error; + + fn try_from(domain_id: u32) -> Result { + FromPrimitive::from_u32(domain_id) + .ok_or_else(|| eyre::eyre!("Unknown domain ID {domain_id}")) + } +} + +/// Types of Abacus domains. +pub enum AbacusDomainType { + /// A mainnet. + Mainnet, + /// A testnet. + Testnet, + /// A local chain for testing (i.e. Hardhat node). + LocalTestChain, +} + +impl AbacusDomain { + pub fn domain_type(&self) -> AbacusDomainType { + match self { + AbacusDomain::Ethereum => AbacusDomainType::Mainnet, + AbacusDomain::Goerli => AbacusDomainType::Testnet, + AbacusDomain::Kovan => AbacusDomainType::Testnet, + + AbacusDomain::Polygon => AbacusDomainType::Mainnet, + AbacusDomain::Mumbai => AbacusDomainType::Testnet, + + AbacusDomain::Avalanche => AbacusDomainType::Mainnet, + AbacusDomain::Fuji => AbacusDomainType::Testnet, + + AbacusDomain::Arbitrum => AbacusDomainType::Mainnet, + AbacusDomain::ArbitrumRinkeby => AbacusDomainType::Testnet, + + AbacusDomain::Optimism => AbacusDomainType::Mainnet, + AbacusDomain::OptimismKovan => AbacusDomainType::Testnet, - /// Get the domain id from a chain name. Expects `chain` to be a lowercase str. - /// Returns `None` if the `chain` is unknown. - pub fn domain_from_chain(chain: &str) -> Option { - match chain { - $( $chain => Some($domain), )* - _ => None - } + AbacusDomain::BinanceSmartChain => AbacusDomainType::Mainnet, + AbacusDomain::BinanceSmartChainTestnet => AbacusDomainType::Testnet, + + AbacusDomain::Celo => AbacusDomainType::Mainnet, + AbacusDomain::Alfajores => AbacusDomainType::Testnet, + + AbacusDomain::MoonbaseAlpha => AbacusDomainType::Testnet, + + AbacusDomain::Test1 => AbacusDomainType::LocalTestChain, + AbacusDomain::Test2 => AbacusDomainType::LocalTestChain, + AbacusDomain::Test3 => AbacusDomainType::LocalTestChain, } } } -// The unit test in this file `tests::json_mappings_match_code_map` -// tries to ensure some stability between the {chain} X {domain} -// mapping below with the agent configuration file. -domain_and_chain! { - 0x63656c6f <=> "celo", - 0x657468 <=> "ethereum", - 0x61766178 <=> "avalanche", - 0x706f6c79 <=> "polygon", - 1000 <=> "alfajores", - 43113 <=> "fuji", - 5 <=> "goerli", - 3000 <=> "kovan", - 80001 <=> "mumbai", - 6386274 <=> "arbitrum", - 6452067 <=> "bsc", - 28528 <=> "optimism", - 13371 <=> "test1", - 13372 <=> "test2", - 13373 <=> "test3", - 0x62732d74 <=> "bsctestnet", - 0x61722d72 <=> "arbitrumrinkeby", - 0x6f702d6b <=> "optimismkovan", - 0x61752d74 <=> "auroratestnet", - 0x6d6f2d61 <=> "moonbasealpha", +/// Gets the name of the chain from a domain id. +/// Returns None if the domain ID is not recognized. +pub fn name_from_domain_id(domain_id: u32) -> Option { + AbacusDomain::try_from(domain_id) + .ok() + .map(|domain| domain.to_string()) +} + +/// Gets the domain ID of the chain its name. +/// Returns None if the chain name is not recognized. +pub fn domain_id_from_name(name: &'static str) -> Option { + AbacusDomain::from_str(name) + .ok() + .map(|domain| domain.into()) } #[cfg(test)] @@ -106,8 +192,11 @@ mod tests { use std::collections::BTreeSet; use std::fs::read_to_string; use std::path::Path; + use std::str::FromStr; use walkdir::WalkDir; + use crate::{domain_id_from_name, name_from_domain_id, AbacusDomain}; + /// Relative path to the `abacus-monorepo/rust/config/` /// directory, which is where the agent's config files /// currently live. @@ -247,12 +336,51 @@ mod tests { // by the macro `domain_and_chain` is complete // and in agreement with our on-disk json-based // configuration data. + for ChainCoordinate { name, domain } in inbox_coords.iter().chain(outbox_coords.iter()) { assert_eq!( - super::chain_from_domain(domain.to_owned()).unwrap(), + AbacusDomain::try_from(domain.to_owned()) + .unwrap() + .to_string(), name.to_owned() ); - assert_eq!(super::domain_from_chain(name).unwrap(), domain.to_owned()); + assert_eq!( + u32::from(AbacusDomain::from_str(name).unwrap()), + domain.to_owned() + ); } } + + #[test] + fn domain_strings() { + assert_eq!( + AbacusDomain::from_str("ethereum").unwrap(), + AbacusDomain::Ethereum, + ); + assert_eq!(AbacusDomain::Ethereum.to_string(), "ethereum".to_string(),); + } + + #[test] + fn domain_ids() { + assert_eq!( + AbacusDomain::try_from(0x657468u32).unwrap(), + AbacusDomain::Ethereum, + ); + + assert_eq!(u32::from(AbacusDomain::Ethereum), 0x657468u32,); + } + + #[test] + fn test_name_from_domain_id() { + assert_eq!(name_from_domain_id(0x657468u32), Some("ethereum".into()),); + + assert_eq!(name_from_domain_id(0xf00u32), None,); + } + + #[test] + fn test_domain_id_from_name() { + assert_eq!(domain_id_from_name("ethereum"), Some(0x657468u32),); + + assert_eq!(domain_id_from_name("foo"), None,); + } } diff --git a/rust/abacus-core/src/traits/validator_manager.rs b/rust/abacus-core/src/traits/validator_manager.rs index 1bdce74f9..23048ac44 100644 --- a/rust/abacus-core/src/traits/validator_manager.rs +++ b/rust/abacus-core/src/traits/validator_manager.rs @@ -2,12 +2,13 @@ use std::fmt::Debug; use async_trait::async_trait; use auto_impl::auto_impl; +use ethers::types::U256; use eyre::Result; use crate::{ accumulator::merkle::Proof, traits::{ChainCommunicationError, TxOutcome}, - AbacusMessage, Address, MultisigSignedCheckpoint, + AbacusMessage, Address, MultisigSignedCheckpoint, TxCostEstimate, }; /// Interface for an InboxValidatorManager @@ -20,8 +21,17 @@ pub trait InboxValidatorManager: Send + Sync + Debug { multisig_signed_checkpoint: &MultisigSignedCheckpoint, message: &AbacusMessage, proof: &Proof, + tx_gas_limit: Option, ) -> Result; + /// Estimate transaction costs to process a message. + async fn process_estimate_costs( + &self, + multisig_signed_checkpoint: &MultisigSignedCheckpoint, + message: &AbacusMessage, + proof: &Proof, + ) -> Result; + /// Get the calldata for a transaction to process a message with a proof /// against the provided signed checkpoint fn process_calldata( diff --git a/rust/abacus-core/src/types/mod.rs b/rust/abacus-core/src/types/mod.rs index 2c8f30121..e481197a9 100644 --- a/rust/abacus-core/src/types/mod.rs +++ b/rust/abacus-core/src/types/mod.rs @@ -63,3 +63,12 @@ pub struct InterchainGasPaymentWithMeta { /// Metadata for the payment pub meta: InterchainGasPaymentMeta, } + +/// A cost estimate for a transaction. +#[derive(Clone, Debug)] +pub struct TxCostEstimate { + /// The gas limit for the transaction. + pub gas_limit: U256, + /// The gas price for the transaction. + pub gas_price: U256, +} diff --git a/rust/agents/relayer/Cargo.toml b/rust/agents/relayer/Cargo.toml index ddd238f72..4d92ea101 100644 --- a/rust/agents/relayer/Cargo.toml +++ b/rust/agents/relayer/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] tokio = { version = "1", features = ["rt", "macros"] } +coingecko = { git = "https://github.com/hyperlane-xyz/coingecko-rs", tag = "2022-09-14-02" } config = "0.13" color-eyre = { version = "0.6", optional = true } serde = {version = "1.0", features = ["derive"]} @@ -16,6 +17,7 @@ async-trait = { version = "0.1", default-features = false } futures-util = "0.3" eyre = "0.6" reqwest = { version = "0", features = ["json"]} +strum = "0.24" tracing = "0.1" tracing-futures = "0.2" tracing-subscriber = "0.3" diff --git a/rust/agents/relayer/src/msg/gas_payment/mod.rs b/rust/agents/relayer/src/msg/gas_payment/mod.rs new file mode 100644 index 000000000..1fc53c113 --- /dev/null +++ b/rust/agents/relayer/src/msg/gas_payment/mod.rs @@ -0,0 +1,71 @@ +use std::fmt::Debug; + +use abacus_core::{ + db::{AbacusDB, DbError}, + CommittedMessage, TxCostEstimate, +}; +use async_trait::async_trait; +use ethers::types::U256; +use eyre::Result; + +use crate::settings::GasPaymentEnforcementPolicy; + +use self::policies::{ + GasPaymentPolicyMeetsEstimatedCost, GasPaymentPolicyMinimum, GasPaymentPolicyNone, +}; + +mod policies; + +#[async_trait] +pub trait GasPaymentPolicy: Debug + Send + Sync { + async fn message_meets_gas_payment_requirement( + &self, + message: &CommittedMessage, + current_payment: &U256, + tx_cost_estimate: &TxCostEstimate, + ) -> Result; +} + +#[derive(Debug)] +pub struct GasPaymentEnforcer { + policy: Box, + db: AbacusDB, +} + +impl GasPaymentEnforcer { + pub fn new(policy_config: GasPaymentEnforcementPolicy, db: AbacusDB) -> Self { + let policy: Box = match policy_config { + GasPaymentEnforcementPolicy::None => Box::new(GasPaymentPolicyNone::new()), + GasPaymentEnforcementPolicy::Minimum { payment } => { + Box::new(GasPaymentPolicyMinimum::new(payment)) + } + GasPaymentEnforcementPolicy::MeetsEstimatedCost { coingeckoapikey } => { + Box::new(GasPaymentPolicyMeetsEstimatedCost::new(coingeckoapikey)) + } + }; + + Self { policy, db } + } +} + +impl GasPaymentEnforcer { + /// Returns (gas payment requirement met, current payment according to the DB) + pub async fn message_meets_gas_payment_requirement( + &self, + message: &CommittedMessage, + tx_cost_estimate: &TxCostEstimate, + ) -> Result<(bool, U256)> { + let current_payment = self.get_message_gas_payment(message.leaf_index)?; + + let meets_requirement = self + .policy + .message_meets_gas_payment_requirement(message, ¤t_payment, tx_cost_estimate) + .await?; + + Ok((meets_requirement, current_payment)) + } + + fn get_message_gas_payment(&self, msg_leaf_index: u32) -> Result { + self.db.retrieve_gas_payment_for_leaf(msg_leaf_index) + } +} diff --git a/rust/agents/relayer/src/msg/gas_payment/policies/meets_estimated_cost.rs b/rust/agents/relayer/src/msg/gas_payment/policies/meets_estimated_cost.rs new file mode 100644 index 000000000..6a39dc145 --- /dev/null +++ b/rust/agents/relayer/src/msg/gas_payment/policies/meets_estimated_cost.rs @@ -0,0 +1,382 @@ +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; + +use abacus_core::{AbacusDomain, CommittedMessage, TxCostEstimate}; +use async_trait::async_trait; +use coingecko::CoinGeckoClient; +use ethers::types::U256; +use eyre::{eyre, Result}; +use tokio::sync::RwLock; + +use crate::msg::gas_payment::GasPaymentPolicy; + +const CACHE_TTL_SECONDS: u64 = 60; +/// 1 / 100th of a cent +const FIXED_POINT_PRECISION: usize = 1000; + +#[derive(Debug)] +struct CachedValue { + created_at: Instant, + value: T, +} + +impl From for CachedValue { + fn from(value: T) -> Self { + Self { + created_at: Instant::now(), + value, + } + } +} + +/// Given a domain, gets the CoinGecko ID for the native token. +/// If the domain isn't a mainnet (and therefore doesn't have a native +/// token with a CoinGecko ID), an Err is returned. +fn abacus_domain_id_to_native_token_coingecko_id(domain_id: u32) -> Result<&'static str> { + let abacus_domain = AbacusDomain::try_from(domain_id)?; + + Ok(match abacus_domain { + AbacusDomain::Ethereum => "ethereum", + AbacusDomain::Polygon => "matic-network", + AbacusDomain::Avalanche => "avalanche-2", + // Arbitrum's native token is Ethereum + AbacusDomain::Arbitrum => "ethereum", + // Optimism's native token is Ethereum + AbacusDomain::Optimism => "ethereum", + AbacusDomain::BinanceSmartChain => "binancecoin", + AbacusDomain::Celo => "celo", + _ => eyre::bail!("No CoinGecko ID for domain {abacus_domain}"), + }) +} + +/// Gets prices from CoinGecko quoted in USD, caching them with a TTL. +#[derive(Default)] +struct CoinGeckoCachingPriceGetter { + coingecko: CoinGeckoClient, + cache_ttl: Duration, + /// Keyed by CoinGecko API ID. RwLock to be thread-safe. + cached_usd_prices: RwLock>>, +} + +impl std::fmt::Debug for CoinGeckoCachingPriceGetter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CoinGeckoCachingPriceGetter {{ .. }}",) + } +} + +impl CoinGeckoCachingPriceGetter { + pub fn new(cache_ttl: Duration, coingecko_api_key: Option) -> Self { + let coingecko = if let Some(api_key) = coingecko_api_key { + CoinGeckoClient::new_with_key("https://pro-api.coingecko.com/api/v3".into(), api_key) + } else { + CoinGeckoClient::new("https://api.coingecko.com/api/v3".into()) + }; + + Self { + cache_ttl, + coingecko, + cached_usd_prices: RwLock::default(), + } + } + + async fn get_cached_usd_price(&self, coingecko_id: &'static str) -> Option { + let cached_usd_prices = self.cached_usd_prices.read().await; + + if let Some(cached_value) = cached_usd_prices.get(coingecko_id) { + if cached_value.created_at.elapsed() <= self.cache_ttl { + return Some(cached_value.value); + } + } + + None + } + + async fn set_cached_usd_price(&self, coingecko_id: &'static str, usd_price: f64) { + let mut cached_usd_prices = self.cached_usd_prices.write().await; + cached_usd_prices.insert(coingecko_id, usd_price.into()); + } + + async fn get_usd_price(&self, coingecko_id: &'static str) -> Result { + if let Some(usd_price) = self.get_cached_usd_price(coingecko_id).await { + return Ok(usd_price); + } + + // Returns a HashMap keyed by coingecko IDs + let api_response = self + .coingecko + .price(&[coingecko_id], &["usd"], false, false, false, false) + .await?; + let usd_price = api_response + .get(coingecko_id) + .and_then(|p| p.usd) + .ok_or_else(|| { + eyre!( + "Unable to get USD price for {} from CoinGecko API response", + coingecko_id + ) + })?; + + self.set_cached_usd_price(coingecko_id, usd_price).await; + + Ok(usd_price) + } +} + +#[derive(Debug)] +pub struct GasPaymentPolicyMeetsEstimatedCost { + coingecko_price_getter: CoinGeckoCachingPriceGetter, +} + +impl GasPaymentPolicyMeetsEstimatedCost { + pub fn new(coingecko_api_key: Option) -> Self { + Self { + coingecko_price_getter: CoinGeckoCachingPriceGetter::new( + Duration::from_secs(CACHE_TTL_SECONDS), + coingecko_api_key, + ), + } + } + + async fn get_native_token_usd_price(&self, domain: u32) -> Result { + let coingecko_id = abacus_domain_id_to_native_token_coingecko_id(domain)?; + self.coingecko_price_getter + .get_usd_price(coingecko_id) + .await + } + + async fn convert_native_tokens( + &self, + amount: U256, + from_domain: u32, + to_domain: u32, + ) -> Result { + convert_tokens( + amount, + self.get_native_token_usd_price(from_domain).await?, + self.get_native_token_usd_price(to_domain).await?, + ) + .ok_or_else(|| { + eyre!( + "Unable to convert {} native tokens from {} to {}", + amount, + from_domain, + to_domain + ) + }) + } +} + +#[async_trait] +impl GasPaymentPolicy for GasPaymentPolicyMeetsEstimatedCost { + /// Returns (gas payment requirement met, current payment according to the DB) + async fn message_meets_gas_payment_requirement( + &self, + message: &CommittedMessage, + current_payment: &U256, + tx_cost_estimate: &TxCostEstimate, + ) -> Result { + // Estimated cost of the process tx, quoted in destination native tokens + let destination_token_tx_cost = tx_cost_estimate.gas_limit * tx_cost_estimate.gas_price; + // Convert the destination token tx cost into origin tokens + let origin_token_tx_cost = self + .convert_native_tokens( + destination_token_tx_cost, + message.message.destination, + message.message.origin, + ) + .await?; + + let meets_requirement = *current_payment >= origin_token_tx_cost; + tracing::info!( + message_leaf_index=?message.leaf_index, + tx_cost_estimate=?tx_cost_estimate, + destination_token_tx_cost=?destination_token_tx_cost, + origin_token_tx_cost=?origin_token_tx_cost, + current_payment=?current_payment, + meets_requirement=?meets_requirement, + "Evaluated whether message gas payment meets estimated cost", + ); + + Ok(meets_requirement) + } +} + +fn f64_to_fixed_point(f: f64, precision: usize) -> U256 { + U256::from_f64_lossy(f * precision as f64) +} + +fn convert_tokens(amount: U256, from_price: f64, to_price: f64) -> Option { + let from_price = f64_to_fixed_point(from_price, FIXED_POINT_PRECISION); + let to_price = f64_to_fixed_point(to_price, FIXED_POINT_PRECISION); + + amount + .checked_mul(from_price) + .and_then(|n| n.checked_div(to_price)) +} + +#[tokio::test] +async fn test_gas_payment_policy_meets_estimated_cost() { + use abacus_core::AbacusMessage; + use ethers::types::H256; + + // Using a fake message from Celo -> Polygon, based off + // hardcoded tx cost estimates and prices, assert that a payment + // that doesn't meet the expected costs returns false, and a payment + // that does returns true. + + let celo_price = 5.5f64; + let polygon_price = 11.0f64; + let celo_domain_id = u32::from(AbacusDomain::Celo); + let polygon_domain_id = u32::from(AbacusDomain::Polygon); + + // Take advantage of the coingecko_price_getter caching already-stored values + // by just writing to them directly. + // This is a little sketchy because if the cache TTL does elapse, an API + // request could be made. Because this TTL is 60 seconds, this isn't reasonable. + let policy = GasPaymentPolicyMeetsEstimatedCost::new(None); + { + let mut usd_prices = policy + .coingecko_price_getter + .cached_usd_prices + .write() + .await; + let celo_coingecko_id = + abacus_domain_id_to_native_token_coingecko_id(celo_domain_id).unwrap(); + let polygon_coingecko_id = + abacus_domain_id_to_native_token_coingecko_id(polygon_domain_id).unwrap(); + + usd_prices.insert(celo_coingecko_id, celo_price.into()); + usd_prices.insert(polygon_coingecko_id, polygon_price.into()); + } + + let message = CommittedMessage { + leaf_index: 10u32, + message: AbacusMessage { + origin: celo_domain_id, + destination: polygon_domain_id, + sender: H256::zero(), + recipient: H256::zero(), + body: vec![], + }, + }; + let tx_cost_estimate = TxCostEstimate { + // 1M gas + gas_limit: U256::from(1000000u32), + // 15 gwei + gas_price: ethers::utils::parse_units("15", "gwei").unwrap(), + }; + + // Expected polygon fee: 1M * 15 gwei = 0.015 MATIC + // Converted into Celo, 0.015 MATIC * ($11 / $5.5) = 0.03 CELO + let required_celo_payment = ethers::utils::parse_ether("0.03").unwrap(); + + // Any less than 0.03 CELO as payment, return false. + assert_eq!( + policy + .message_meets_gas_payment_requirement( + &message, + &(required_celo_payment - U256::one()), + &tx_cost_estimate, + ) + .await + .unwrap(), + false, + ); + + // If the payment is at least 0.03 CELO, return true. + assert_eq!( + policy + .message_meets_gas_payment_requirement( + &message, + &required_celo_payment, + &tx_cost_estimate, + ) + .await + .unwrap(), + true, + ); +} + +#[test] +fn test_convert_tokens() { + // A lowish number + + // Converting to a less valuable token + assert_eq!( + convert_tokens( + // 1M + U256::from(1000000), + 20000.0f64, + 2000.0f64, + ), + // 10M + Some(U256::from(10000000)), + ); + + // Converting to a more valuable token + assert_eq!( + convert_tokens( + // 10M + U256::from(10000000), + 2000.0f64, + 20000.0f64, + ), + // 1M + Some(U256::from(1000000)), + ); + + // A higher number + + // Converting to a less valuable token + assert_eq!( + convert_tokens( + // 100 ether + ethers::utils::parse_ether(100u32).unwrap(), + 20000.0f64, + 200.0f64, + ), + // 10000 ether + Some(ethers::utils::parse_ether(10000u32).unwrap()), + ); + + // Converting to a more valuable token + assert_eq!( + convert_tokens( + // 10000 ether + ethers::utils::parse_ether(10000u32).unwrap(), + 200.0f64, + 20000.0f64, + ), + // 100 ether + Some(ethers::utils::parse_ether(100u32).unwrap()), + ); + + // If the to_price is 0 + assert_eq!( + convert_tokens( + // 1M + U256::from(1000000), + 20000.0f64, + 0f64, + ), + None, + ) +} + +#[test] +fn test_abacus_domain_id_to_native_token_coingecko_id() { + use abacus_core::AbacusDomainType; + use strum::IntoEnumIterator; + + // Iterate through all AbacusDomains, ensuring all mainnet domains + // are included in abacus_domain_id_to_native_token_coingecko_id. + for abacus_domain in AbacusDomain::iter() { + if let AbacusDomainType::Mainnet = abacus_domain.domain_type() { + assert!( + abacus_domain_id_to_native_token_coingecko_id(u32::from(abacus_domain)).is_ok() + ); + } + } +} diff --git a/rust/agents/relayer/src/msg/gas_payment/policies/minimum.rs b/rust/agents/relayer/src/msg/gas_payment/policies/minimum.rs new file mode 100644 index 000000000..7f6c68d96 --- /dev/null +++ b/rust/agents/relayer/src/msg/gas_payment/policies/minimum.rs @@ -0,0 +1,76 @@ +use abacus_core::{CommittedMessage, TxCostEstimate}; +use async_trait::async_trait; +use ethers::types::U256; +use eyre::Result; + +use crate::msg::gas_payment::GasPaymentPolicy; + +#[derive(Debug)] +pub struct GasPaymentPolicyMinimum { + minimum_payment: U256, +} + +impl GasPaymentPolicyMinimum { + pub fn new(minimum_payment: U256) -> Self { + Self { minimum_payment } + } +} + +#[async_trait] +impl GasPaymentPolicy for GasPaymentPolicyMinimum { + /// Returns (gas payment requirement met, current payment according to the DB) + async fn message_meets_gas_payment_requirement( + &self, + _message: &CommittedMessage, + current_payment: &U256, + _tx_cost_estimate: &TxCostEstimate, + ) -> Result { + Ok(*current_payment >= self.minimum_payment) + } +} + +#[tokio::test] +async fn test_gas_payment_policy_none() { + use abacus_core::AbacusMessage; + + let min = U256::from(1000u32); + + let policy = GasPaymentPolicyMinimum::new(min); + + let message = CommittedMessage { + leaf_index: 100, + message: AbacusMessage::default(), + }; + + // If the payment is less than the minimum, returns false + assert_eq!( + policy + .message_meets_gas_payment_requirement( + &message, + &U256::from(999u32), + &TxCostEstimate { + gas_limit: U256::from(100000u32), + gas_price: U256::from(100000u32), + }, + ) + .await + .unwrap(), + false, + ); + + // If the payment is at least the minimum, returns false + assert_eq!( + policy + .message_meets_gas_payment_requirement( + &message, + &U256::from(1000u32), + &TxCostEstimate { + gas_limit: U256::from(100000u32), + gas_price: U256::from(100000u32), + }, + ) + .await + .unwrap(), + true, + ); +} diff --git a/rust/agents/relayer/src/msg/gas_payment/policies/mod.rs b/rust/agents/relayer/src/msg/gas_payment/policies/mod.rs new file mode 100644 index 000000000..fc6a260e8 --- /dev/null +++ b/rust/agents/relayer/src/msg/gas_payment/policies/mod.rs @@ -0,0 +1,7 @@ +mod meets_estimated_cost; +mod minimum; +mod none; + +pub(crate) use meets_estimated_cost::GasPaymentPolicyMeetsEstimatedCost; +pub(crate) use minimum::GasPaymentPolicyMinimum; +pub(crate) use none::GasPaymentPolicyNone; diff --git a/rust/agents/relayer/src/msg/gas_payment/policies/none.rs b/rust/agents/relayer/src/msg/gas_payment/policies/none.rs new file mode 100644 index 000000000..5cea7e1d7 --- /dev/null +++ b/rust/agents/relayer/src/msg/gas_payment/policies/none.rs @@ -0,0 +1,56 @@ +use abacus_core::{CommittedMessage, TxCostEstimate}; +use async_trait::async_trait; +use ethers::types::U256; +use eyre::Result; + +use crate::msg::gas_payment::GasPaymentPolicy; + +#[derive(Debug)] +pub struct GasPaymentPolicyNone {} + +impl GasPaymentPolicyNone { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl GasPaymentPolicy for GasPaymentPolicyNone { + /// Returns (gas payment requirement met, current payment according to the DB) + async fn message_meets_gas_payment_requirement( + &self, + _message: &CommittedMessage, + _current_payment: &U256, + _tx_cost_estimate: &TxCostEstimate, + ) -> Result { + Ok(true) + } +} + +#[tokio::test] +async fn test_gas_payment_policy_none() { + use abacus_core::AbacusMessage; + + let policy = GasPaymentPolicyNone::new(); + + let message = CommittedMessage { + leaf_index: 100, + message: AbacusMessage::default(), + }; + + // Always returns true + assert_eq!( + policy + .message_meets_gas_payment_requirement( + &message, + &U256::zero(), + &TxCostEstimate { + gas_limit: U256::from(100000u32), + gas_price: U256::from(100000u32), + }, + ) + .await + .unwrap(), + true, + ); +} diff --git a/rust/agents/relayer/src/msg/gas_payment_enforcer.rs b/rust/agents/relayer/src/msg/gas_payment_enforcer.rs deleted file mode 100644 index 98523c22f..000000000 --- a/rust/agents/relayer/src/msg/gas_payment_enforcer.rs +++ /dev/null @@ -1,37 +0,0 @@ -use abacus_core::db::{AbacusDB, DbError}; -use ethers::types::U256; - -use crate::settings::GasPaymentEnforcementPolicy; - -#[derive(Debug)] -pub struct GasPaymentEnforcer { - policy: GasPaymentEnforcementPolicy, - db: AbacusDB, -} - -impl GasPaymentEnforcer { - pub fn new(policy: GasPaymentEnforcementPolicy, db: AbacusDB) -> Self { - Self { policy, db } - } - - /// Returns (gas payment requirement met, current payment according to the DB) - pub fn message_meets_gas_payment_requirement( - &self, - msg_leaf_index: u32, - ) -> Result<(bool, U256), DbError> { - let current_payment = self.get_message_gas_payment(msg_leaf_index)?; - - let meets_requirement = match self.policy { - GasPaymentEnforcementPolicy::None => true, - GasPaymentEnforcementPolicy::Minimum { - payment: min_payment, - } => current_payment >= min_payment, - }; - - Ok((meets_requirement, current_payment)) - } - - fn get_message_gas_payment(&self, msg_leaf_index: u32) -> Result { - self.db.retrieve_gas_payment_for_leaf(msg_leaf_index) - } -} diff --git a/rust/agents/relayer/src/msg/gelato_submitter/mod.rs b/rust/agents/relayer/src/msg/gelato_submitter/mod.rs index cab2c1ec3..af91ce2f5 100644 --- a/rust/agents/relayer/src/msg/gelato_submitter/mod.rs +++ b/rust/agents/relayer/src/msg/gelato_submitter/mod.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use abacus_base::chains::GelatoConf; use abacus_base::{CoreMetrics, InboxContracts}; use abacus_core::db::AbacusDB; -use abacus_core::AbacusCommon; +use abacus_core::{AbacusCommon, AbacusDomain}; use eyre::{bail, Result}; use gelato::types::Chain; use prometheus::{Histogram, IntCounter, IntGauge}; @@ -16,7 +16,7 @@ use crate::msg::gelato_submitter::sponsored_call_op::{ SponsoredCallOp, SponsoredCallOpArgs, SponsoredCallOptions, }; -use super::gas_payment_enforcer::GasPaymentEnforcer; +use super::gas_payment::GasPaymentEnforcer; use super::SubmitMessageArgs; mod sponsored_call_op; @@ -58,8 +58,10 @@ impl GelatoSubmitter { mpsc::unbounded_channel::(); Self { message_receiver, - inbox_gelato_chain: abacus_domain_to_gelato_chain(inbox_contracts.inbox.local_domain()) - .unwrap(), + inbox_gelato_chain: abacus_domain_id_to_gelato_chain( + inbox_contracts.inbox.local_domain(), + ) + .unwrap(), inbox_contracts, db: abacus_db, gelato_config, @@ -194,33 +196,51 @@ impl GelatoSubmitterMetrics { } } -fn abacus_domain_to_gelato_chain(domain: u32) -> Result { - Ok(match domain { - 6648936 => Chain::Ethereum, - 1634872690 => Chain::Rinkeby, - 3000 => Chain::Kovan, - 5 => Chain::Goerli, +// While this may be more ergonomic as an Into / From impl, +// it feels a bit awkward to have abacus-base (where AbacusDomain) +// is implemented to be aware of the gelato crate or vice versa. +pub fn abacus_domain_id_to_gelato_chain(domain: u32) -> Result { + let abacus_domain = AbacusDomain::try_from(domain)?; - 1886350457 => Chain::Polygon, - 80001 => Chain::Mumbai, + Ok(match abacus_domain { + AbacusDomain::Ethereum => Chain::Ethereum, + AbacusDomain::Kovan => Chain::Kovan, + AbacusDomain::Goerli => Chain::Goerli, - 1635148152 => Chain::Avalanche, - 43113 => Chain::Fuji, + AbacusDomain::Polygon => Chain::Polygon, + AbacusDomain::Mumbai => Chain::Mumbai, - 6386274 => Chain::Arbitrum, - 421611 => Chain::ArbitrumRinkeby, + AbacusDomain::Avalanche => Chain::Avalanche, + AbacusDomain::Fuji => Chain::Fuji, - 28528 => Chain::Optimism, - 1869622635 => Chain::OptimismKovan, + AbacusDomain::Arbitrum => Chain::Arbitrum, + AbacusDomain::ArbitrumRinkeby => Chain::ArbitrumRinkeby, - 6452067 => Chain::BinanceSmartChain, - 1651715444 => Chain::BinanceSmartChainTestnet, + AbacusDomain::Optimism => Chain::Optimism, + AbacusDomain::OptimismKovan => Chain::OptimismKovan, - 1667591279 => Chain::Celo, - 1000 => Chain::Alfajores, + AbacusDomain::BinanceSmartChain => Chain::BinanceSmartChain, + AbacusDomain::BinanceSmartChainTestnet => Chain::BinanceSmartChainTestnet, - 1836002657 => Chain::MoonbaseAlpha, + AbacusDomain::Celo => Chain::Celo, + AbacusDomain::Alfajores => Chain::Alfajores, - _ => bail!("Unknown domain {}", domain), + AbacusDomain::MoonbaseAlpha => Chain::MoonbaseAlpha, + + _ => bail!("No Gelato Chain for domain {abacus_domain}"), }) } + +#[test] +fn test_abacus_domain_id_to_gelato_chain() { + use abacus_core::AbacusDomainType; + use strum::IntoEnumIterator; + + // Iterate through all AbacusDomains, ensuring all mainnet and testnet domains + // are included in abacus_domain_id_to_gelato_chain. + for abacus_domain in AbacusDomain::iter() { + if let AbacusDomainType::Mainnet | AbacusDomainType::Testnet = abacus_domain.domain_type() { + assert!(abacus_domain_id_to_gelato_chain(u32::from(abacus_domain)).is_ok()); + } + } +} diff --git a/rust/agents/relayer/src/msg/gelato_submitter/sponsored_call_op.rs b/rust/agents/relayer/src/msg/gelato_submitter/sponsored_call_op.rs index fd1fb2a07..275824b33 100644 --- a/rust/agents/relayer/src/msg/gelato_submitter/sponsored_call_op.rs +++ b/rust/agents/relayer/src/msg/gelato_submitter/sponsored_call_op.rs @@ -1,4 +1,8 @@ -use std::{ops::Deref, sync::Arc, time::Duration}; +use std::{ + ops::{Deref, DerefMut}, + sync::Arc, + time::Duration, +}; use abacus_base::InboxContracts; use abacus_core::{ChainCommunicationError, Inbox, InboxValidatorManager, MessageStatus}; @@ -14,15 +18,11 @@ use tokio::{ }; use tracing::instrument; -use crate::msg::{gas_payment_enforcer::GasPaymentEnforcer, SubmitMessageArgs}; +use crate::msg::{gas_payment::GasPaymentEnforcer, SubmitMessageArgs}; // The number of seconds after a tick to sleep before attempting the next tick. const TICK_SLEEP_DURATION_SECONDS: u64 = 30; -/// The period to sleep after observing the message's gas payment -/// as insufficient, in secs. -const INSUFFICIENT_GAS_PAYMENT_SLEEP_PERIOD_SECS: u64 = 15; - #[derive(Debug, Clone)] pub struct SponsoredCallOpArgs { pub opts: SponsoredCallOptions, @@ -50,12 +50,18 @@ impl Deref for SponsoredCallOp { } } +impl DerefMut for SponsoredCallOp { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + impl SponsoredCallOp { pub fn new(args: SponsoredCallOpArgs) -> Self { Self(args) } - #[instrument(skip(self), fields(msg_leaf_index=self.0.message.leaf_index))] + #[instrument(skip(self), fields(msg_leaf_index=self.message.leaf_index))] pub async fn run(&mut self) { loop { match self.tick().await { @@ -79,7 +85,7 @@ impl SponsoredCallOp { _ => {} } - self.0.message.num_retries += 1; + self.message.num_retries += 1; sleep(Duration::from_secs(TICK_SLEEP_DURATION_SECONDS)).await; } } @@ -92,31 +98,41 @@ impl SponsoredCallOp { return Ok(MessageStatus::Processed); } + let tx_estimated_cost = self + .inbox_contracts + .validator_manager + .process_estimate_costs( + &self.message.checkpoint, + &self.message.committed_message.message, + &self.message.proof, + ) + .await?; + // If the gas payment requirement hasn't been met, sleep briefly and wait for the next tick. let (meets_gas_requirement, gas_payment) = self .gas_payment_enforcer - .message_meets_gas_payment_requirement(self.0.message.leaf_index)?; + .message_meets_gas_payment_requirement( + &self.message.committed_message, + &tx_estimated_cost, + ) + .await?; if !meets_gas_requirement { tracing::info!(gas_payment=?gas_payment, "Gas payment requirement not met yet"); - sleep(Duration::from_secs( - INSUFFICIENT_GAS_PAYMENT_SLEEP_PERIOD_SECS, - )) - .await; return Ok(MessageStatus::None); } // Send the sponsored call. let sponsored_call_result = self.send_sponsored_call_api_call().await?; tracing::info!( - msg=?self.0.message, + msg=?self.message, task_id=sponsored_call_result.task_id, "Sent sponsored call", ); // Wait for a terminal state, timing out according to the retry_submit_interval. match timeout( - self.0.opts.retry_submit_interval, + self.opts.retry_submit_interval, self.poll_for_terminal_state(sponsored_call_result.task_id.clone()), ) .await @@ -138,7 +154,7 @@ impl SponsoredCallOp { // by Gelato. async fn poll_for_terminal_state(&self, task_id: String) -> Result { loop { - sleep(self.0.opts.poll_interval).await; + sleep(self.opts.poll_interval).await; // Check if the message has been processed. Checking with the Inbox directly // is the best source of truth, and is the only way in which a message can be @@ -151,7 +167,7 @@ impl SponsoredCallOp { // If the task was cancelled for some reason by Gelato, stop waiting. let task_status_api_call = TaskStatusApiCall { - http: self.0.http.clone(), + http: self.http.clone(), args: TaskStatusApiCallArgs { task_id: task_id.clone(), }, @@ -183,7 +199,7 @@ impl SponsoredCallOp { let sponsored_call_api_call = SponsoredCallApiCall { args: &args, - http: self.0.http.clone(), + http: self.http.clone(), sponsor_api_key: &self.sponsor_api_key, }; @@ -191,13 +207,13 @@ impl SponsoredCallOp { } fn create_sponsored_call_args(&self) -> SponsoredCallArgs { - let calldata = self.0.inbox_contracts.validator_manager.process_calldata( - &self.0.message.checkpoint, - &self.0.message.committed_message.message, - &self.0.message.proof, + let calldata = self.inbox_contracts.validator_manager.process_calldata( + &self.message.checkpoint, + &self.message.committed_message.message, + &self.message.proof, ); SponsoredCallArgs { - chain_id: self.0.destination_chain, + chain_id: self.destination_chain, target: self .inbox_contracts .validator_manager diff --git a/rust/agents/relayer/src/msg/mod.rs b/rust/agents/relayer/src/msg/mod.rs index e4fd750f1..4dd2fba17 100644 --- a/rust/agents/relayer/src/msg/mod.rs +++ b/rust/agents/relayer/src/msg/mod.rs @@ -4,7 +4,7 @@ use abacus_core::{accumulator::merkle::Proof, CommittedMessage, MultisigSignedCh use tokio::time::Instant; -pub mod gas_payment_enforcer; +pub mod gas_payment; pub mod gelato_submitter; pub mod processor; pub mod serial_submitter; diff --git a/rust/agents/relayer/src/msg/serial_submitter.rs b/rust/agents/relayer/src/msg/serial_submitter.rs index a2737f4a0..36c01f36a 100644 --- a/rust/agents/relayer/src/msg/serial_submitter.rs +++ b/rust/agents/relayer/src/msg/serial_submitter.rs @@ -16,9 +16,10 @@ use tokio::task::JoinHandle; use tokio::time::Instant; use tracing::debug; use tracing::instrument; +use tracing::warn; use tracing::{info, info_span, instrument::Instrumented, Instrument}; -use super::gas_payment_enforcer::GasPaymentEnforcer; +use super::gas_payment::GasPaymentEnforcer; use super::SubmitMessageArgs; /// SerialSubmitter accepts undelivered messages over a channel from a MessageProcessor. It is @@ -210,6 +211,37 @@ impl SerialSubmitter { None => return Ok(()), }; + match self.process_message(&msg).await { + Ok(MessageStatus::Processed) => { + info!(msg=?msg, msg_leaf_index=msg.leaf_index, "Message processed"); + self.record_message_process_success(&msg)?; + return Ok(()); + } + Ok(MessageStatus::None) => { + info!(msg=?msg, msg_leaf_index=msg.leaf_index, "Message not processed"); + } + // We expect this branch to be hit when there is unexpected behavior - + // defined behavior like gas estimation failing will not hit this branch. + Err(err) => { + warn!(msg=?msg, msg_leaf_index=msg.leaf_index, error=?err, "Error occurred when attempting to process message"); + } + } + + // The message was not processed, so increment the # of retries and add + // it back to the run_queue so it will be processed again at some point. + msg.num_retries += 1; + self.run_queue.push_back(msg); + + Ok(()) + } + + /// Returns the message's status. If the message is processed, either by a transaction + /// in this fn or by a view call to the Inbox contract discovering the message has already + /// been processed, Ok(MessageStatus::Processed) is returned. If this message is unable to + /// be processed, either due to failed gas estimation or an insufficient gas payment, + /// Ok(MessageStatus::None) is returned. + #[instrument(skip(self, msg), fields(msg_leaf_index=msg.leaf_index))] + async fn process_message(&self, msg: &SubmitMessageArgs) -> Result { // If the message has already been processed according to message_status call on // inbox, e.g. due to another relayer having already processed, then mark it as // already-processed, and move on to the next tick. @@ -220,60 +252,74 @@ impl SerialSubmitter { .message_status(msg.committed_message.to_leaf()) .await? { - info!( - msg_leaf_index=msg.leaf_index, - msg=?msg, - "Message already processed", - ); - self.record_message_process_success(&msg)?; - return Ok(()); + info!("Message already processed"); + return Ok(MessageStatus::Processed); } + // Estimate transaction costs for the process call. If there are issues, it's likely + // that gas estimation has failed because the message is reverting. This is defined behavior, + // so we just log the error and move onto the next tick. + let tx_cost_estimate = match self + .inbox_contracts + .validator_manager + .process_estimate_costs(&msg.checkpoint, &msg.committed_message.message, &msg.proof) + .await + { + Ok(tx_cost_estimate) => tx_cost_estimate, + Err(err) => { + info!(msg=?msg, error=?err, "Error estimating process costs"); + return Ok(MessageStatus::None); + } + }; + // If the gas payment requirement hasn't been met, move to the next tick. let (meets_gas_requirement, gas_payment) = self .gas_payment_enforcer - .message_meets_gas_payment_requirement(msg.leaf_index)?; + .message_meets_gas_payment_requirement(&msg.committed_message, &tx_cost_estimate) + .await?; if !meets_gas_requirement { - tracing::info!(msg_leaf_index=msg.leaf_index, gas_payment=?gas_payment, "Gas payment requirement not met yet"); - return Ok(()); + tracing::info!(gas_payment=?gas_payment, "Gas payment requirement not met yet"); + return Ok(MessageStatus::None); } // Go ahead and attempt processing of message to destination chain. debug!(gas_payment=?gas_payment, msg=?msg, "Ready to process message"); + // TODO: consider differentiating types of processing errors, and pushing to the front of the // run queue for intermittent types of errors that can occur even if a message's processing isn't // reverting, e.g. timeouts or txs being dropped from the mempool. To avoid consistently retrying // only these messages, the number of retries could be considered. - match self.process_message(&msg).await { - Ok(()) => { - info!(msg=?msg, "Message processed"); - } - Err(e) => { - info!(msg=?msg, leaf_index=msg.leaf_index, error=?e, "Message processing failed"); - msg.num_retries += 1; - self.run_queue.push_back(msg); - } - } - Ok(()) - } - - // TODO(webbhorn): Move the process() call below into a function defined over SubmitMessageArgs - // or wrapped Schedulable(SubmitMessageArgs) so that we can fake submit in test. - // TODO(webbhorn): Instead of immediately marking as processed, move to a verification - // queue, which will wait for finality and indexing by the inbox indexer and then mark - // as processed (or eventually retry if no confirmation is ever seen). - async fn process_message(&mut self, msg: &SubmitMessageArgs) -> Result<()> { - let result = self + // We use the estimated gas limit from the prior call to `process_estimate_costs` to + // avoid a second gas estimation. + let process_result = self .inbox_contracts .validator_manager - .process(&msg.checkpoint, &msg.committed_message.message, &msg.proof) - .await?; - self.record_message_process_success(msg)?; - info!(leaf_index=?msg.leaf_index, hash=?result.txid, - wq_sz=?self.wait_queue.len(), rq_sz=?self.run_queue.len(), - "Message successfully processed"); - Ok(()) + .process( + &msg.checkpoint, + &msg.committed_message.message, + &msg.proof, + Some(tx_cost_estimate.gas_limit), + ) + .await; + match process_result { + // TODO(trevor): Instead of immediately marking as processed, move to a verification + // queue, which will wait for finality and indexing by the inbox indexer and then mark + // as processed (or eventually retry if no confirmation is ever seen). + + // Only mark the message as processed if the transaction didn't revert. + Ok(outcome) if outcome.executed => { + info!(hash=?outcome.txid, + wq_sz=?self.wait_queue.len(), rq_sz=?self.run_queue.len(), + "Message successfully processed by transaction"); + Ok(MessageStatus::Processed) + } + Ok(outcome) => { + info!(hash=?outcome.txid, "Transaction attempting to process transaction reverted"); + Ok(MessageStatus::None) + } + Err(e) => Err(e.into()), + } } /// Record in AbacusDB and various metrics that this process has observed the successful diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 5e0085e1f..1814ff9dd 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -14,7 +14,7 @@ use abacus_base::{ }; use abacus_core::{AbacusContract, MultisigSignedCheckpoint, Signers}; -use crate::msg::gas_payment_enforcer::GasPaymentEnforcer; +use crate::msg::gas_payment::GasPaymentEnforcer; use crate::msg::gelato_submitter::{GelatoSubmitter, GelatoSubmitterMetrics}; use crate::msg::processor::{MessageProcessor, MessageProcessorMetrics}; use crate::msg::serial_submitter::SerialSubmitter; diff --git a/rust/agents/relayer/src/settings/mod.rs b/rust/agents/relayer/src/settings/mod.rs index d73015f1d..5c2fc883e 100644 --- a/rust/agents/relayer/src/settings/mod.rs +++ b/rust/agents/relayer/src/settings/mod.rs @@ -12,7 +12,13 @@ pub enum GasPaymentEnforcementPolicy { /// No requirement - all messages are processed regardless of gas payment None, /// Messages that have paid a minimum amount will be processed - Minimum { payment: U256 }, + Minimum { + payment: U256, + }, + + MeetsEstimatedCost { + coingeckoapikey: Option, + }, } decl_settings!(Relayer { diff --git a/rust/agents/scraper/migration/src/m20220805_000001_create_table_domain.rs b/rust/agents/scraper/migration/src/m20220805_000001_create_table_domain.rs index eab8dd095..26d555523 100644 --- a/rust/agents/scraper/migration/src/m20220805_000001_create_table_domain.rs +++ b/rust/agents/scraper/migration/src/m20220805_000001_create_table_domain.rs @@ -1,11 +1,10 @@ use std::time; use std::time::UNIX_EPOCH; +// use abacus_base::chain::domain_id_from_name; use sea_orm::prelude::DateTime; use sea_orm_migration::prelude::*; -use abacus_core::domain_from_chain; - #[derive(DeriveMigrationName)] pub struct Migration; @@ -68,7 +67,8 @@ impl MigrationTrait for Migration { }; EntityTrait::insert(domain::ActiveModel { - id: Set(domain_from_chain(domain.0).expect("Unknown chain name")), + // id: Set(domain_id_from_name(domain.0).expect("Unknown chain name")), + id: Set(1u32), time_created: Set(now), time_updated: Set(now), name: Set(domain.0.to_owned()), diff --git a/rust/chains/abacus-ethereum/src/validator_manager.rs b/rust/chains/abacus-ethereum/src/validator_manager.rs index b7feb006a..4d0f2a694 100644 --- a/rust/chains/abacus-ethereum/src/validator_manager.rs +++ b/rust/chains/abacus-ethereum/src/validator_manager.rs @@ -4,10 +4,12 @@ use std::fmt::Display; use std::sync::Arc; +use abacus_core::TxCostEstimate; use async_trait::async_trait; use ethers::abi::AbiEncode; use ethers::prelude::*; -use eyre::Result; +use ethers_contract::builders::ContractCall; +use eyre::{eyre, Result}; use abacus_core::{ accumulator::merkle::Proof, AbacusMessage, ChainCommunicationError, ContractLocator, Encode, @@ -98,32 +100,37 @@ where multisig_signed_checkpoint: &MultisigSignedCheckpoint, message: &AbacusMessage, proof: &Proof, + tx_gas_limit: Option, ) -> Result { - let mut sol_proof: [[u8; 32]; 32] = Default::default(); - sol_proof - .iter_mut() - .enumerate() - .for_each(|(i, elem)| *elem = proof.path[i].to_fixed_bytes()); - - let tx = self.contract.process( - self.inbox_address, - multisig_signed_checkpoint.checkpoint.root.to_fixed_bytes(), - multisig_signed_checkpoint.checkpoint.index.into(), - multisig_signed_checkpoint - .signatures - .iter() - .map(|s| s.to_vec().into()) - .collect(), - message.to_vec().into(), - sol_proof, - proof.index.into(), - ); - let gas = tx.estimate_gas().await?.saturating_add(U256::from(100000)); - let gassed = tx.gas(gas); - let receipt = report_tx(gassed).await?; + let contract_call = self + .process_contract_call(multisig_signed_checkpoint, message, proof, tx_gas_limit) + .await?; + let receipt = report_tx(contract_call).await?; Ok(receipt.into()) } + async fn process_estimate_costs( + &self, + multisig_signed_checkpoint: &MultisigSignedCheckpoint, + message: &AbacusMessage, + proof: &Proof, + ) -> Result { + let contract_call = self + .process_contract_call(multisig_signed_checkpoint, message, proof, None) + .await?; + + let gas_limit = contract_call + .tx + .gas() + .ok_or_else(|| eyre!("Expected gas limit for process contract call"))?; + let gas_price = self.provider.get_gas_price().await?; + + Ok(TxCostEstimate { + gas_limit: *gas_limit, + gas_price, + }) + } + fn process_calldata( &self, multisig_signed_checkpoint: &MultisigSignedCheckpoint, @@ -157,3 +164,44 @@ where self.contract.address().into() } } + +impl EthereumInboxValidatorManager +where + M: Middleware + 'static, +{ + /// Returns a ContractCall that processes the provided message. + /// If the provided tx_gas_limit is None, gas estimation occurs. + async fn process_contract_call( + &self, + multisig_signed_checkpoint: &MultisigSignedCheckpoint, + message: &AbacusMessage, + proof: &Proof, + tx_gas_limit: Option, + ) -> Result, ChainCommunicationError> { + let mut sol_proof: [[u8; 32]; 32] = Default::default(); + sol_proof + .iter_mut() + .enumerate() + .for_each(|(i, elem)| *elem = proof.path[i].to_fixed_bytes()); + + let tx = self.contract.process( + self.inbox_address, + multisig_signed_checkpoint.checkpoint.root.to_fixed_bytes(), + multisig_signed_checkpoint.checkpoint.index.into(), + multisig_signed_checkpoint + .signatures + .iter() + .map(|s| s.to_vec().into()) + .collect(), + message.to_vec().into(), + sol_proof, + proof.index.into(), + ); + let gas_limit = if let Some(gas_limit) = tx_gas_limit { + gas_limit + } else { + tx.estimate_gas().await?.saturating_add(U256::from(100000)) + }; + Ok(tx.gas(gas_limit)) + } +} diff --git a/rust/gelato/src/types.rs b/rust/gelato/src/types.rs index 5663281f8..4aff2adc7 100644 --- a/rust/gelato/src/types.rs +++ b/rust/gelato/src/types.rs @@ -6,7 +6,7 @@ use serde_repr::Serialize_repr; use std::fmt; // Each chain and chain ID supported by Abacus -#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize_repr)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize_repr, Hash)] #[repr(u64)] pub enum Chain { Ethereum = 1, diff --git a/rust/helm/abacus-agent/templates/relayer-external-secret.yaml b/rust/helm/abacus-agent/templates/relayer-external-secret.yaml index 30c3657a3..ee271acc0 100644 --- a/rust/helm/abacus-agent/templates/relayer-external-secret.yaml +++ b/rust/helm/abacus-agent/templates/relayer-external-secret.yaml @@ -30,6 +30,9 @@ spec: AWS_ACCESS_KEY_ID: {{ print "'{{ .aws_access_key_id | toString }}'" }} AWS_SECRET_ACCESS_KEY: {{ print "'{{ .aws_secret_access_key | toString }}'" }} {{- end }} + {{- if eq .Values.abacus.relayer.config.gasPaymentEnforcementPolicy.type "meetsEstimatedCost" }} + ABC_RELAYER_GASPAYMENTENFORCEMENTPOLICY_COINGECKOAPIKEY: {{ print "'{{ .coingecko_api_key | toString }}'" }} + {{- end }} data: {{- range .Values.abacus.relayer.signers }} {{- if eq .keyConfig.type "hexKey" }} @@ -47,4 +50,9 @@ spec: remoteRef: key: {{ printf "%s-%s-%s-relayer-aws-secret-access-key" .Values.abacus.context .Values.abacus.runEnv .Values.abacus.outboxChain.name }} {{- end }} + {{- if eq .Values.abacus.relayer.config.gasPaymentEnforcementPolicy.type "meetsEstimatedCost" }} + - secretKey: coingecko_api_key + remoteRef: + key: {{ printf "%s-coingecko-api-key" .Values.abacus.runEnv }} + {{- end }} {{- end }} diff --git a/typescript/infra/src/agents/index.ts b/typescript/infra/src/agents/index.ts index 06613036c..4ef0bfc53 100644 --- a/typescript/infra/src/agents/index.ts +++ b/typescript/infra/src/agents/index.ts @@ -26,6 +26,7 @@ async function helmValuesForChain( const chainAgentConfig = new ChainAgentConfig(agentConfig, chainName); const gelatoApiKeyRequired = await chainAgentConfig.ensureGelatoApiKeySecretExistsIfRequired(); + await chainAgentConfig.ensureCoingeckoApiKeySecretExistsIfRequired(); // By default, if a context only enables a subset of chains, the // connection url (or urls, when HttpQuorum is used) are not fetched diff --git a/typescript/infra/src/config/agent.ts b/typescript/infra/src/config/agent.ts index c3e3ed565..cec811959 100644 --- a/typescript/infra/src/config/agent.ts +++ b/typescript/infra/src/config/agent.ts @@ -100,6 +100,7 @@ interface MatchingListElement { export enum GasPaymentEnforcementPolicyType { None = 'none', Minimum = 'minimum', + MeetsEstimatedCost = 'meetsEstimatedCost', } export type GasPaymentEnforcementPolicy = @@ -109,6 +110,9 @@ export type GasPaymentEnforcementPolicy = | { type: GasPaymentEnforcementPolicyType.Minimum; payment: string | number; + } + | { + type: GasPaymentEnforcementPolicyType.MeetsEstimatedCost; }; // Incomplete basic relayer agent config @@ -477,6 +481,24 @@ export class ChainAgentConfig { return true; } + async ensureCoingeckoApiKeySecretExistsIfRequired() { + // The CoinGecko API Key is only needed when using the "MeetsEstimatedCost" policy. + if ( + this.relayerConfig?.gasPaymentEnforcementPolicy.type !== + GasPaymentEnforcementPolicyType.MeetsEstimatedCost + ) { + return; + } + // Check to see if the Gelato API key exists in GCP secret manager - throw if it doesn't + const secretName = `${this.agentConfig.runEnv}-coingecko-api-key`; + const secretExists = await gcpSecretExists(secretName); + if (!secretExists) { + throw Error( + `Expected CoinGecko API Key GCP Secret named ${secretName} to exist, have you created it?`, + ); + } + } + transactionSubmissionType(chain: Chain): TransactionSubmissionType { if (this.agentConfig.gelato?.enabledChains.includes(chain)) { return TransactionSubmissionType.Gelato;