diff --git a/rust/Cargo.lock b/rust/Cargo.lock index c00b0b2b7..fbc72723a 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -58,6 +58,7 @@ version = "0.1.0" dependencies = [ "abacus-base", "async-trait", + "auto_impl 1.0.1", "bytes", "color-eyre", "config", diff --git a/rust/abacus-base/src/agent.rs b/rust/abacus-base/src/agent.rs index 5fac721a8..a1ba57dc2 100644 --- a/rust/abacus-base/src/agent.rs +++ b/rust/abacus-base/src/agent.rs @@ -9,30 +9,31 @@ use tracing::instrument::Instrumented; use tracing::{info_span, Instrument}; use abacus_core::db::DB; +use abacus_core::InboxValidatorManager; use crate::{ cancel_task, metrics::CoreMetrics, settings::{IndexSettings, Settings}, - CachingInbox, CachingInterchainGasPaymaster, CachingOutbox, InboxValidatorManagers, + CachingInbox, CachingInterchainGasPaymaster, CachingOutbox, }; /// Contracts relating to an inbox chain #[derive(Clone, Debug)] pub struct InboxContracts { /// A boxed Inbox - pub inbox: Arc, + pub inbox: CachingInbox, /// A boxed InboxValidatorManager - pub validator_manager: Arc, + pub validator_manager: Arc, } /// Properties shared across all abacus agents #[derive(Debug)] pub struct AbacusAgentCore { /// A boxed Outbox - pub outbox: Arc, + pub outbox: CachingOutbox, /// A boxed InterchainGasPaymaster - pub interchain_gas_paymaster: Option>, + pub interchain_gas_paymaster: Option, /// A map of boxed Inbox contracts pub inboxes: HashMap, /// A persistent KV Store (currently implemented as rocksdb) @@ -75,45 +76,48 @@ pub trait Agent: BaseAgent { fn metrics(&self) -> Arc; /// Return a handle to the DB - fn db(&self) -> DB; + fn db(&self) -> &DB; /// Return a reference to an Outbox contract - fn outbox(&self) -> Arc; + fn outbox(&self) -> &CachingOutbox; /// Return a reference to an InterchainGasPaymaster contract - fn interchain_gas_paymaster(&self) -> Option>; + fn interchain_gas_paymaster(&self) -> Option<&CachingInterchainGasPaymaster>; /// Get a reference to the inboxes map fn inboxes(&self) -> &HashMap; /// Get a reference to an inbox's contracts by its name - fn inbox_by_name(&self, name: &str) -> Option; + fn inbox_by_name(&self, name: &str) -> Option<&InboxContracts>; } #[async_trait] -impl> Agent for B { +impl Agent for B +where + B: BaseAgent + AsRef, +{ fn metrics(&self) -> Arc { self.as_ref().metrics.clone() } - fn db(&self) -> DB { - self.as_ref().db.clone() + fn db(&self) -> &DB { + &self.as_ref().db } - fn outbox(&self) -> Arc { - self.as_ref().outbox.clone() + fn outbox(&self) -> &CachingOutbox { + &self.as_ref().outbox } - fn interchain_gas_paymaster(&self) -> Option> { - self.as_ref().interchain_gas_paymaster.clone() + fn interchain_gas_paymaster(&self) -> Option<&CachingInterchainGasPaymaster> { + self.as_ref().interchain_gas_paymaster.as_ref() } fn inboxes(&self) -> &HashMap { &self.as_ref().inboxes } - fn inbox_by_name(&self, name: &str) -> Option { - self.inboxes().get(name).map(Clone::clone) + fn inbox_by_name(&self, name: &str) -> Option<&InboxContracts> { + self.inboxes().get(name) } } diff --git a/rust/abacus-base/src/contract_sync/interchain_gas.rs b/rust/abacus-base/src/contract_sync/interchain_gas.rs index 5890568b0..ec591b450 100644 --- a/rust/abacus-base/src/contract_sync/interchain_gas.rs +++ b/rust/abacus-base/src/contract_sync/interchain_gas.rs @@ -1,10 +1,10 @@ -use abacus_core::InterchainGasPaymasterIndexer; +use std::cmp::min; +use std::time::Duration; use tokio::{task::JoinHandle, time::sleep}; use tracing::{debug, info, info_span, instrument::Instrumented, Instrument}; -use std::cmp::min; -use std::time::Duration; +use abacus_core::InterchainGasPaymasterIndexer; use crate::{contract_sync::schema::InterchainGasPaymasterContractSyncDB, ContractSync}; @@ -12,7 +12,7 @@ const GAS_PAYMENTS_LABEL: &str = "gas_payments"; impl ContractSync where - I: InterchainGasPaymasterIndexer + 'static, + I: InterchainGasPaymasterIndexer + Clone + 'static, { /// Sync gas payments pub fn sync_gas_payments(&self) -> Instrumented>> { diff --git a/rust/abacus-base/src/contract_sync/mod.rs b/rust/abacus-base/src/contract_sync/mod.rs index d1f98bda4..2140dd249 100644 --- a/rust/abacus-base/src/contract_sync/mod.rs +++ b/rust/abacus-base/src/contract_sync/mod.rs @@ -1,10 +1,12 @@ // TODO: Reapply tip buffer // TODO: Reapply metrics -use std::sync::Arc; +use abacus_core::db::AbacusDB; +pub use interchain_gas::*; +pub use metrics::ContractSyncMetrics; +pub use outbox::*; use crate::settings::IndexSettings; -use abacus_core::db::AbacusDB; mod interchain_gas; mod last_message; @@ -12,10 +14,6 @@ mod metrics; mod outbox; mod schema; -pub use interchain_gas::*; -pub use metrics::ContractSyncMetrics; -pub use outbox::*; - /// Entity that drives the syncing of an agent's db with on-chain data. /// Extracts chain-specific data (emitted checkpoints, messages, etc) from an /// `indexer` and fills the agent's db with this data. A CachingOutbox or @@ -25,7 +23,7 @@ pub use outbox::*; pub struct ContractSync { chain_name: String, db: AbacusDB, - indexer: Arc, + indexer: I, index_settings: IndexSettings, metrics: ContractSyncMetrics, } @@ -35,7 +33,7 @@ impl ContractSync { pub fn new( chain_name: String, db: AbacusDB, - indexer: Arc, + indexer: I, index_settings: IndexSettings, metrics: ContractSyncMetrics, ) -> Self { diff --git a/rust/abacus-base/src/contract_sync/outbox.rs b/rust/abacus-base/src/contract_sync/outbox.rs index eb76af8e6..afd26ba04 100644 --- a/rust/abacus-base/src/contract_sync/outbox.rs +++ b/rust/abacus-base/src/contract_sync/outbox.rs @@ -16,7 +16,7 @@ const MESSAGES_LABEL: &str = "messages"; impl ContractSync where - I: OutboxIndexer + 'static, + I: OutboxIndexer + Clone + 'static, { /// Sync outbox messages pub fn sync_outbox_messages(&self) -> Instrumented>> { diff --git a/rust/abacus-base/src/inbox.rs b/rust/abacus-base/src/inbox.rs index b6ccfddf4..73bd16da4 100644 --- a/rust/abacus-base/src/inbox.rs +++ b/rust/abacus-base/src/inbox.rs @@ -1,19 +1,18 @@ -use abacus_core::{ - db::AbacusDB, AbacusCommon, AbacusContract, Address, ChainCommunicationError, Inbox, - MessageStatus, TxOutcome, -}; -use abacus_test::mocks::inbox::MockInboxContract; +use std::sync::Arc; + use async_trait::async_trait; use ethers::core::types::H256; use eyre::Result; -use abacus_ethereum::EthereumInbox; -use std::sync::Arc; +use abacus_core::{ + db::AbacusDB, AbacusCommon, AbacusContract, Address, ChainCommunicationError, Inbox, + MessageStatus, TxOutcome, +}; -/// Caching inbox type -#[derive(Debug)] +/// Caching inbox type. +#[derive(Debug, Clone)] pub struct CachingInbox { - inbox: Inboxes, + inbox: Arc, db: AbacusDB, } @@ -25,18 +24,18 @@ impl std::fmt::Display for CachingInbox { impl CachingInbox { /// Instantiate new CachingInbox - pub fn new(inbox: Inboxes, db: AbacusDB) -> Self { + pub fn new(inbox: Arc, db: AbacusDB) -> Self { Self { inbox, db } } /// Return handle on inbox object - pub fn inbox(&self) -> Inboxes { - self.inbox.clone() + pub fn inbox(&self) -> &Arc { + &self.inbox } /// Return handle on AbacusDB - pub fn db(&self) -> AbacusDB { - self.db.clone() + pub fn db(&self) -> &AbacusDB { + &self.db } } @@ -75,136 +74,3 @@ impl AbacusCommon for CachingInbox { self.inbox.validator_manager().await } } - -#[derive(Debug, Clone)] -/// Arc wrapper for InboxVariants enum -pub struct Inboxes(Arc); - -impl From for Inboxes { - fn from(inboxes: InboxVariants) -> Self { - Self(Arc::new(inboxes)) - } -} - -impl std::ops::Deref for Inboxes { - type Target = Arc; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl std::ops::DerefMut for Inboxes { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Inbox type -#[derive(Debug)] -pub enum InboxVariants { - /// Ethereum inbox contract - Ethereum(Box), - /// Mock inbox contract - Mock(Box), - /// Other inbox variant - Other(Box), -} - -impl InboxVariants { - /// Calls checkpoint on mock variant. Should - /// only be used during tests. - #[doc(hidden)] - pub fn checkpoint(&mut self) { - if let InboxVariants::Mock(inbox) = self { - inbox.checkpoint(); - } else { - panic!("Inbox should be mock variant!"); - } - } -} - -impl From> for Inboxes -where - M: ethers::providers::Middleware + 'static, -{ - fn from(inbox: EthereumInbox) -> Self { - InboxVariants::Ethereum(Box::new(inbox)).into() - } -} - -impl From for Inboxes { - fn from(inbox: MockInboxContract) -> Self { - InboxVariants::Mock(Box::new(inbox)).into() - } -} - -impl From> for Inboxes { - fn from(inbox: Box) -> Self { - InboxVariants::Other(inbox).into() - } -} - -#[async_trait] -impl Inbox for InboxVariants { - async fn remote_domain(&self) -> Result { - match self { - InboxVariants::Ethereum(inbox) => inbox.remote_domain().await, - InboxVariants::Mock(mock_inbox) => mock_inbox.remote_domain().await, - InboxVariants::Other(inbox) => inbox.remote_domain().await, - } - } - - async fn message_status(&self, leaf: H256) -> Result { - match self { - InboxVariants::Ethereum(inbox) => inbox.message_status(leaf).await, - InboxVariants::Mock(mock_inbox) => mock_inbox.message_status(leaf).await, - InboxVariants::Other(inbox) => inbox.message_status(leaf).await, - } - } - - fn contract_address(&self) -> Address { - match self { - InboxVariants::Ethereum(inbox) => inbox.contract_address(), - InboxVariants::Mock(mock_inbox) => mock_inbox.contract_address(), - InboxVariants::Other(inbox) => inbox.contract_address(), - } - } -} - -impl AbacusContract for InboxVariants { - fn chain_name(&self) -> &str { - match self { - InboxVariants::Ethereum(inbox) => inbox.chain_name(), - InboxVariants::Mock(mock_inbox) => mock_inbox.chain_name(), - InboxVariants::Other(inbox) => inbox.chain_name(), - } - } -} - -#[async_trait] -impl AbacusCommon for InboxVariants { - fn local_domain(&self) -> u32 { - match self { - InboxVariants::Ethereum(inbox) => inbox.local_domain(), - InboxVariants::Mock(mock_inbox) => mock_inbox.local_domain(), - InboxVariants::Other(inbox) => inbox.local_domain(), - } - } - - async fn status(&self, txid: H256) -> Result, ChainCommunicationError> { - match self { - InboxVariants::Ethereum(inbox) => inbox.status(txid).await, - InboxVariants::Mock(mock_inbox) => mock_inbox.status(txid).await, - InboxVariants::Other(inbox) => inbox.status(txid).await, - } - } - - async fn validator_manager(&self) -> Result { - match self { - InboxVariants::Ethereum(inbox) => inbox.validator_manager().await, - InboxVariants::Mock(mock_inbox) => mock_inbox.validator_manager().await, - InboxVariants::Other(inbox) => inbox.validator_manager().await, - } - } -} diff --git a/rust/abacus-base/src/indexer.rs b/rust/abacus-base/src/indexer.rs deleted file mode 100644 index 45ce88d65..000000000 --- a/rust/abacus-base/src/indexer.rs +++ /dev/null @@ -1,113 +0,0 @@ -use abacus_core::{ - CheckpointWithMeta, Indexer, InterchainGasPaymasterIndexer, InterchainGasPaymentWithMeta, - OutboxIndexer, RawCommittedMessage, -}; -use abacus_test::mocks::indexer::MockAbacusIndexer; -use async_trait::async_trait; -use eyre::Result; - -/// OutboxIndexer type -#[derive(Debug)] -pub enum OutboxIndexers { - /// Ethereum contract indexer - Ethereum(Box), - /// Mock indexer - Mock(Box), - /// Other indexer variant - Other(Box), -} - -impl From for OutboxIndexers { - fn from(mock_indexer: MockAbacusIndexer) -> Self { - OutboxIndexers::Mock(Box::new(mock_indexer)) - } -} - -#[async_trait] -impl Indexer for OutboxIndexers { - async fn get_finalized_block_number(&self) -> Result { - match self { - OutboxIndexers::Ethereum(indexer) => indexer.get_finalized_block_number().await, - OutboxIndexers::Mock(indexer) => indexer.get_finalized_block_number().await, - OutboxIndexers::Other(indexer) => indexer.get_finalized_block_number().await, - } - } -} - -#[async_trait] -impl OutboxIndexer for OutboxIndexers { - async fn fetch_sorted_messages(&self, from: u32, to: u32) -> Result> { - match self { - OutboxIndexers::Ethereum(indexer) => indexer.fetch_sorted_messages(from, to).await, - OutboxIndexers::Mock(indexer) => indexer.fetch_sorted_messages(from, to).await, - OutboxIndexers::Other(indexer) => indexer.fetch_sorted_messages(from, to).await, - } - } - - async fn fetch_sorted_cached_checkpoints( - &self, - from: u32, - to: u32, - ) -> Result> { - match self { - OutboxIndexers::Ethereum(indexer) => { - indexer.fetch_sorted_cached_checkpoints(from, to).await - } - OutboxIndexers::Mock(indexer) => { - indexer.fetch_sorted_cached_checkpoints(from, to).await - } - OutboxIndexers::Other(indexer) => { - indexer.fetch_sorted_cached_checkpoints(from, to).await - } - } - } -} - -/// InterchainGasPaymasterIndexer type -#[derive(Debug)] -pub enum InterchainGasPaymasterIndexers { - /// Ethereum contract indexer - Ethereum(Box), - /// Mock indexer - Mock(Box), - /// Other indexer variant - Other(Box), -} - -#[async_trait] -impl Indexer for InterchainGasPaymasterIndexers { - async fn get_finalized_block_number(&self) -> Result { - match self { - InterchainGasPaymasterIndexers::Ethereum(indexer) => { - indexer.get_finalized_block_number().await - } - InterchainGasPaymasterIndexers::Mock(indexer) => { - indexer.get_finalized_block_number().await - } - InterchainGasPaymasterIndexers::Other(indexer) => { - indexer.get_finalized_block_number().await - } - } - } -} - -#[async_trait] -impl InterchainGasPaymasterIndexer for InterchainGasPaymasterIndexers { - async fn fetch_gas_payments( - &self, - from_block: u32, - to_block: u32, - ) -> Result> { - match self { - InterchainGasPaymasterIndexers::Ethereum(indexer) => { - indexer.fetch_gas_payments(from_block, to_block).await - } - InterchainGasPaymasterIndexers::Mock(indexer) => { - indexer.fetch_gas_payments(from_block, to_block).await - } - InterchainGasPaymasterIndexers::Other(indexer) => { - indexer.fetch_gas_payments(from_block, to_block).await - } - } - } -} diff --git a/rust/abacus-base/src/interchain_gas.rs b/rust/abacus-base/src/interchain_gas.rs index 188ff4be0..d273b9e10 100644 --- a/rust/abacus-base/src/interchain_gas.rs +++ b/rust/abacus-base/src/interchain_gas.rs @@ -1,23 +1,23 @@ -use abacus_core::db::AbacusDB; -use abacus_core::{AbacusContract, InterchainGasPaymaster}; +use std::fmt::Debug; +use std::sync::Arc; -use abacus_ethereum::EthereumInterchainGasPaymaster; -use async_trait::async_trait; use eyre::Result; use futures_util::future::select_all; -use std::sync::Arc; use tokio::task::JoinHandle; use tracing::instrument::Instrumented; use tracing::{info_span, Instrument}; -use crate::{ContractSync, ContractSyncMetrics, IndexSettings, InterchainGasPaymasterIndexers}; +use abacus_core::db::AbacusDB; +use abacus_core::{AbacusContract, InterchainGasPaymaster, InterchainGasPaymasterIndexer}; + +use crate::{ContractSync, ContractSyncMetrics, IndexSettings}; /// Caching InterchainGasPaymaster type -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CachingInterchainGasPaymaster { - paymaster: InterchainGasPaymasters, + paymaster: Arc, db: AbacusDB, - indexer: Arc, + indexer: Arc, } impl std::fmt::Display for CachingInterchainGasPaymaster { @@ -29,9 +29,9 @@ impl std::fmt::Display for CachingInterchainGasPaymaster { impl CachingInterchainGasPaymaster { /// Instantiate new CachingInterchainGasPaymaster pub fn new( - paymaster: InterchainGasPaymasters, + paymaster: Arc, db: AbacusDB, - indexer: Arc, + indexer: Arc, ) -> Self { Self { paymaster, @@ -41,17 +41,17 @@ impl CachingInterchainGasPaymaster { } /// Return handle on paymaster object - pub fn paymaster(&self) -> InterchainGasPaymasters { - self.paymaster.clone() + pub fn paymaster(&self) -> &Arc { + &self.paymaster } /// Return handle on AbacusDB - pub fn db(&self) -> AbacusDB { - self.db.clone() + pub fn db(&self) -> &AbacusDB { + &self.db } - /// Spawn a task that syncs the CachingInterchainGasPaymaster's db with the on-chain event - /// data + /// Spawn a task that syncs the CachingInterchainGasPaymaster's db with the + /// on-chain event data pub fn sync( &self, index_settings: IndexSettings, @@ -80,68 +80,3 @@ impl CachingInterchainGasPaymaster { .instrument(span) } } - -#[derive(Debug, Clone)] -/// Arc wrapper for InterchainGasPaymasterVariants enum -pub struct InterchainGasPaymasters(Arc); - -impl From for InterchainGasPaymasters { - fn from(paymaster: InterchainGasPaymasterVariants) -> Self { - Self(Arc::new(paymaster)) - } -} - -impl std::ops::Deref for InterchainGasPaymasters { - type Target = Arc; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl std::ops::DerefMut for InterchainGasPaymasters { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// InterchainGasPaymaster type -#[derive(Debug)] -pub enum InterchainGasPaymasterVariants { - /// Ethereum InterchainGasPaymaster contract - Ethereum(Box), - /// Mock InterchainGasPaymaster contract - Mock(Box), - /// Other InterchainGasPaymaster variant - Other(Box), -} - -impl InterchainGasPaymasterVariants {} - -impl From> for InterchainGasPaymasters -where - M: ethers::providers::Middleware + 'static, -{ - fn from(paymaster: EthereumInterchainGasPaymaster) -> Self { - InterchainGasPaymasterVariants::Ethereum(Box::new(paymaster)).into() - } -} - -impl From> for InterchainGasPaymasters { - fn from(paymaster: Box) -> Self { - InterchainGasPaymasterVariants::Other(paymaster).into() - } -} - -impl AbacusContract for InterchainGasPaymasterVariants { - fn chain_name(&self) -> &str { - match self { - InterchainGasPaymasterVariants::Ethereum(paymaster) => paymaster.chain_name(), - InterchainGasPaymasterVariants::Mock(paymaster) => paymaster.chain_name(), - InterchainGasPaymasterVariants::Other(paymaster) => paymaster.chain_name(), - } - } -} - -#[async_trait] -impl InterchainGasPaymaster for InterchainGasPaymasterVariants {} diff --git a/rust/abacus-base/src/lib.rs b/rust/abacus-base/src/lib.rs index 722f71fa3..ad45d673d 100644 --- a/rust/abacus-base/src/lib.rs +++ b/rust/abacus-base/src/lib.rs @@ -35,9 +35,6 @@ pub use metrics::*; mod contract_sync; pub use contract_sync::*; -mod indexer; -pub use indexer::*; - mod interchain_gas; pub use interchain_gas::*; @@ -47,8 +44,5 @@ pub use traits::*; mod types; pub use types::*; -mod validator_manager; -pub use validator_manager::*; - #[cfg(feature = "oneline-eyre")] pub mod oneline_eyre; diff --git a/rust/abacus-base/src/outbox.rs b/rust/abacus-base/src/outbox.rs index 68706a866..b75b3a22c 100644 --- a/rust/abacus-base/src/outbox.rs +++ b/rust/abacus-base/src/outbox.rs @@ -1,29 +1,29 @@ -use abacus_core::db::AbacusDB; -use abacus_core::{ - AbacusCommon, AbacusContract, ChainCommunicationError, Checkpoint, Message, Outbox, - OutboxEvents, OutboxState, RawCommittedMessage, TxOutcome, -}; +use std::fmt::Debug; +use std::sync::Arc; -use abacus_ethereum::EthereumOutbox; -use abacus_test::mocks::MockOutboxContract; use async_trait::async_trait; use ethers::core::types::H256; use eyre::Result; use futures_util::future::select_all; -use std::sync::Arc; use tokio::task::JoinHandle; use tokio::time::{sleep, Duration}; +use tracing::instrument::Instrumented; use tracing::{info_span, Instrument}; -use tracing::{instrument, instrument::Instrumented}; -use crate::{ContractSync, ContractSyncMetrics, IndexSettings, OutboxIndexers}; +use abacus_core::db::AbacusDB; +use abacus_core::{ + AbacusCommon, AbacusContract, ChainCommunicationError, Checkpoint, Message, Outbox, + OutboxEvents, OutboxIndexer, OutboxState, RawCommittedMessage, TxOutcome, +}; + +use crate::{ContractSync, ContractSyncMetrics, IndexSettings}; /// Caching Outbox type -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CachingOutbox { - outbox: Outboxes, + outbox: Arc, db: AbacusDB, - indexer: Arc, + indexer: Arc, } impl std::fmt::Display for CachingOutbox { @@ -34,7 +34,7 @@ impl std::fmt::Display for CachingOutbox { impl CachingOutbox { /// Instantiate new CachingOutbox - pub fn new(outbox: Outboxes, db: AbacusDB, indexer: Arc) -> Self { + pub fn new(outbox: Arc, db: AbacusDB, indexer: Arc) -> Self { Self { outbox, db, @@ -43,13 +43,13 @@ impl CachingOutbox { } /// Return handle on outbox object - pub fn outbox(&self) -> Outboxes { - self.outbox.clone() + pub fn outbox(&self) -> &Arc { + &self.outbox } /// Return handle on AbacusDB - pub fn db(&self) -> AbacusDB { - self.db.clone() + pub fn db(&self) -> &AbacusDB { + &self.db } /// Spawn a task that syncs the CachingOutbox's db with the on-chain event @@ -85,10 +85,6 @@ impl CachingOutbox { #[async_trait] impl Outbox for CachingOutbox { - async fn dispatch(&self, message: &Message) -> Result { - self.outbox.dispatch(message).await - } - async fn state(&self) -> Result { self.outbox.state().await } @@ -97,6 +93,10 @@ impl Outbox for CachingOutbox { self.outbox.count().await } + async fn dispatch(&self, message: &Message) -> Result { + self.outbox.dispatch(message).await + } + async fn cache_checkpoint(&self) -> Result { self.outbox.cache_checkpoint().await } @@ -165,172 +165,3 @@ impl AbacusCommon for CachingOutbox { self.outbox.validator_manager().await } } - -#[derive(Debug, Clone)] -/// Arc wrapper for OutboxVariants enum -pub struct Outboxes(Arc); - -impl From for Outboxes { - fn from(outboxes: OutboxVariants) -> Self { - Self(Arc::new(outboxes)) - } -} - -impl std::ops::Deref for Outboxes { - type Target = Arc; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl std::ops::DerefMut for Outboxes { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Outbox type -#[derive(Debug)] -pub enum OutboxVariants { - /// Ethereum Outbox contract - Ethereum(Box), - /// Mock Outbox contract - Mock(Box), - /// Other Outbox variant - Other(Box), -} - -impl OutboxVariants { - /// Calls checkpoint on mock variant. Should - /// only be used during tests. - #[doc(hidden)] - pub fn checkpoint(&mut self) { - if let OutboxVariants::Mock(outbox) = self { - outbox.checkpoint(); - } else { - panic!("Outbox should be mock variant!"); - } - } -} - -impl From> for Outboxes -where - M: ethers::providers::Middleware + 'static, -{ - fn from(outbox: EthereumOutbox) -> Self { - OutboxVariants::Ethereum(Box::new(outbox)).into() - } -} - -impl From for Outboxes { - fn from(mock_outbox: MockOutboxContract) -> Self { - OutboxVariants::Mock(Box::new(mock_outbox)).into() - } -} - -impl From> for Outboxes { - fn from(outbox: Box) -> Self { - OutboxVariants::Other(outbox).into() - } -} - -#[async_trait] -impl Outbox for OutboxVariants { - #[instrument(level = "trace", err)] - async fn dispatch(&self, message: &Message) -> Result { - match self { - OutboxVariants::Ethereum(outbox) => outbox.dispatch(message).await, - OutboxVariants::Mock(mock_outbox) => mock_outbox.dispatch(message).await, - OutboxVariants::Other(outbox) => outbox.dispatch(message).await, - } - } - - async fn state(&self) -> Result { - match self { - OutboxVariants::Ethereum(outbox) => outbox.state().await, - OutboxVariants::Mock(mock_outbox) => mock_outbox.state().await, - OutboxVariants::Other(outbox) => outbox.state().await, - } - } - - async fn count(&self) -> Result { - match self { - OutboxVariants::Ethereum(outbox) => outbox.count().await, - OutboxVariants::Mock(mock_outbox) => mock_outbox.count().await, - OutboxVariants::Other(outbox) => outbox.count().await, - } - } - - async fn cache_checkpoint(&self) -> Result { - match self { - OutboxVariants::Ethereum(outbox) => outbox.cache_checkpoint().await, - OutboxVariants::Mock(mock_outbox) => mock_outbox.cache_checkpoint().await, - OutboxVariants::Other(outbox) => outbox.cache_checkpoint().await, - } - } - - async fn latest_cached_root(&self) -> Result { - match self { - OutboxVariants::Ethereum(outbox) => outbox.latest_cached_root().await, - OutboxVariants::Mock(mock_outbox) => mock_outbox.latest_cached_root().await, - OutboxVariants::Other(outbox) => outbox.latest_cached_root().await, - } - } - - async fn latest_cached_checkpoint(&self) -> Result { - match self { - OutboxVariants::Ethereum(outbox) => outbox.latest_cached_checkpoint().await, - OutboxVariants::Mock(mock_outbox) => mock_outbox.latest_cached_checkpoint().await, - OutboxVariants::Other(outbox) => outbox.latest_cached_checkpoint().await, - } - } - - async fn latest_checkpoint( - &self, - maybe_lag: Option, - ) -> Result { - match self { - OutboxVariants::Ethereum(outbox) => outbox.latest_checkpoint(maybe_lag).await, - OutboxVariants::Mock(mock_outbox) => mock_outbox.latest_checkpoint(maybe_lag).await, - OutboxVariants::Other(outbox) => outbox.latest_checkpoint(maybe_lag).await, - } - } -} - -impl AbacusContract for OutboxVariants { - fn chain_name(&self) -> &str { - match self { - OutboxVariants::Ethereum(outbox) => outbox.chain_name(), - OutboxVariants::Mock(mock_outbox) => mock_outbox.chain_name(), - OutboxVariants::Other(outbox) => outbox.chain_name(), - } - } -} - -#[async_trait] -impl AbacusCommon for OutboxVariants { - fn local_domain(&self) -> u32 { - match self { - OutboxVariants::Ethereum(outbox) => outbox.local_domain(), - OutboxVariants::Mock(mock_outbox) => mock_outbox.local_domain(), - OutboxVariants::Other(outbox) => outbox.local_domain(), - } - } - - async fn status(&self, txid: H256) -> Result, ChainCommunicationError> { - match self { - OutboxVariants::Ethereum(outbox) => outbox.status(txid).await, - OutboxVariants::Mock(mock_outbox) => mock_outbox.status(txid).await, - OutboxVariants::Other(outbox) => outbox.status(txid).await, - } - } - - async fn validator_manager(&self) -> Result { - match self { - OutboxVariants::Ethereum(outbox) => outbox.validator_manager().await, - OutboxVariants::Mock(mock_outbox) => mock_outbox.validator_manager().await, - OutboxVariants::Other(outbox) => outbox.validator_manager().await, - } - } -} diff --git a/rust/abacus-base/src/settings/chains.rs b/rust/abacus-base/src/settings/chains.rs index 3f7091c18..5cd8a7136 100644 --- a/rust/abacus-base/src/settings/chains.rs +++ b/rust/abacus-base/src/settings/chains.rs @@ -1,8 +1,10 @@ use ethers::signers::Signer; -use eyre::Report; use serde::Deserialize; -use abacus_core::{AbacusAbi, ContractLocator, Signers}; +use abacus_core::{ + AbacusAbi, ContractLocator, Inbox, InboxValidatorManager, InterchainGasPaymaster, Outbox, + Signers, +}; use abacus_ethereum::{ Connection, EthereumInboxAbi, EthereumInterchainGasPaymasterAbi, EthereumOutboxAbi, InboxBuilder, InboxValidatorManagerBuilder, InterchainGasPaymasterBuilder, @@ -12,10 +14,7 @@ use ethers_prometheus::middleware::{ ChainInfo, ContractInfo, PrometheusMiddlewareConf, WalletInfo, }; -use crate::{ - CoreMetrics, InboxValidatorManagerVariants, InboxValidatorManagers, InboxVariants, Inboxes, - InterchainGasPaymasterVariants, InterchainGasPaymasters, OutboxVariants, Outboxes, -}; +use crate::CoreMetrics; /// A connection to _some_ blockchain. /// @@ -105,28 +104,25 @@ impl ChainSetup { &self, signer: Option, metrics: &CoreMetrics, - ) -> Result { + ) -> eyre::Result> { match &self.chain { - ChainConf::Ethereum(conf) => Ok(OutboxVariants::Ethereum( - OutboxBuilder {} - .make_with_connection( - conf.clone(), - &ContractLocator { - chain_name: self.name.clone(), - domain: self.domain.parse().expect("invalid uint"), - address: self - .addresses - .outbox - .parse::()? - .into(), - }, - signer, - Some(|| metrics.json_rpc_client_metrics()), - Some((metrics.provider_metrics(), self.metrics_conf())), - ) - .await?, - ) - .into()), + ChainConf::Ethereum(conf) => Ok(OutboxBuilder {} + .make_with_connection( + conf.clone(), + &ContractLocator { + chain_name: self.name.clone(), + domain: self.domain.parse().expect("invalid uint"), + address: self + .addresses + .outbox + .parse::()? + .into(), + }, + signer, + Some(|| metrics.json_rpc_client_metrics()), + Some((metrics.provider_metrics(), self.metrics_conf())), + ) + .await?), } } @@ -135,7 +131,7 @@ impl ChainSetup { &self, signer: Option, metrics: &CoreMetrics, - ) -> Result, Report> { + ) -> eyre::Result>> { let paymaster_address = if let Some(address) = &self.addresses.interchain_gas_paymaster { address } else { @@ -143,24 +139,19 @@ impl ChainSetup { }; match &self.chain { ChainConf::Ethereum(conf) => Ok(Some( - InterchainGasPaymasterVariants::Ethereum( - InterchainGasPaymasterBuilder {} - .make_with_connection( - conf.clone(), - &ContractLocator { - chain_name: self.name.clone(), - domain: self.domain.parse().expect("invalid uint"), - address: paymaster_address - .parse::()? - .into(), - }, - signer, - Some(|| metrics.json_rpc_client_metrics()), - Some((metrics.provider_metrics(), self.metrics_conf())), - ) - .await?, - ) - .into(), + InterchainGasPaymasterBuilder {} + .make_with_connection( + conf.clone(), + &ContractLocator { + chain_name: self.name.clone(), + domain: self.domain.parse().expect("invalid uint"), + address: paymaster_address.parse::()?.into(), + }, + signer, + Some(|| metrics.json_rpc_client_metrics()), + Some((metrics.provider_metrics(), self.metrics_conf())), + ) + .await?, )), } } @@ -200,29 +191,26 @@ impl ChainSetup { &self, signer: Option, metrics: &CoreMetrics, - ) -> Result { + ) -> eyre::Result> { let metrics_conf = self.metrics_conf(metrics.agent_name(), &signer); match &self.chain { - ChainConf::Ethereum(conf) => Ok(InboxVariants::Ethereum( - InboxBuilder {} - .make_with_connection( - conf.clone(), - &ContractLocator { - chain_name: self.name.clone(), - domain: self.domain.parse().expect("invalid uint"), - address: self - .addresses - .inbox - .parse::()? - .into(), - }, - signer, - Some(|| metrics.json_rpc_client_metrics()), - Some((metrics.provider_metrics(), metrics_conf)), - ) - .await?, - ) - .into()), + ChainConf::Ethereum(conf) => Ok(InboxBuilder {} + .make_with_connection( + conf.clone(), + &ContractLocator { + chain_name: self.name.clone(), + domain: self.domain.parse().expect("invalid uint"), + address: self + .addresses + .inbox + .parse::()? + .into(), + }, + signer, + Some(|| metrics.json_rpc_client_metrics()), + Some((metrics.provider_metrics(), metrics_conf)), + ) + .await?), } } @@ -231,30 +219,27 @@ impl ChainSetup { &self, signer: Option, metrics: &CoreMetrics, - ) -> Result { + ) -> eyre::Result> { let inbox_address = self.addresses.inbox.parse::()?; let metrics_conf = self.metrics_conf(metrics.agent_name(), &signer); match &self.chain { - ChainConf::Ethereum(conf) => Ok(InboxValidatorManagerVariants::Ethereum( - InboxValidatorManagerBuilder { inbox_address } - .make_with_connection( - conf.clone(), - &ContractLocator { - chain_name: self.name.clone(), - domain: self.domain.parse().expect("invalid uint"), - address: self - .addresses - .validator_manager - .parse::()? - .into(), - }, - signer, - Some(|| metrics.json_rpc_client_metrics()), - Some((metrics.provider_metrics(), metrics_conf)), - ) - .await?, - ) - .into()), + ChainConf::Ethereum(conf) => Ok(InboxValidatorManagerBuilder { inbox_address } + .make_with_connection( + conf.clone(), + &ContractLocator { + chain_name: self.name.clone(), + domain: self.domain.parse().expect("invalid uint"), + address: self + .addresses + .validator_manager + .parse::()? + .into(), + }, + signer, + Some(|| metrics.json_rpc_client_metrics()), + Some((metrics.provider_metrics(), metrics_conf)), + ) + .await?), } } diff --git a/rust/abacus-base/src/settings/mod.rs b/rust/abacus-base/src/settings/mod.rs index 2bf408346..02a456802 100644 --- a/rust/abacus-base/src/settings/mod.rs +++ b/rust/abacus-base/src/settings/mod.rs @@ -89,7 +89,8 @@ use tracing::instrument; use abacus_core::{ db::{AbacusDB, DB}, utils::HexString, - AbacusContract, ContractLocator, Signers, + AbacusContract, ContractLocator, InboxValidatorManager, InterchainGasPaymasterIndexer, + OutboxIndexer, Signers, }; use abacus_ethereum::{ InterchainGasPaymasterIndexerBuilder, MakeableWithProvider, OutboxIndexerBuilder, @@ -97,10 +98,7 @@ use abacus_ethereum::{ pub use chains::{ChainConf, ChainSetup, InboxAddresses, OutboxAddresses}; use crate::{settings::trace::TracingConfig, CachingInterchainGasPaymaster}; -use crate::{ - AbacusAgentCore, CachingInbox, CachingOutbox, CoreMetrics, InboxContracts, - InboxValidatorManagers, InterchainGasPaymasterIndexers, OutboxIndexers, -}; +use crate::{AbacusAgentCore, CachingInbox, CachingOutbox, CoreMetrics, InboxContracts}; /// Chain configuration pub mod chains; @@ -263,7 +261,7 @@ impl Settings { &self, db: DB, metrics: &CoreMetrics, - ) -> Result, Report> { + ) -> eyre::Result> { let mut result = HashMap::new(); for (k, v) in self.inboxes.iter().filter(|(_, v)| { !v.disabled @@ -283,8 +281,8 @@ impl Settings { result.insert( v.name.clone(), InboxContracts { - inbox: Arc::new(caching_inbox), - validator_manager: Arc::new(validator_manager), + inbox: caching_inbox, + validator_manager: validator_manager.into(), }, ); } @@ -297,11 +295,11 @@ impl Settings { chain_setup: &ChainSetup, db: DB, metrics: &CoreMetrics, - ) -> Result { + ) -> eyre::Result { let signer = self.get_signer(&chain_setup.name).await; let inbox = chain_setup.try_into_inbox(signer, metrics).await?; let abacus_db = AbacusDB::new(inbox.chain_name(), db); - Ok(CachingInbox::new(inbox, abacus_db)) + Ok(CachingInbox::new(inbox.into(), abacus_db)) } /// Try to get an InboxValidatorManager @@ -309,7 +307,7 @@ impl Settings { &self, chain_setup: &ChainSetup, metrics: &CoreMetrics, - ) -> Result { + ) -> eyre::Result> { let signer = self.get_signer(&chain_setup.name).await; chain_setup @@ -322,12 +320,12 @@ impl Settings { &self, db: DB, metrics: &CoreMetrics, - ) -> Result { + ) -> eyre::Result { let signer = self.get_signer(&self.outbox.name).await; let outbox = self.outbox.try_into_outbox(signer, metrics).await?; - let indexer = Arc::new(self.try_outbox_indexer(metrics).await?); + let indexer = self.try_outbox_indexer(metrics).await?; let abacus_db = AbacusDB::new(outbox.chain_name(), db); - Ok(CachingOutbox::new(outbox, abacus_db, indexer)) + Ok(CachingOutbox::new(outbox.into(), abacus_db, indexer.into())) } /// Try to get a CachingInterchainGasPaymaster @@ -335,7 +333,7 @@ impl Settings { &self, db: DB, metrics: &CoreMetrics, - ) -> Result, Report> { + ) -> eyre::Result> { let signer = self.get_signer(&self.outbox.name).await; match self .outbox @@ -343,10 +341,12 @@ impl Settings { .await? { Some(paymaster) => { - let indexer = Arc::new(self.try_interchain_gas_paymaster_indexer(metrics).await?); + let indexer = self.try_interchain_gas_paymaster_indexer(metrics).await?; let abacus_db = AbacusDB::new(paymaster.chain_name(), db); Ok(Some(CachingInterchainGasPaymaster::new( - paymaster, abacus_db, indexer, + paymaster.into(), + abacus_db, + indexer.into(), ))) } None => Ok(None), @@ -357,32 +357,30 @@ impl Settings { pub async fn try_outbox_indexer( &self, metrics: &CoreMetrics, - ) -> Result { + ) -> eyre::Result> { match &self.outbox.chain { - ChainConf::Ethereum(conn) => Ok(OutboxIndexers::Ethereum( - OutboxIndexerBuilder { - from_height: self.index.from(), - chunk_size: self.index.chunk_size(), - finality_blocks: self.outbox.finality_blocks(), - } - .make_with_connection( - conn.clone(), - &ContractLocator { - chain_name: self.outbox.name.clone(), - domain: self.outbox.domain.parse().expect("invalid uint"), - address: self - .outbox - .addresses - .outbox - .parse::()? - .into(), - }, - self.get_signer(&self.outbox.name).await, - Some(|| metrics.json_rpc_client_metrics()), - Some((metrics.provider_metrics(), self.outbox.metrics_conf())), - ) - .await?, - )), + ChainConf::Ethereum(conn) => Ok(OutboxIndexerBuilder { + from_height: self.index.from(), + chunk_size: self.index.chunk_size(), + finality_blocks: self.outbox.finality_blocks(), + } + .make_with_connection( + conn.clone(), + &ContractLocator { + chain_name: self.outbox.name.clone(), + domain: self.outbox.domain.parse().expect("invalid uint"), + address: self + .outbox + .addresses + .outbox + .parse::()? + .into(), + }, + self.get_signer(&self.outbox.name).await, + Some(|| metrics.json_rpc_client_metrics()), + Some((metrics.provider_metrics(), self.outbox.metrics_conf())), + ) + .await?), } } @@ -393,39 +391,37 @@ impl Settings { pub async fn try_interchain_gas_paymaster_indexer( &self, metrics: &CoreMetrics, - ) -> Result { + ) -> eyre::Result> { match &self.outbox.chain { - ChainConf::Ethereum(conn) => Ok(InterchainGasPaymasterIndexers::Ethereum( - InterchainGasPaymasterIndexerBuilder { - outbox_address: self + ChainConf::Ethereum(conn) => Ok(InterchainGasPaymasterIndexerBuilder { + outbox_address: self + .outbox + .addresses + .outbox + .parse::()?, + from_height: self.index.from(), + chunk_size: self.index.chunk_size(), + finality_blocks: self.outbox.finality_blocks(), + } + .make_with_connection( + conn.clone(), + &ContractLocator { + chain_name: self.outbox.name.clone(), + domain: self.outbox.domain.parse().expect("invalid uint"), + address: self .outbox .addresses - .outbox - .parse::()?, - from_height: self.index.from(), - chunk_size: self.index.chunk_size(), - finality_blocks: self.outbox.finality_blocks(), - } - .make_with_connection( - conn.clone(), - &ContractLocator { - chain_name: self.outbox.name.clone(), - domain: self.outbox.domain.parse().expect("invalid uint"), - address: self - .outbox - .addresses - .interchain_gas_paymaster - .as_ref() - .expect("interchain_gas_paymaster not provided") - .parse::()? - .into(), - }, - self.get_signer(&self.outbox.name).await, - Some(|| metrics.json_rpc_client_metrics()), - Some((metrics.provider_metrics(), self.outbox.metrics_conf())), - ) - .await?, - )), + .interchain_gas_paymaster + .as_ref() + .expect("interchain_gas_paymaster not provided") + .parse::()? + .into(), + }, + self.get_signer(&self.outbox.name).await, + Some(|| metrics.json_rpc_client_metrics()), + Some((metrics.provider_metrics(), self.outbox.metrics_conf())), + ) + .await?), } } @@ -434,7 +430,7 @@ impl Settings { &self, name: &str, parse_inboxes: bool, - ) -> Result { + ) -> eyre::Result { let metrics = Arc::new(CoreMetrics::new( name, self.metrics @@ -444,11 +440,10 @@ impl Settings { )?); let db = DB::from_path(&self.db)?; - let outbox = Arc::new(self.try_caching_outbox(db.clone(), &metrics).await?); + let outbox = self.try_caching_outbox(db.clone(), &metrics).await?; let interchain_gas_paymaster = self .try_caching_interchain_gas_paymaster(db.clone(), &metrics) - .await? - .map(Arc::new); + .await?; let inbox_contracts = if parse_inboxes { self.try_inbox_contracts(db.clone(), &metrics).await? diff --git a/rust/abacus-base/src/validator_manager.rs b/rust/abacus-base/src/validator_manager.rs deleted file mode 100644 index bee84c14b..000000000 --- a/rust/abacus-base/src/validator_manager.rs +++ /dev/null @@ -1,105 +0,0 @@ -use async_trait::async_trait; -use std::sync::Arc; - -use abacus_core::{ - accumulator::merkle::Proof, AbacusMessage, Address, ChainCommunicationError, - InboxValidatorManager, MultisigSignedCheckpoint, TxOutcome, -}; - -#[derive(Debug, Clone)] -/// Arc wrapper for InboxValidatorManagerVariants enum -pub struct InboxValidatorManagers(Arc); - -impl From for InboxValidatorManagers { - fn from(inbox_validator_managers: InboxValidatorManagerVariants) -> Self { - Self(Arc::new(inbox_validator_managers)) - } -} - -impl std::ops::Deref for InboxValidatorManagers { - type Target = Arc; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl std::ops::DerefMut for InboxValidatorManagers { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// InboxValidatorManager type -#[derive(Debug)] -pub enum InboxValidatorManagerVariants { - /// Ethereum InboxValidatorManager contract - Ethereum(Box), - /// Mock InboxValidatorManager contract - Mock(Box), - /// Other InboxValidatorManager variant - Other(Box), -} - -#[async_trait] -impl InboxValidatorManager for InboxValidatorManagerVariants { - /// Submit a signed checkpoint for inclusion - async fn process( - &self, - multisig_signed_checkpoint: &MultisigSignedCheckpoint, - message: &AbacusMessage, - proof: &Proof, - ) -> Result { - match self { - InboxValidatorManagerVariants::Ethereum(validator_manager) => { - validator_manager - .process(multisig_signed_checkpoint, message, proof) - .await - } - InboxValidatorManagerVariants::Mock(mock_validator_manager) => { - mock_validator_manager - .process(multisig_signed_checkpoint, message, proof) - .await - } - InboxValidatorManagerVariants::Other(validator_manager) => { - validator_manager - .process(multisig_signed_checkpoint, message, proof) - .await - } - } - } - - /// Get calldata for a process tx - fn process_calldata( - &self, - multisig_signed_checkpoint: &MultisigSignedCheckpoint, - message: &AbacusMessage, - proof: &Proof, - ) -> Vec { - match self { - InboxValidatorManagerVariants::Ethereum(validator_manager) => { - validator_manager.process_calldata(multisig_signed_checkpoint, message, proof) - } - InboxValidatorManagerVariants::Mock(mock_validator_manager) => { - mock_validator_manager.process_calldata(multisig_signed_checkpoint, message, proof) - } - InboxValidatorManagerVariants::Other(validator_manager) => { - validator_manager.process_calldata(multisig_signed_checkpoint, message, proof) - } - } - } - - fn contract_address(&self) -> Address { - match self { - InboxValidatorManagerVariants::Ethereum(validator_manager) => { - validator_manager.contract_address() - } - InboxValidatorManagerVariants::Mock(validator_manager) => { - validator_manager.contract_address() - } - InboxValidatorManagerVariants::Other(validator_manager) => { - validator_manager.contract_address() - } - } - } -} diff --git a/rust/abacus-core/Cargo.toml b/rust/abacus-core/Cargo.toml index 7c39049d9..83a4d33a8 100644 --- a/rust/abacus-core/Cargo.toml +++ b/rust/abacus-core/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +auto_impl = "1.0" ethers = { git = "https://github.com/abacus-network/ethers-rs", tag = "2022-09-12-01", default-features = false, features = ['legacy'] } ethers-signers = { git = "https://github.com/abacus-network/ethers-rs", tag = "2022-09-12-01", features=["aws"] } ethers-providers = { git = "https://github.com/abacus-network/ethers-rs", tag = "2022-09-12-01", features=["ws", "rustls"] } diff --git a/rust/abacus-core/src/traits/inbox.rs b/rust/abacus-core/src/traits/inbox.rs index 4aeafd699..f6410af2c 100644 --- a/rust/abacus-core/src/traits/inbox.rs +++ b/rust/abacus-core/src/traits/inbox.rs @@ -1,6 +1,7 @@ use std::fmt::Debug; use async_trait::async_trait; +use auto_impl::auto_impl; use ethers::core::types::H256; use eyre::Result; @@ -11,6 +12,7 @@ use crate::{ /// Interface for on-chain inboxes #[async_trait] +#[auto_impl(Box, Arc)] pub trait Inbox: AbacusCommon + Send + Sync + Debug { /// Return the domain of the inbox's linked outbox async fn remote_domain(&self) -> Result; diff --git a/rust/abacus-core/src/traits/indexer.rs b/rust/abacus-core/src/traits/indexer.rs index 88a84d0a9..6a30b401c 100644 --- a/rust/abacus-core/src/traits/indexer.rs +++ b/rust/abacus-core/src/traits/indexer.rs @@ -1,8 +1,8 @@ //! An Indexer provides a common interface for bubbling up chain-specific //! event-data to another entity (e.g. a `ContractSync`). For example, the only //! way to retrieve data such as the chain's latest block number or a list of -//! checkpoints/messages emitted within a certain block range by calling out to a -//! chain-specific library and provider (e.g. ethers::provider). A +//! checkpoints/messages emitted within a certain block range by calling out to +//! a chain-specific library and provider (e.g. ethers::provider). A //! chain-specific outbox or inbox should implement one or both of the Indexer //! traits (CommonIndexer or OutboxIndexer) to provide an common interface which //! other entities can retrieve this chain-specific info. @@ -10,12 +10,14 @@ use std::fmt::Debug; use async_trait::async_trait; +use auto_impl::auto_impl; use eyre::Result; use crate::{CheckpointWithMeta, InterchainGasPaymentWithMeta, RawCommittedMessage}; /// Interface for an indexer. #[async_trait] +#[auto_impl(Box, Arc)] pub trait Indexer: Send + Sync + Debug { /// Get the chain's latest block number that has reached finality async fn get_finalized_block_number(&self) -> Result; @@ -24,12 +26,14 @@ pub trait Indexer: Send + Sync + Debug { /// Interface for Outbox contract indexer. Interface for allowing other /// entities to retrieve chain-specific data from an outbox. #[async_trait] +#[auto_impl(Box, Arc)] pub trait OutboxIndexer: Indexer + Send + Sync + Debug { /// Fetch list of messages between blocks `from` and `to`. async fn fetch_sorted_messages(&self, _from: u32, _to: u32) -> Result>; - /// Fetch sequentially sorted list of cached checkpoints between blocks `from` and `to` + /// Fetch sequentially sorted list of cached checkpoints between blocks + /// `from` and `to` async fn fetch_sorted_cached_checkpoints( &self, from: u32, @@ -39,8 +43,10 @@ pub trait OutboxIndexer: Indexer + Send + Sync + Debug { /// Interface for InterchainGasPaymaster contract indexer. #[async_trait] +#[auto_impl(Box, Arc)] pub trait InterchainGasPaymasterIndexer: Indexer + Send + Sync + Debug { - /// Fetch list of gas payments between `from_block` and `to_block`, inclusive + /// Fetch list of gas payments between `from_block` and `to_block`, + /// inclusive async fn fetch_gas_payments( &self, from_block: u32, diff --git a/rust/abacus-core/src/traits/interchain_gas.rs b/rust/abacus-core/src/traits/interchain_gas.rs index bdc47ff9e..1be363be0 100644 --- a/rust/abacus-core/src/traits/interchain_gas.rs +++ b/rust/abacus-core/src/traits/interchain_gas.rs @@ -1,10 +1,12 @@ use std::fmt::Debug; use async_trait::async_trait; +use auto_impl::auto_impl; use crate::AbacusContract; /// Interface for the InterchainGasPaymaster chain contract. /// Allows abstraction over different chains. #[async_trait] +#[auto_impl(Box, Arc)] pub trait InterchainGasPaymaster: AbacusContract + Send + Sync + Debug {} diff --git a/rust/abacus-core/src/traits/mod.rs b/rust/abacus-core/src/traits/mod.rs index 30dcc4011..dd752a84c 100644 --- a/rust/abacus-core/src/traits/mod.rs +++ b/rust/abacus-core/src/traits/mod.rs @@ -1,16 +1,9 @@ -mod common; -mod encode; -mod inbox; -mod indexer; -mod interchain_gas; -mod message; -mod outbox; -mod validator_manager; - use std::collections::HashMap; +use std::error::Error as StdError; use std::fmt::Debug; use async_trait::async_trait; +use auto_impl::auto_impl; use ethers::prelude::Selector; use ethers::{ contract::ContractError, @@ -18,9 +11,6 @@ use ethers::{ providers::{Middleware, ProviderError}, }; use eyre::Result; -use std::error::Error as StdError; - -use crate::{db::DbError, utils::domain_hash, AbacusError}; pub use common::*; pub use encode::*; @@ -31,6 +21,17 @@ pub use message::*; pub use outbox::*; pub use validator_manager::*; +use crate::{db::DbError, utils::domain_hash, AbacusError}; + +mod common; +mod encode; +mod inbox; +mod indexer; +mod interchain_gas; +mod message; +mod outbox; +mod validator_manager; + /// The result of a transaction #[derive(Debug, Clone, Copy)] pub struct TxOutcome { @@ -89,6 +90,7 @@ where /// Interface for a deployed contract. /// This trait is intended to expose attributes of any contract, and /// should not consider the purpose or implementation details of the contract. +#[auto_impl(Box, Arc)] pub trait AbacusContract { /// Return an identifier (not necessarily unique) for the chain this /// contract is deployed to. @@ -97,6 +99,7 @@ pub trait AbacusContract { /// Interface for attributes shared by Outbox and Inbox #[async_trait] +#[auto_impl(Box, Arc)] pub trait AbacusCommon: AbacusContract + Sync + Send + Debug { /// Return the domain ID fn local_domain(&self) -> u32; @@ -114,11 +117,13 @@ pub trait AbacusCommon: AbacusContract + Sync + Send + Debug { } /// Static contract ABI information. +#[auto_impl(Box, Arc)] pub trait AbacusAbi { /// Get a mapping from function selectors to human readable function names. fn fn_map() -> HashMap; - /// Get a mapping from function selectors to owned human readable function names. + /// Get a mapping from function selectors to owned human readable function + /// names. fn fn_map_owned() -> HashMap { Self::fn_map() .into_iter() diff --git a/rust/abacus-core/src/traits/outbox.rs b/rust/abacus-core/src/traits/outbox.rs index 968e02d8e..4650dce79 100644 --- a/rust/abacus-core/src/traits/outbox.rs +++ b/rust/abacus-core/src/traits/outbox.rs @@ -1,17 +1,20 @@ use std::convert::TryFrom; use std::fmt::Debug; +use async_trait::async_trait; +use auto_impl::auto_impl; +use ethers::core::types::H256; +use eyre::Result; + use crate::{ traits::{ChainCommunicationError, TxOutcome}, AbacusCommon, Checkpoint, CommittedMessage, Message, OutboxState, RawCommittedMessage, }; -use async_trait::async_trait; -use ethers::core::types::H256; -use eyre::Result; /// Interface for the Outbox chain contract. Allows abstraction over different /// chains #[async_trait] +#[auto_impl(Box, Arc)] pub trait Outbox: AbacusCommon + Send + Sync + Debug { /// Fetch the current state. async fn state(&self) -> Result; @@ -40,6 +43,7 @@ pub trait Outbox: AbacusCommon + Send + Sync + Debug { /// Interface for retrieving event data emitted specifically by the outbox #[async_trait] +#[auto_impl(Box, Arc)] pub trait OutboxEvents: Outbox + Send + Sync + Debug { /// Look up a message by its hash. /// This should fetch events from the chain API diff --git a/rust/abacus-core/src/traits/validator_manager.rs b/rust/abacus-core/src/traits/validator_manager.rs index 1b31c8c3b..1bdce74f9 100644 --- a/rust/abacus-core/src/traits/validator_manager.rs +++ b/rust/abacus-core/src/traits/validator_manager.rs @@ -1,6 +1,7 @@ use std::fmt::Debug; use async_trait::async_trait; +use auto_impl::auto_impl; use eyre::Result; use crate::{ @@ -11,6 +12,7 @@ use crate::{ /// Interface for an InboxValidatorManager #[async_trait] +#[auto_impl(Box, Arc)] pub trait InboxValidatorManager: Send + Sync + Debug { /// Process a message with a proof against the provided signed checkpoint async fn process( diff --git a/rust/abacus-test/src/mocks/outbox.rs b/rust/abacus-test/src/mocks/outbox.rs index 8372cdbd1..508102f9a 100644 --- a/rust/abacus-test/src/mocks/outbox.rs +++ b/rust/abacus-test/src/mocks/outbox.rs @@ -56,10 +56,6 @@ impl std::fmt::Debug for MockOutboxContract { #[async_trait] impl Outbox for MockOutboxContract { - async fn dispatch(&self, message: &Message) -> Result { - self._dispatch(message) - } - async fn state(&self) -> Result { self._state() } @@ -68,6 +64,10 @@ impl Outbox for MockOutboxContract { self._count() } + async fn dispatch(&self, message: &Message) -> Result { + self._dispatch(message) + } + async fn cache_checkpoint(&self) -> Result { self._cache_checkpoint() } diff --git a/rust/agents/relayer/src/checkpoint_fetcher.rs b/rust/agents/relayer/src/checkpoint_fetcher.rs index 54c8cb202..51f8d6af9 100644 --- a/rust/agents/relayer/src/checkpoint_fetcher.rs +++ b/rust/agents/relayer/src/checkpoint_fetcher.rs @@ -3,11 +3,10 @@ use std::time::Duration; use eyre::Result; use prometheus::{IntGauge, IntGaugeVec}; use tokio::{sync::watch::Sender, task::JoinHandle, time::sleep}; - use tracing::{debug, info, info_span, instrument, instrument::Instrumented, Instrument}; -use abacus_base::{MultisigCheckpointSyncer, Outboxes}; -use abacus_core::{AbacusContract, MultisigSignedCheckpoint}; +use abacus_base::MultisigCheckpointSyncer; +use abacus_core::{MultisigSignedCheckpoint, Outbox}; pub(crate) struct CheckpointFetcher { polling_interval: u64, @@ -19,7 +18,7 @@ pub(crate) struct CheckpointFetcher { impl CheckpointFetcher { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - outbox: Outboxes, + outbox: &dyn Outbox, polling_interval: u64, multisig_checkpoint_syncer: MultisigCheckpointSyncer, signed_checkpoint_sender: Sender>, diff --git a/rust/agents/relayer/src/msg/processor.rs b/rust/agents/relayer/src/msg/processor.rs index 831468083..0a145815c 100644 --- a/rust/agents/relayer/src/msg/processor.rs +++ b/rust/agents/relayer/src/msg/processor.rs @@ -9,7 +9,7 @@ use tokio::{ }; use tracing::{debug, info_span, instrument, instrument::Instrumented, warn, Instrument}; -use abacus_base::{CoreMetrics, InboxContracts, Outboxes}; +use abacus_base::{CoreMetrics, InboxContracts}; use abacus_core::{ db::AbacusDB, AbacusCommon, AbacusContract, CommittedMessage, MultisigSignedCheckpoint, Outbox, }; @@ -20,7 +20,7 @@ use super::SubmitMessageArgs; #[derive(Debug)] pub(crate) struct MessageProcessor { - outbox: Outboxes, + outbox: Arc, db: AbacusDB, inbox_contracts: InboxContracts, whitelist: Arc, @@ -35,7 +35,7 @@ pub(crate) struct MessageProcessor { impl MessageProcessor { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - outbox: Outboxes, + outbox: Arc, db: AbacusDB, inbox_contracts: InboxContracts, whitelist: Arc, @@ -74,16 +74,18 @@ impl MessageProcessor { #[instrument(ret, err, skip(self), fields(inbox_name=self.inbox_contracts.inbox.chain_name(), local_domain=?self.inbox_contracts.inbox.local_domain()), level = "info")] async fn main_loop(mut self) -> Result<()> { - // Ensure that there is at least one valid, known checkpoint before starting work loop. + // Ensure that there is at least one valid, known checkpoint before starting + // work loop. loop { self.ckpt_rx.changed().await?; if self.ckpt_rx.borrow().clone().is_some() { break; } } - // Forever, scan AbacusDB looking for new messages to send. When criteria are satisfied - // or the message is disqualified, push the message onto self.tx_msg and then continue - // the scan at the next outbox highest leaf index. + // Forever, scan AbacusDB looking for new messages to send. When criteria are + // satisfied or the message is disqualified, push the message onto + // self.tx_msg and then continue the scan at the next outbox highest + // leaf index. loop { self.tick().await?; } @@ -123,12 +125,13 @@ impl MessageProcessor { "Leaf in db without message idx: {}", self.message_leaf_index ); - // Not clear what the best thing to do here is, but there is seemingly an existing - // race wherein an indexer might non-atomically write leaf info to rocksdb across a - // few records, so we might see the leaf status above, but not the message contents - // here. For now, optimistically yield and then re-enter the loop in hopes that - // the DB is now coherent. - // TODO(webbhorn): Why can't we yield here instead of sleep? + // Not clear what the best thing to do here is, but there is seemingly an + // existing race wherein an indexer might non-atomically write leaf + // info to rocksdb across a few records, so we might see the leaf + // status above, but not the message contents here. For now, + // optimistically yield and then re-enter the loop in hopes that the + // DB is now coherent. TODO(webbhorn): Why can't we yield here + // instead of sleep? tokio::time::sleep(Duration::from_secs(1)).await; return Ok(()); }; @@ -171,8 +174,9 @@ impl MessageProcessor { return Ok(()); } - // If validator hasn't published checkpoint covering self.message_leaf_index yet, wait - // until it has, before forwarding the message to the submitter channel. + // If validator hasn't published checkpoint covering self.message_leaf_index + // yet, wait until it has, before forwarding the message to the + // submitter channel. let mut ckpt; loop { ckpt = self.ckpt_rx.borrow().clone(); @@ -188,7 +192,8 @@ impl MessageProcessor { let checkpoint = ckpt.unwrap(); assert!(checkpoint.checkpoint.index >= self.message_leaf_index); - // Include proof against checkpoint for message in the args provided to the submitter. + // Include proof against checkpoint for message in the args provided to the + // submitter. if checkpoint.checkpoint.index >= self.prover_sync.count() { self.prover_sync .update_to_checkpoint(&checkpoint.checkpoint) @@ -226,7 +231,7 @@ impl MessageProcessor { } /// Spawn a task to update the outbox state gauge. - async fn metrics_loop(outbox_state_gauge: IntGauge, outbox: Outboxes) { + async fn metrics_loop(outbox_state_gauge: IntGauge, outbox: Arc) { let mut interval = tokio::time::interval(Duration::from_secs(60)); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 88c7ce3f6..6f36ecfbc 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -88,7 +88,7 @@ impl BaseAgent for Relayer { let gas_payment_enforcer = Arc::new(GasPaymentEnforcer::new( self.gas_payment_enforcement_policy.clone(), - self.outbox().db(), + self.outbox().db().clone(), )); for (inbox_name, inbox_contracts) in inboxes { @@ -113,7 +113,7 @@ impl BaseAgent for Relayer { tasks.push(self.run_outbox_sync(sync_metrics.clone())); if let Some(paymaster) = self.interchain_gas_paymaster() { - tasks.push(self.run_interchain_gas_paymaster_sync(paymaster, sync_metrics)); + tasks.push(self.run_interchain_gas_paymaster_sync(paymaster.clone(), sync_metrics)); } else { info!("Interchain Gas Paymaster not provided, not running sync"); } @@ -134,7 +134,7 @@ impl Relayer { fn run_interchain_gas_paymaster_sync( &self, - paymaster: Arc, + paymaster: CachingInterchainGasPaymaster, sync_metrics: ContractSyncMetrics, ) -> Instrumented>> { paymaster.sync(self.as_ref().indexer.clone(), sync_metrics) @@ -168,7 +168,7 @@ impl Relayer { message_receiver, self.outbox().local_domain(), inbox_contracts, - self.outbox().db(), + self.outbox().db().clone(), signer, GelatoSubmitterMetrics::new( &self.core.metrics, @@ -209,7 +209,7 @@ impl Relayer { let serial_submitter = SerialSubmitter::new( msg_receive, inbox_contracts.clone(), - self.outbox().db(), + self.outbox().db().clone(), SerialSubmitterMetrics::new( &self.core.metrics, outbox_name, @@ -221,8 +221,8 @@ impl Relayer { } }; let message_processor = MessageProcessor::new( - outbox, - self.outbox().db(), + outbox.clone(), + self.outbox().db().clone(), inbox_contracts, self.whitelist.clone(), self.blacklist.clone(), diff --git a/rust/agents/validator/src/submit.rs b/rust/agents/validator/src/submit.rs index 2fcd50f04..12b05d679 100644 --- a/rust/agents/validator/src/submit.rs +++ b/rust/agents/validator/src/submit.rs @@ -15,7 +15,7 @@ pub(crate) struct ValidatorSubmitter { interval: u64, reorg_period: u64, signer: Arc, - outbox: Arc, + outbox: CachingOutbox, checkpoint_syncer: Arc, metrics: ValidatorSubmitterMetrics, } @@ -24,7 +24,7 @@ impl ValidatorSubmitter { pub(crate) fn new( interval: u64, reorg_period: u64, - outbox: Arc, + outbox: CachingOutbox, signer: Arc, checkpoint_syncer: Arc, metrics: ValidatorSubmitterMetrics, @@ -54,7 +54,7 @@ impl ValidatorSubmitter { } /// Spawn a task to update the outbox state gauge. - async fn metrics_loop(outbox_state_gauge: IntGauge, outbox: Arc) { + async fn metrics_loop(outbox_state_gauge: IntGauge, outbox: CachingOutbox) { let mut interval = tokio::time::interval(Duration::from_secs(60)); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index 80fbd559b..92fa96055 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -79,7 +79,7 @@ impl BaseAgent for Validator { let submit = ValidatorSubmitter::new( self.interval, self.reorg_period, - self.outbox(), + self.outbox().clone(), self.signer.clone(), self.checkpoint_syncer.clone(), ValidatorSubmitterMetrics::new(&self.core.metrics, self.outbox().chain_name()),