Refactoring out redundant enums (#1060)

* Fix metric ordering (#1043)

* Remove redunant enums

* Remove checkpoint fn that was not actually in use
pull/1078/head
Mattie Conover 2 years ago committed by GitHub
parent 00feef0a69
commit 9cc91d9c51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      rust/Cargo.lock
  2. 40
      rust/abacus-base/src/agent.rs
  3. 8
      rust/abacus-base/src/contract_sync/interchain_gas.rs
  4. 14
      rust/abacus-base/src/contract_sync/mod.rs
  5. 2
      rust/abacus-base/src/contract_sync/outbox.rs
  6. 162
      rust/abacus-base/src/inbox.rs
  7. 113
      rust/abacus-base/src/indexer.rs
  8. 99
      rust/abacus-base/src/interchain_gas.rs
  9. 6
      rust/abacus-base/src/lib.rs
  10. 213
      rust/abacus-base/src/outbox.rs
  11. 161
      rust/abacus-base/src/settings/chains.rs
  12. 147
      rust/abacus-base/src/settings/mod.rs
  13. 105
      rust/abacus-base/src/validator_manager.rs
  14. 1
      rust/abacus-core/Cargo.toml
  15. 2
      rust/abacus-core/src/traits/inbox.rs
  16. 14
      rust/abacus-core/src/traits/indexer.rs
  17. 2
      rust/abacus-core/src/traits/interchain_gas.rs
  18. 31
      rust/abacus-core/src/traits/mod.rs
  19. 10
      rust/abacus-core/src/traits/outbox.rs
  20. 2
      rust/abacus-core/src/traits/validator_manager.rs
  21. 8
      rust/abacus-test/src/mocks/outbox.rs
  22. 7
      rust/agents/relayer/src/checkpoint_fetcher.rs
  23. 39
      rust/agents/relayer/src/msg/processor.rs
  24. 14
      rust/agents/relayer/src/relayer.rs
  25. 6
      rust/agents/validator/src/submit.rs
  26. 2
      rust/agents/validator/src/validator.rs

1
rust/Cargo.lock generated

@ -58,6 +58,7 @@ version = "0.1.0"
dependencies = [
"abacus-base",
"async-trait",
"auto_impl 1.0.1",
"bytes",
"color-eyre",
"config",

@ -9,30 +9,31 @@ use tracing::instrument::Instrumented;
use tracing::{info_span, Instrument};
use abacus_core::db::DB;
use abacus_core::InboxValidatorManager;
use crate::{
cancel_task,
metrics::CoreMetrics,
settings::{IndexSettings, Settings},
CachingInbox, CachingInterchainGasPaymaster, CachingOutbox, InboxValidatorManagers,
CachingInbox, CachingInterchainGasPaymaster, CachingOutbox,
};
/// Contracts relating to an inbox chain
#[derive(Clone, Debug)]
pub struct InboxContracts {
/// A boxed Inbox
pub inbox: Arc<CachingInbox>,
pub inbox: CachingInbox,
/// A boxed InboxValidatorManager
pub validator_manager: Arc<InboxValidatorManagers>,
pub validator_manager: Arc<dyn InboxValidatorManager>,
}
/// Properties shared across all abacus agents
#[derive(Debug)]
pub struct AbacusAgentCore {
/// A boxed Outbox
pub outbox: Arc<CachingOutbox>,
pub outbox: CachingOutbox,
/// A boxed InterchainGasPaymaster
pub interchain_gas_paymaster: Option<Arc<CachingInterchainGasPaymaster>>,
pub interchain_gas_paymaster: Option<CachingInterchainGasPaymaster>,
/// A map of boxed Inbox contracts
pub inboxes: HashMap<String, InboxContracts>,
/// A persistent KV Store (currently implemented as rocksdb)
@ -75,45 +76,48 @@ pub trait Agent: BaseAgent {
fn metrics(&self) -> Arc<CoreMetrics>;
/// Return a handle to the DB
fn db(&self) -> DB;
fn db(&self) -> &DB;
/// Return a reference to an Outbox contract
fn outbox(&self) -> Arc<CachingOutbox>;
fn outbox(&self) -> &CachingOutbox;
/// Return a reference to an InterchainGasPaymaster contract
fn interchain_gas_paymaster(&self) -> Option<Arc<CachingInterchainGasPaymaster>>;
fn interchain_gas_paymaster(&self) -> Option<&CachingInterchainGasPaymaster>;
/// Get a reference to the inboxes map
fn inboxes(&self) -> &HashMap<String, InboxContracts>;
/// Get a reference to an inbox's contracts by its name
fn inbox_by_name(&self, name: &str) -> Option<InboxContracts>;
fn inbox_by_name(&self, name: &str) -> Option<&InboxContracts>;
}
#[async_trait]
impl<B: BaseAgent + AsRef<AbacusAgentCore>> Agent for B {
impl<B> Agent for B
where
B: BaseAgent + AsRef<AbacusAgentCore>,
{
fn metrics(&self) -> Arc<CoreMetrics> {
self.as_ref().metrics.clone()
}
fn db(&self) -> DB {
self.as_ref().db.clone()
fn db(&self) -> &DB {
&self.as_ref().db
}
fn outbox(&self) -> Arc<CachingOutbox> {
self.as_ref().outbox.clone()
fn outbox(&self) -> &CachingOutbox {
&self.as_ref().outbox
}
fn interchain_gas_paymaster(&self) -> Option<Arc<CachingInterchainGasPaymaster>> {
self.as_ref().interchain_gas_paymaster.clone()
fn interchain_gas_paymaster(&self) -> Option<&CachingInterchainGasPaymaster> {
self.as_ref().interchain_gas_paymaster.as_ref()
}
fn inboxes(&self) -> &HashMap<String, InboxContracts> {
&self.as_ref().inboxes
}
fn inbox_by_name(&self, name: &str) -> Option<InboxContracts> {
self.inboxes().get(name).map(Clone::clone)
fn inbox_by_name(&self, name: &str) -> Option<&InboxContracts> {
self.inboxes().get(name)
}
}

@ -1,10 +1,10 @@
use abacus_core::InterchainGasPaymasterIndexer;
use std::cmp::min;
use std::time::Duration;
use tokio::{task::JoinHandle, time::sleep};
use tracing::{debug, info, info_span, instrument::Instrumented, Instrument};
use std::cmp::min;
use std::time::Duration;
use abacus_core::InterchainGasPaymasterIndexer;
use crate::{contract_sync::schema::InterchainGasPaymasterContractSyncDB, ContractSync};
@ -12,7 +12,7 @@ const GAS_PAYMENTS_LABEL: &str = "gas_payments";
impl<I> ContractSync<I>
where
I: InterchainGasPaymasterIndexer + 'static,
I: InterchainGasPaymasterIndexer + Clone + 'static,
{
/// Sync gas payments
pub fn sync_gas_payments(&self) -> Instrumented<JoinHandle<eyre::Result<()>>> {

@ -1,10 +1,12 @@
// TODO: Reapply tip buffer
// TODO: Reapply metrics
use std::sync::Arc;
use abacus_core::db::AbacusDB;
pub use interchain_gas::*;
pub use metrics::ContractSyncMetrics;
pub use outbox::*;
use crate::settings::IndexSettings;
use abacus_core::db::AbacusDB;
mod interchain_gas;
mod last_message;
@ -12,10 +14,6 @@ mod metrics;
mod outbox;
mod schema;
pub use interchain_gas::*;
pub use metrics::ContractSyncMetrics;
pub use outbox::*;
/// Entity that drives the syncing of an agent's db with on-chain data.
/// Extracts chain-specific data (emitted checkpoints, messages, etc) from an
/// `indexer` and fills the agent's db with this data. A CachingOutbox or
@ -25,7 +23,7 @@ pub use outbox::*;
pub struct ContractSync<I> {
chain_name: String,
db: AbacusDB,
indexer: Arc<I>,
indexer: I,
index_settings: IndexSettings,
metrics: ContractSyncMetrics,
}
@ -35,7 +33,7 @@ impl<I> ContractSync<I> {
pub fn new(
chain_name: String,
db: AbacusDB,
indexer: Arc<I>,
indexer: I,
index_settings: IndexSettings,
metrics: ContractSyncMetrics,
) -> Self {

@ -16,7 +16,7 @@ const MESSAGES_LABEL: &str = "messages";
impl<I> ContractSync<I>
where
I: OutboxIndexer + 'static,
I: OutboxIndexer + Clone + 'static,
{
/// Sync outbox messages
pub fn sync_outbox_messages(&self) -> Instrumented<tokio::task::JoinHandle<eyre::Result<()>>> {

@ -1,19 +1,18 @@
use abacus_core::{
db::AbacusDB, AbacusCommon, AbacusContract, Address, ChainCommunicationError, Inbox,
MessageStatus, TxOutcome,
};
use abacus_test::mocks::inbox::MockInboxContract;
use std::sync::Arc;
use async_trait::async_trait;
use ethers::core::types::H256;
use eyre::Result;
use abacus_ethereum::EthereumInbox;
use std::sync::Arc;
use abacus_core::{
db::AbacusDB, AbacusCommon, AbacusContract, Address, ChainCommunicationError, Inbox,
MessageStatus, TxOutcome,
};
/// Caching inbox type
#[derive(Debug)]
/// Caching inbox type.
#[derive(Debug, Clone)]
pub struct CachingInbox {
inbox: Inboxes,
inbox: Arc<dyn Inbox>,
db: AbacusDB,
}
@ -25,18 +24,18 @@ impl std::fmt::Display for CachingInbox {
impl CachingInbox {
/// Instantiate new CachingInbox
pub fn new(inbox: Inboxes, db: AbacusDB) -> Self {
pub fn new(inbox: Arc<dyn Inbox>, db: AbacusDB) -> Self {
Self { inbox, db }
}
/// Return handle on inbox object
pub fn inbox(&self) -> Inboxes {
self.inbox.clone()
pub fn inbox(&self) -> &Arc<dyn Inbox> {
&self.inbox
}
/// Return handle on AbacusDB
pub fn db(&self) -> AbacusDB {
self.db.clone()
pub fn db(&self) -> &AbacusDB {
&self.db
}
}
@ -75,136 +74,3 @@ impl AbacusCommon for CachingInbox {
self.inbox.validator_manager().await
}
}
#[derive(Debug, Clone)]
/// Arc wrapper for InboxVariants enum
pub struct Inboxes(Arc<InboxVariants>);
impl From<InboxVariants> for Inboxes {
fn from(inboxes: InboxVariants) -> Self {
Self(Arc::new(inboxes))
}
}
impl std::ops::Deref for Inboxes {
type Target = Arc<InboxVariants>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for Inboxes {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Inbox type
#[derive(Debug)]
pub enum InboxVariants {
/// Ethereum inbox contract
Ethereum(Box<dyn Inbox>),
/// Mock inbox contract
Mock(Box<MockInboxContract>),
/// Other inbox variant
Other(Box<dyn Inbox>),
}
impl InboxVariants {
/// Calls checkpoint on mock variant. Should
/// only be used during tests.
#[doc(hidden)]
pub fn checkpoint(&mut self) {
if let InboxVariants::Mock(inbox) = self {
inbox.checkpoint();
} else {
panic!("Inbox should be mock variant!");
}
}
}
impl<M> From<EthereumInbox<M>> for Inboxes
where
M: ethers::providers::Middleware + 'static,
{
fn from(inbox: EthereumInbox<M>) -> Self {
InboxVariants::Ethereum(Box::new(inbox)).into()
}
}
impl From<MockInboxContract> for Inboxes {
fn from(inbox: MockInboxContract) -> Self {
InboxVariants::Mock(Box::new(inbox)).into()
}
}
impl From<Box<dyn Inbox>> for Inboxes {
fn from(inbox: Box<dyn Inbox>) -> Self {
InboxVariants::Other(inbox).into()
}
}
#[async_trait]
impl Inbox for InboxVariants {
async fn remote_domain(&self) -> Result<u32, ChainCommunicationError> {
match self {
InboxVariants::Ethereum(inbox) => inbox.remote_domain().await,
InboxVariants::Mock(mock_inbox) => mock_inbox.remote_domain().await,
InboxVariants::Other(inbox) => inbox.remote_domain().await,
}
}
async fn message_status(&self, leaf: H256) -> Result<MessageStatus, ChainCommunicationError> {
match self {
InboxVariants::Ethereum(inbox) => inbox.message_status(leaf).await,
InboxVariants::Mock(mock_inbox) => mock_inbox.message_status(leaf).await,
InboxVariants::Other(inbox) => inbox.message_status(leaf).await,
}
}
fn contract_address(&self) -> Address {
match self {
InboxVariants::Ethereum(inbox) => inbox.contract_address(),
InboxVariants::Mock(mock_inbox) => mock_inbox.contract_address(),
InboxVariants::Other(inbox) => inbox.contract_address(),
}
}
}
impl AbacusContract for InboxVariants {
fn chain_name(&self) -> &str {
match self {
InboxVariants::Ethereum(inbox) => inbox.chain_name(),
InboxVariants::Mock(mock_inbox) => mock_inbox.chain_name(),
InboxVariants::Other(inbox) => inbox.chain_name(),
}
}
}
#[async_trait]
impl AbacusCommon for InboxVariants {
fn local_domain(&self) -> u32 {
match self {
InboxVariants::Ethereum(inbox) => inbox.local_domain(),
InboxVariants::Mock(mock_inbox) => mock_inbox.local_domain(),
InboxVariants::Other(inbox) => inbox.local_domain(),
}
}
async fn status(&self, txid: H256) -> Result<Option<TxOutcome>, ChainCommunicationError> {
match self {
InboxVariants::Ethereum(inbox) => inbox.status(txid).await,
InboxVariants::Mock(mock_inbox) => mock_inbox.status(txid).await,
InboxVariants::Other(inbox) => inbox.status(txid).await,
}
}
async fn validator_manager(&self) -> Result<H256, ChainCommunicationError> {
match self {
InboxVariants::Ethereum(inbox) => inbox.validator_manager().await,
InboxVariants::Mock(mock_inbox) => mock_inbox.validator_manager().await,
InboxVariants::Other(inbox) => inbox.validator_manager().await,
}
}
}

@ -1,113 +0,0 @@
use abacus_core::{
CheckpointWithMeta, Indexer, InterchainGasPaymasterIndexer, InterchainGasPaymentWithMeta,
OutboxIndexer, RawCommittedMessage,
};
use abacus_test::mocks::indexer::MockAbacusIndexer;
use async_trait::async_trait;
use eyre::Result;
/// OutboxIndexer type
#[derive(Debug)]
pub enum OutboxIndexers {
/// Ethereum contract indexer
Ethereum(Box<dyn OutboxIndexer>),
/// Mock indexer
Mock(Box<dyn OutboxIndexer>),
/// Other indexer variant
Other(Box<dyn OutboxIndexer>),
}
impl From<MockAbacusIndexer> for OutboxIndexers {
fn from(mock_indexer: MockAbacusIndexer) -> Self {
OutboxIndexers::Mock(Box::new(mock_indexer))
}
}
#[async_trait]
impl Indexer for OutboxIndexers {
async fn get_finalized_block_number(&self) -> Result<u32> {
match self {
OutboxIndexers::Ethereum(indexer) => indexer.get_finalized_block_number().await,
OutboxIndexers::Mock(indexer) => indexer.get_finalized_block_number().await,
OutboxIndexers::Other(indexer) => indexer.get_finalized_block_number().await,
}
}
}
#[async_trait]
impl OutboxIndexer for OutboxIndexers {
async fn fetch_sorted_messages(&self, from: u32, to: u32) -> Result<Vec<RawCommittedMessage>> {
match self {
OutboxIndexers::Ethereum(indexer) => indexer.fetch_sorted_messages(from, to).await,
OutboxIndexers::Mock(indexer) => indexer.fetch_sorted_messages(from, to).await,
OutboxIndexers::Other(indexer) => indexer.fetch_sorted_messages(from, to).await,
}
}
async fn fetch_sorted_cached_checkpoints(
&self,
from: u32,
to: u32,
) -> Result<Vec<CheckpointWithMeta>> {
match self {
OutboxIndexers::Ethereum(indexer) => {
indexer.fetch_sorted_cached_checkpoints(from, to).await
}
OutboxIndexers::Mock(indexer) => {
indexer.fetch_sorted_cached_checkpoints(from, to).await
}
OutboxIndexers::Other(indexer) => {
indexer.fetch_sorted_cached_checkpoints(from, to).await
}
}
}
}
/// InterchainGasPaymasterIndexer type
#[derive(Debug)]
pub enum InterchainGasPaymasterIndexers {
/// Ethereum contract indexer
Ethereum(Box<dyn InterchainGasPaymasterIndexer>),
/// Mock indexer
Mock(Box<dyn InterchainGasPaymasterIndexer>),
/// Other indexer variant
Other(Box<dyn InterchainGasPaymasterIndexer>),
}
#[async_trait]
impl Indexer for InterchainGasPaymasterIndexers {
async fn get_finalized_block_number(&self) -> Result<u32> {
match self {
InterchainGasPaymasterIndexers::Ethereum(indexer) => {
indexer.get_finalized_block_number().await
}
InterchainGasPaymasterIndexers::Mock(indexer) => {
indexer.get_finalized_block_number().await
}
InterchainGasPaymasterIndexers::Other(indexer) => {
indexer.get_finalized_block_number().await
}
}
}
}
#[async_trait]
impl InterchainGasPaymasterIndexer for InterchainGasPaymasterIndexers {
async fn fetch_gas_payments(
&self,
from_block: u32,
to_block: u32,
) -> Result<Vec<InterchainGasPaymentWithMeta>> {
match self {
InterchainGasPaymasterIndexers::Ethereum(indexer) => {
indexer.fetch_gas_payments(from_block, to_block).await
}
InterchainGasPaymasterIndexers::Mock(indexer) => {
indexer.fetch_gas_payments(from_block, to_block).await
}
InterchainGasPaymasterIndexers::Other(indexer) => {
indexer.fetch_gas_payments(from_block, to_block).await
}
}
}
}

@ -1,23 +1,23 @@
use abacus_core::db::AbacusDB;
use abacus_core::{AbacusContract, InterchainGasPaymaster};
use std::fmt::Debug;
use std::sync::Arc;
use abacus_ethereum::EthereumInterchainGasPaymaster;
use async_trait::async_trait;
use eyre::Result;
use futures_util::future::select_all;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::instrument::Instrumented;
use tracing::{info_span, Instrument};
use crate::{ContractSync, ContractSyncMetrics, IndexSettings, InterchainGasPaymasterIndexers};
use abacus_core::db::AbacusDB;
use abacus_core::{AbacusContract, InterchainGasPaymaster, InterchainGasPaymasterIndexer};
use crate::{ContractSync, ContractSyncMetrics, IndexSettings};
/// Caching InterchainGasPaymaster type
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CachingInterchainGasPaymaster {
paymaster: InterchainGasPaymasters,
paymaster: Arc<dyn InterchainGasPaymaster>,
db: AbacusDB,
indexer: Arc<InterchainGasPaymasterIndexers>,
indexer: Arc<dyn InterchainGasPaymasterIndexer>,
}
impl std::fmt::Display for CachingInterchainGasPaymaster {
@ -29,9 +29,9 @@ impl std::fmt::Display for CachingInterchainGasPaymaster {
impl CachingInterchainGasPaymaster {
/// Instantiate new CachingInterchainGasPaymaster
pub fn new(
paymaster: InterchainGasPaymasters,
paymaster: Arc<dyn InterchainGasPaymaster>,
db: AbacusDB,
indexer: Arc<InterchainGasPaymasterIndexers>,
indexer: Arc<dyn InterchainGasPaymasterIndexer>,
) -> Self {
Self {
paymaster,
@ -41,17 +41,17 @@ impl CachingInterchainGasPaymaster {
}
/// Return handle on paymaster object
pub fn paymaster(&self) -> InterchainGasPaymasters {
self.paymaster.clone()
pub fn paymaster(&self) -> &Arc<dyn InterchainGasPaymaster> {
&self.paymaster
}
/// Return handle on AbacusDB
pub fn db(&self) -> AbacusDB {
self.db.clone()
pub fn db(&self) -> &AbacusDB {
&self.db
}
/// Spawn a task that syncs the CachingInterchainGasPaymaster's db with the on-chain event
/// data
/// Spawn a task that syncs the CachingInterchainGasPaymaster's db with the
/// on-chain event data
pub fn sync(
&self,
index_settings: IndexSettings,
@ -80,68 +80,3 @@ impl CachingInterchainGasPaymaster {
.instrument(span)
}
}
#[derive(Debug, Clone)]
/// Arc wrapper for InterchainGasPaymasterVariants enum
pub struct InterchainGasPaymasters(Arc<InterchainGasPaymasterVariants>);
impl From<InterchainGasPaymasterVariants> for InterchainGasPaymasters {
fn from(paymaster: InterchainGasPaymasterVariants) -> Self {
Self(Arc::new(paymaster))
}
}
impl std::ops::Deref for InterchainGasPaymasters {
type Target = Arc<InterchainGasPaymasterVariants>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for InterchainGasPaymasters {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// InterchainGasPaymaster type
#[derive(Debug)]
pub enum InterchainGasPaymasterVariants {
/// Ethereum InterchainGasPaymaster contract
Ethereum(Box<dyn InterchainGasPaymaster>),
/// Mock InterchainGasPaymaster contract
Mock(Box<dyn InterchainGasPaymaster>),
/// Other InterchainGasPaymaster variant
Other(Box<dyn InterchainGasPaymaster>),
}
impl InterchainGasPaymasterVariants {}
impl<M> From<EthereumInterchainGasPaymaster<M>> for InterchainGasPaymasters
where
M: ethers::providers::Middleware + 'static,
{
fn from(paymaster: EthereumInterchainGasPaymaster<M>) -> Self {
InterchainGasPaymasterVariants::Ethereum(Box::new(paymaster)).into()
}
}
impl From<Box<dyn InterchainGasPaymaster>> for InterchainGasPaymasters {
fn from(paymaster: Box<dyn InterchainGasPaymaster>) -> Self {
InterchainGasPaymasterVariants::Other(paymaster).into()
}
}
impl AbacusContract for InterchainGasPaymasterVariants {
fn chain_name(&self) -> &str {
match self {
InterchainGasPaymasterVariants::Ethereum(paymaster) => paymaster.chain_name(),
InterchainGasPaymasterVariants::Mock(paymaster) => paymaster.chain_name(),
InterchainGasPaymasterVariants::Other(paymaster) => paymaster.chain_name(),
}
}
}
#[async_trait]
impl InterchainGasPaymaster for InterchainGasPaymasterVariants {}

@ -35,9 +35,6 @@ pub use metrics::*;
mod contract_sync;
pub use contract_sync::*;
mod indexer;
pub use indexer::*;
mod interchain_gas;
pub use interchain_gas::*;
@ -47,8 +44,5 @@ pub use traits::*;
mod types;
pub use types::*;
mod validator_manager;
pub use validator_manager::*;
#[cfg(feature = "oneline-eyre")]
pub mod oneline_eyre;

@ -1,29 +1,29 @@
use abacus_core::db::AbacusDB;
use abacus_core::{
AbacusCommon, AbacusContract, ChainCommunicationError, Checkpoint, Message, Outbox,
OutboxEvents, OutboxState, RawCommittedMessage, TxOutcome,
};
use std::fmt::Debug;
use std::sync::Arc;
use abacus_ethereum::EthereumOutbox;
use abacus_test::mocks::MockOutboxContract;
use async_trait::async_trait;
use ethers::core::types::H256;
use eyre::Result;
use futures_util::future::select_all;
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use tracing::instrument::Instrumented;
use tracing::{info_span, Instrument};
use tracing::{instrument, instrument::Instrumented};
use crate::{ContractSync, ContractSyncMetrics, IndexSettings, OutboxIndexers};
use abacus_core::db::AbacusDB;
use abacus_core::{
AbacusCommon, AbacusContract, ChainCommunicationError, Checkpoint, Message, Outbox,
OutboxEvents, OutboxIndexer, OutboxState, RawCommittedMessage, TxOutcome,
};
use crate::{ContractSync, ContractSyncMetrics, IndexSettings};
/// Caching Outbox type
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CachingOutbox {
outbox: Outboxes,
outbox: Arc<dyn Outbox>,
db: AbacusDB,
indexer: Arc<OutboxIndexers>,
indexer: Arc<dyn OutboxIndexer>,
}
impl std::fmt::Display for CachingOutbox {
@ -34,7 +34,7 @@ impl std::fmt::Display for CachingOutbox {
impl CachingOutbox {
/// Instantiate new CachingOutbox
pub fn new(outbox: Outboxes, db: AbacusDB, indexer: Arc<OutboxIndexers>) -> Self {
pub fn new(outbox: Arc<dyn Outbox>, db: AbacusDB, indexer: Arc<dyn OutboxIndexer>) -> Self {
Self {
outbox,
db,
@ -43,13 +43,13 @@ impl CachingOutbox {
}
/// Return handle on outbox object
pub fn outbox(&self) -> Outboxes {
self.outbox.clone()
pub fn outbox(&self) -> &Arc<dyn Outbox> {
&self.outbox
}
/// Return handle on AbacusDB
pub fn db(&self) -> AbacusDB {
self.db.clone()
pub fn db(&self) -> &AbacusDB {
&self.db
}
/// Spawn a task that syncs the CachingOutbox's db with the on-chain event
@ -85,10 +85,6 @@ impl CachingOutbox {
#[async_trait]
impl Outbox for CachingOutbox {
async fn dispatch(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError> {
self.outbox.dispatch(message).await
}
async fn state(&self) -> Result<OutboxState, ChainCommunicationError> {
self.outbox.state().await
}
@ -97,6 +93,10 @@ impl Outbox for CachingOutbox {
self.outbox.count().await
}
async fn dispatch(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError> {
self.outbox.dispatch(message).await
}
async fn cache_checkpoint(&self) -> Result<TxOutcome, ChainCommunicationError> {
self.outbox.cache_checkpoint().await
}
@ -165,172 +165,3 @@ impl AbacusCommon for CachingOutbox {
self.outbox.validator_manager().await
}
}
#[derive(Debug, Clone)]
/// Arc wrapper for OutboxVariants enum
pub struct Outboxes(Arc<OutboxVariants>);
impl From<OutboxVariants> for Outboxes {
fn from(outboxes: OutboxVariants) -> Self {
Self(Arc::new(outboxes))
}
}
impl std::ops::Deref for Outboxes {
type Target = Arc<OutboxVariants>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for Outboxes {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Outbox type
#[derive(Debug)]
pub enum OutboxVariants {
/// Ethereum Outbox contract
Ethereum(Box<dyn Outbox>),
/// Mock Outbox contract
Mock(Box<MockOutboxContract>),
/// Other Outbox variant
Other(Box<dyn Outbox>),
}
impl OutboxVariants {
/// Calls checkpoint on mock variant. Should
/// only be used during tests.
#[doc(hidden)]
pub fn checkpoint(&mut self) {
if let OutboxVariants::Mock(outbox) = self {
outbox.checkpoint();
} else {
panic!("Outbox should be mock variant!");
}
}
}
impl<M> From<EthereumOutbox<M>> for Outboxes
where
M: ethers::providers::Middleware + 'static,
{
fn from(outbox: EthereumOutbox<M>) -> Self {
OutboxVariants::Ethereum(Box::new(outbox)).into()
}
}
impl From<MockOutboxContract> for Outboxes {
fn from(mock_outbox: MockOutboxContract) -> Self {
OutboxVariants::Mock(Box::new(mock_outbox)).into()
}
}
impl From<Box<dyn Outbox>> for Outboxes {
fn from(outbox: Box<dyn Outbox>) -> Self {
OutboxVariants::Other(outbox).into()
}
}
#[async_trait]
impl Outbox for OutboxVariants {
#[instrument(level = "trace", err)]
async fn dispatch(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.dispatch(message).await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.dispatch(message).await,
OutboxVariants::Other(outbox) => outbox.dispatch(message).await,
}
}
async fn state(&self) -> Result<OutboxState, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.state().await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.state().await,
OutboxVariants::Other(outbox) => outbox.state().await,
}
}
async fn count(&self) -> Result<u32, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.count().await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.count().await,
OutboxVariants::Other(outbox) => outbox.count().await,
}
}
async fn cache_checkpoint(&self) -> Result<TxOutcome, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.cache_checkpoint().await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.cache_checkpoint().await,
OutboxVariants::Other(outbox) => outbox.cache_checkpoint().await,
}
}
async fn latest_cached_root(&self) -> Result<H256, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.latest_cached_root().await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.latest_cached_root().await,
OutboxVariants::Other(outbox) => outbox.latest_cached_root().await,
}
}
async fn latest_cached_checkpoint(&self) -> Result<Checkpoint, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.latest_cached_checkpoint().await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.latest_cached_checkpoint().await,
OutboxVariants::Other(outbox) => outbox.latest_cached_checkpoint().await,
}
}
async fn latest_checkpoint(
&self,
maybe_lag: Option<u64>,
) -> Result<Checkpoint, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.latest_checkpoint(maybe_lag).await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.latest_checkpoint(maybe_lag).await,
OutboxVariants::Other(outbox) => outbox.latest_checkpoint(maybe_lag).await,
}
}
}
impl AbacusContract for OutboxVariants {
fn chain_name(&self) -> &str {
match self {
OutboxVariants::Ethereum(outbox) => outbox.chain_name(),
OutboxVariants::Mock(mock_outbox) => mock_outbox.chain_name(),
OutboxVariants::Other(outbox) => outbox.chain_name(),
}
}
}
#[async_trait]
impl AbacusCommon for OutboxVariants {
fn local_domain(&self) -> u32 {
match self {
OutboxVariants::Ethereum(outbox) => outbox.local_domain(),
OutboxVariants::Mock(mock_outbox) => mock_outbox.local_domain(),
OutboxVariants::Other(outbox) => outbox.local_domain(),
}
}
async fn status(&self, txid: H256) -> Result<Option<TxOutcome>, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.status(txid).await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.status(txid).await,
OutboxVariants::Other(outbox) => outbox.status(txid).await,
}
}
async fn validator_manager(&self) -> Result<H256, ChainCommunicationError> {
match self {
OutboxVariants::Ethereum(outbox) => outbox.validator_manager().await,
OutboxVariants::Mock(mock_outbox) => mock_outbox.validator_manager().await,
OutboxVariants::Other(outbox) => outbox.validator_manager().await,
}
}
}

@ -1,8 +1,10 @@
use ethers::signers::Signer;
use eyre::Report;
use serde::Deserialize;
use abacus_core::{AbacusAbi, ContractLocator, Signers};
use abacus_core::{
AbacusAbi, ContractLocator, Inbox, InboxValidatorManager, InterchainGasPaymaster, Outbox,
Signers,
};
use abacus_ethereum::{
Connection, EthereumInboxAbi, EthereumInterchainGasPaymasterAbi, EthereumOutboxAbi,
InboxBuilder, InboxValidatorManagerBuilder, InterchainGasPaymasterBuilder,
@ -12,10 +14,7 @@ use ethers_prometheus::middleware::{
ChainInfo, ContractInfo, PrometheusMiddlewareConf, WalletInfo,
};
use crate::{
CoreMetrics, InboxValidatorManagerVariants, InboxValidatorManagers, InboxVariants, Inboxes,
InterchainGasPaymasterVariants, InterchainGasPaymasters, OutboxVariants, Outboxes,
};
use crate::CoreMetrics;
/// A connection to _some_ blockchain.
///
@ -105,28 +104,25 @@ impl ChainSetup<OutboxAddresses> {
&self,
signer: Option<Signers>,
metrics: &CoreMetrics,
) -> Result<Outboxes, Report> {
) -> eyre::Result<Box<dyn Outbox>> {
match &self.chain {
ChainConf::Ethereum(conf) => Ok(OutboxVariants::Ethereum(
OutboxBuilder {}
.make_with_connection(
conf.clone(),
&ContractLocator {
chain_name: self.name.clone(),
domain: self.domain.parse().expect("invalid uint"),
address: self
.addresses
.outbox
.parse::<ethers::types::Address>()?
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.metrics_conf())),
)
.await?,
)
.into()),
ChainConf::Ethereum(conf) => Ok(OutboxBuilder {}
.make_with_connection(
conf.clone(),
&ContractLocator {
chain_name: self.name.clone(),
domain: self.domain.parse().expect("invalid uint"),
address: self
.addresses
.outbox
.parse::<ethers::types::Address>()?
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.metrics_conf())),
)
.await?),
}
}
@ -135,7 +131,7 @@ impl ChainSetup<OutboxAddresses> {
&self,
signer: Option<Signers>,
metrics: &CoreMetrics,
) -> Result<Option<InterchainGasPaymasters>, Report> {
) -> eyre::Result<Option<Box<dyn InterchainGasPaymaster>>> {
let paymaster_address = if let Some(address) = &self.addresses.interchain_gas_paymaster {
address
} else {
@ -143,24 +139,19 @@ impl ChainSetup<OutboxAddresses> {
};
match &self.chain {
ChainConf::Ethereum(conf) => Ok(Some(
InterchainGasPaymasterVariants::Ethereum(
InterchainGasPaymasterBuilder {}
.make_with_connection(
conf.clone(),
&ContractLocator {
chain_name: self.name.clone(),
domain: self.domain.parse().expect("invalid uint"),
address: paymaster_address
.parse::<ethers::types::Address>()?
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.metrics_conf())),
)
.await?,
)
.into(),
InterchainGasPaymasterBuilder {}
.make_with_connection(
conf.clone(),
&ContractLocator {
chain_name: self.name.clone(),
domain: self.domain.parse().expect("invalid uint"),
address: paymaster_address.parse::<ethers::types::Address>()?.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.metrics_conf())),
)
.await?,
)),
}
}
@ -200,29 +191,26 @@ impl ChainSetup<InboxAddresses> {
&self,
signer: Option<Signers>,
metrics: &CoreMetrics,
) -> Result<Inboxes, Report> {
) -> eyre::Result<Box<dyn Inbox>> {
let metrics_conf = self.metrics_conf(metrics.agent_name(), &signer);
match &self.chain {
ChainConf::Ethereum(conf) => Ok(InboxVariants::Ethereum(
InboxBuilder {}
.make_with_connection(
conf.clone(),
&ContractLocator {
chain_name: self.name.clone(),
domain: self.domain.parse().expect("invalid uint"),
address: self
.addresses
.inbox
.parse::<ethers::types::Address>()?
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), metrics_conf)),
)
.await?,
)
.into()),
ChainConf::Ethereum(conf) => Ok(InboxBuilder {}
.make_with_connection(
conf.clone(),
&ContractLocator {
chain_name: self.name.clone(),
domain: self.domain.parse().expect("invalid uint"),
address: self
.addresses
.inbox
.parse::<ethers::types::Address>()?
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), metrics_conf)),
)
.await?),
}
}
@ -231,30 +219,27 @@ impl ChainSetup<InboxAddresses> {
&self,
signer: Option<Signers>,
metrics: &CoreMetrics,
) -> Result<InboxValidatorManagers, Report> {
) -> eyre::Result<Box<dyn InboxValidatorManager>> {
let inbox_address = self.addresses.inbox.parse::<ethers::types::Address>()?;
let metrics_conf = self.metrics_conf(metrics.agent_name(), &signer);
match &self.chain {
ChainConf::Ethereum(conf) => Ok(InboxValidatorManagerVariants::Ethereum(
InboxValidatorManagerBuilder { inbox_address }
.make_with_connection(
conf.clone(),
&ContractLocator {
chain_name: self.name.clone(),
domain: self.domain.parse().expect("invalid uint"),
address: self
.addresses
.validator_manager
.parse::<ethers::types::Address>()?
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), metrics_conf)),
)
.await?,
)
.into()),
ChainConf::Ethereum(conf) => Ok(InboxValidatorManagerBuilder { inbox_address }
.make_with_connection(
conf.clone(),
&ContractLocator {
chain_name: self.name.clone(),
domain: self.domain.parse().expect("invalid uint"),
address: self
.addresses
.validator_manager
.parse::<ethers::types::Address>()?
.into(),
},
signer,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), metrics_conf)),
)
.await?),
}
}

@ -89,7 +89,8 @@ use tracing::instrument;
use abacus_core::{
db::{AbacusDB, DB},
utils::HexString,
AbacusContract, ContractLocator, Signers,
AbacusContract, ContractLocator, InboxValidatorManager, InterchainGasPaymasterIndexer,
OutboxIndexer, Signers,
};
use abacus_ethereum::{
InterchainGasPaymasterIndexerBuilder, MakeableWithProvider, OutboxIndexerBuilder,
@ -97,10 +98,7 @@ use abacus_ethereum::{
pub use chains::{ChainConf, ChainSetup, InboxAddresses, OutboxAddresses};
use crate::{settings::trace::TracingConfig, CachingInterchainGasPaymaster};
use crate::{
AbacusAgentCore, CachingInbox, CachingOutbox, CoreMetrics, InboxContracts,
InboxValidatorManagers, InterchainGasPaymasterIndexers, OutboxIndexers,
};
use crate::{AbacusAgentCore, CachingInbox, CachingOutbox, CoreMetrics, InboxContracts};
/// Chain configuration
pub mod chains;
@ -263,7 +261,7 @@ impl Settings {
&self,
db: DB,
metrics: &CoreMetrics,
) -> Result<HashMap<String, InboxContracts>, Report> {
) -> eyre::Result<HashMap<String, InboxContracts>> {
let mut result = HashMap::new();
for (k, v) in self.inboxes.iter().filter(|(_, v)| {
!v.disabled
@ -283,8 +281,8 @@ impl Settings {
result.insert(
v.name.clone(),
InboxContracts {
inbox: Arc::new(caching_inbox),
validator_manager: Arc::new(validator_manager),
inbox: caching_inbox,
validator_manager: validator_manager.into(),
},
);
}
@ -297,11 +295,11 @@ impl Settings {
chain_setup: &ChainSetup<InboxAddresses>,
db: DB,
metrics: &CoreMetrics,
) -> Result<CachingInbox, Report> {
) -> eyre::Result<CachingInbox> {
let signer = self.get_signer(&chain_setup.name).await;
let inbox = chain_setup.try_into_inbox(signer, metrics).await?;
let abacus_db = AbacusDB::new(inbox.chain_name(), db);
Ok(CachingInbox::new(inbox, abacus_db))
Ok(CachingInbox::new(inbox.into(), abacus_db))
}
/// Try to get an InboxValidatorManager
@ -309,7 +307,7 @@ impl Settings {
&self,
chain_setup: &ChainSetup<InboxAddresses>,
metrics: &CoreMetrics,
) -> Result<InboxValidatorManagers, Report> {
) -> eyre::Result<Box<dyn InboxValidatorManager>> {
let signer = self.get_signer(&chain_setup.name).await;
chain_setup
@ -322,12 +320,12 @@ impl Settings {
&self,
db: DB,
metrics: &CoreMetrics,
) -> Result<CachingOutbox, Report> {
) -> eyre::Result<CachingOutbox> {
let signer = self.get_signer(&self.outbox.name).await;
let outbox = self.outbox.try_into_outbox(signer, metrics).await?;
let indexer = Arc::new(self.try_outbox_indexer(metrics).await?);
let indexer = self.try_outbox_indexer(metrics).await?;
let abacus_db = AbacusDB::new(outbox.chain_name(), db);
Ok(CachingOutbox::new(outbox, abacus_db, indexer))
Ok(CachingOutbox::new(outbox.into(), abacus_db, indexer.into()))
}
/// Try to get a CachingInterchainGasPaymaster
@ -335,7 +333,7 @@ impl Settings {
&self,
db: DB,
metrics: &CoreMetrics,
) -> Result<Option<CachingInterchainGasPaymaster>, Report> {
) -> eyre::Result<Option<CachingInterchainGasPaymaster>> {
let signer = self.get_signer(&self.outbox.name).await;
match self
.outbox
@ -343,10 +341,12 @@ impl Settings {
.await?
{
Some(paymaster) => {
let indexer = Arc::new(self.try_interchain_gas_paymaster_indexer(metrics).await?);
let indexer = self.try_interchain_gas_paymaster_indexer(metrics).await?;
let abacus_db = AbacusDB::new(paymaster.chain_name(), db);
Ok(Some(CachingInterchainGasPaymaster::new(
paymaster, abacus_db, indexer,
paymaster.into(),
abacus_db,
indexer.into(),
)))
}
None => Ok(None),
@ -357,32 +357,30 @@ impl Settings {
pub async fn try_outbox_indexer(
&self,
metrics: &CoreMetrics,
) -> Result<OutboxIndexers, Report> {
) -> eyre::Result<Box<dyn OutboxIndexer>> {
match &self.outbox.chain {
ChainConf::Ethereum(conn) => Ok(OutboxIndexers::Ethereum(
OutboxIndexerBuilder {
from_height: self.index.from(),
chunk_size: self.index.chunk_size(),
finality_blocks: self.outbox.finality_blocks(),
}
.make_with_connection(
conn.clone(),
&ContractLocator {
chain_name: self.outbox.name.clone(),
domain: self.outbox.domain.parse().expect("invalid uint"),
address: self
.outbox
.addresses
.outbox
.parse::<ethers::types::Address>()?
.into(),
},
self.get_signer(&self.outbox.name).await,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.outbox.metrics_conf())),
)
.await?,
)),
ChainConf::Ethereum(conn) => Ok(OutboxIndexerBuilder {
from_height: self.index.from(),
chunk_size: self.index.chunk_size(),
finality_blocks: self.outbox.finality_blocks(),
}
.make_with_connection(
conn.clone(),
&ContractLocator {
chain_name: self.outbox.name.clone(),
domain: self.outbox.domain.parse().expect("invalid uint"),
address: self
.outbox
.addresses
.outbox
.parse::<ethers::types::Address>()?
.into(),
},
self.get_signer(&self.outbox.name).await,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.outbox.metrics_conf())),
)
.await?),
}
}
@ -393,39 +391,37 @@ impl Settings {
pub async fn try_interchain_gas_paymaster_indexer(
&self,
metrics: &CoreMetrics,
) -> Result<InterchainGasPaymasterIndexers, Report> {
) -> eyre::Result<Box<dyn InterchainGasPaymasterIndexer>> {
match &self.outbox.chain {
ChainConf::Ethereum(conn) => Ok(InterchainGasPaymasterIndexers::Ethereum(
InterchainGasPaymasterIndexerBuilder {
outbox_address: self
ChainConf::Ethereum(conn) => Ok(InterchainGasPaymasterIndexerBuilder {
outbox_address: self
.outbox
.addresses
.outbox
.parse::<ethers::types::Address>()?,
from_height: self.index.from(),
chunk_size: self.index.chunk_size(),
finality_blocks: self.outbox.finality_blocks(),
}
.make_with_connection(
conn.clone(),
&ContractLocator {
chain_name: self.outbox.name.clone(),
domain: self.outbox.domain.parse().expect("invalid uint"),
address: self
.outbox
.addresses
.outbox
.parse::<ethers::types::Address>()?,
from_height: self.index.from(),
chunk_size: self.index.chunk_size(),
finality_blocks: self.outbox.finality_blocks(),
}
.make_with_connection(
conn.clone(),
&ContractLocator {
chain_name: self.outbox.name.clone(),
domain: self.outbox.domain.parse().expect("invalid uint"),
address: self
.outbox
.addresses
.interchain_gas_paymaster
.as_ref()
.expect("interchain_gas_paymaster not provided")
.parse::<ethers::types::Address>()?
.into(),
},
self.get_signer(&self.outbox.name).await,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.outbox.metrics_conf())),
)
.await?,
)),
.interchain_gas_paymaster
.as_ref()
.expect("interchain_gas_paymaster not provided")
.parse::<ethers::types::Address>()?
.into(),
},
self.get_signer(&self.outbox.name).await,
Some(|| metrics.json_rpc_client_metrics()),
Some((metrics.provider_metrics(), self.outbox.metrics_conf())),
)
.await?),
}
}
@ -434,7 +430,7 @@ impl Settings {
&self,
name: &str,
parse_inboxes: bool,
) -> Result<AbacusAgentCore, Report> {
) -> eyre::Result<AbacusAgentCore> {
let metrics = Arc::new(CoreMetrics::new(
name,
self.metrics
@ -444,11 +440,10 @@ impl Settings {
)?);
let db = DB::from_path(&self.db)?;
let outbox = Arc::new(self.try_caching_outbox(db.clone(), &metrics).await?);
let outbox = self.try_caching_outbox(db.clone(), &metrics).await?;
let interchain_gas_paymaster = self
.try_caching_interchain_gas_paymaster(db.clone(), &metrics)
.await?
.map(Arc::new);
.await?;
let inbox_contracts = if parse_inboxes {
self.try_inbox_contracts(db.clone(), &metrics).await?

@ -1,105 +0,0 @@
use async_trait::async_trait;
use std::sync::Arc;
use abacus_core::{
accumulator::merkle::Proof, AbacusMessage, Address, ChainCommunicationError,
InboxValidatorManager, MultisigSignedCheckpoint, TxOutcome,
};
#[derive(Debug, Clone)]
/// Arc wrapper for InboxValidatorManagerVariants enum
pub struct InboxValidatorManagers(Arc<InboxValidatorManagerVariants>);
impl From<InboxValidatorManagerVariants> for InboxValidatorManagers {
fn from(inbox_validator_managers: InboxValidatorManagerVariants) -> Self {
Self(Arc::new(inbox_validator_managers))
}
}
impl std::ops::Deref for InboxValidatorManagers {
type Target = Arc<InboxValidatorManagerVariants>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for InboxValidatorManagers {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// InboxValidatorManager type
#[derive(Debug)]
pub enum InboxValidatorManagerVariants {
/// Ethereum InboxValidatorManager contract
Ethereum(Box<dyn InboxValidatorManager>),
/// Mock InboxValidatorManager contract
Mock(Box<dyn InboxValidatorManager>),
/// Other InboxValidatorManager variant
Other(Box<dyn InboxValidatorManager>),
}
#[async_trait]
impl InboxValidatorManager for InboxValidatorManagerVariants {
/// Submit a signed checkpoint for inclusion
async fn process(
&self,
multisig_signed_checkpoint: &MultisigSignedCheckpoint,
message: &AbacusMessage,
proof: &Proof,
) -> Result<TxOutcome, ChainCommunicationError> {
match self {
InboxValidatorManagerVariants::Ethereum(validator_manager) => {
validator_manager
.process(multisig_signed_checkpoint, message, proof)
.await
}
InboxValidatorManagerVariants::Mock(mock_validator_manager) => {
mock_validator_manager
.process(multisig_signed_checkpoint, message, proof)
.await
}
InboxValidatorManagerVariants::Other(validator_manager) => {
validator_manager
.process(multisig_signed_checkpoint, message, proof)
.await
}
}
}
/// Get calldata for a process tx
fn process_calldata(
&self,
multisig_signed_checkpoint: &MultisigSignedCheckpoint,
message: &AbacusMessage,
proof: &Proof,
) -> Vec<u8> {
match self {
InboxValidatorManagerVariants::Ethereum(validator_manager) => {
validator_manager.process_calldata(multisig_signed_checkpoint, message, proof)
}
InboxValidatorManagerVariants::Mock(mock_validator_manager) => {
mock_validator_manager.process_calldata(multisig_signed_checkpoint, message, proof)
}
InboxValidatorManagerVariants::Other(validator_manager) => {
validator_manager.process_calldata(multisig_signed_checkpoint, message, proof)
}
}
}
fn contract_address(&self) -> Address {
match self {
InboxValidatorManagerVariants::Ethereum(validator_manager) => {
validator_manager.contract_address()
}
InboxValidatorManagerVariants::Mock(validator_manager) => {
validator_manager.contract_address()
}
InboxValidatorManagerVariants::Other(validator_manager) => {
validator_manager.contract_address()
}
}
}
}

@ -7,6 +7,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
auto_impl = "1.0"
ethers = { git = "https://github.com/abacus-network/ethers-rs", tag = "2022-09-12-01", default-features = false, features = ['legacy'] }
ethers-signers = { git = "https://github.com/abacus-network/ethers-rs", tag = "2022-09-12-01", features=["aws"] }
ethers-providers = { git = "https://github.com/abacus-network/ethers-rs", tag = "2022-09-12-01", features=["ws", "rustls"] }

@ -1,6 +1,7 @@
use std::fmt::Debug;
use async_trait::async_trait;
use auto_impl::auto_impl;
use ethers::core::types::H256;
use eyre::Result;
@ -11,6 +12,7 @@ use crate::{
/// Interface for on-chain inboxes
#[async_trait]
#[auto_impl(Box, Arc)]
pub trait Inbox: AbacusCommon + Send + Sync + Debug {
/// Return the domain of the inbox's linked outbox
async fn remote_domain(&self) -> Result<u32, ChainCommunicationError>;

@ -1,8 +1,8 @@
//! An Indexer provides a common interface for bubbling up chain-specific
//! event-data to another entity (e.g. a `ContractSync`). For example, the only
//! way to retrieve data such as the chain's latest block number or a list of
//! checkpoints/messages emitted within a certain block range by calling out to a
//! chain-specific library and provider (e.g. ethers::provider). A
//! checkpoints/messages emitted within a certain block range by calling out to
//! a chain-specific library and provider (e.g. ethers::provider). A
//! chain-specific outbox or inbox should implement one or both of the Indexer
//! traits (CommonIndexer or OutboxIndexer) to provide an common interface which
//! other entities can retrieve this chain-specific info.
@ -10,12 +10,14 @@
use std::fmt::Debug;
use async_trait::async_trait;
use auto_impl::auto_impl;
use eyre::Result;
use crate::{CheckpointWithMeta, InterchainGasPaymentWithMeta, RawCommittedMessage};
/// Interface for an indexer.
#[async_trait]
#[auto_impl(Box, Arc)]
pub trait Indexer: Send + Sync + Debug {
/// Get the chain's latest block number that has reached finality
async fn get_finalized_block_number(&self) -> Result<u32>;
@ -24,12 +26,14 @@ pub trait Indexer: Send + Sync + Debug {
/// Interface for Outbox contract indexer. Interface for allowing other
/// entities to retrieve chain-specific data from an outbox.
#[async_trait]
#[auto_impl(Box, Arc)]
pub trait OutboxIndexer: Indexer + Send + Sync + Debug {
/// Fetch list of messages between blocks `from` and `to`.
async fn fetch_sorted_messages(&self, _from: u32, _to: u32)
-> Result<Vec<RawCommittedMessage>>;
/// Fetch sequentially sorted list of cached checkpoints between blocks `from` and `to`
/// Fetch sequentially sorted list of cached checkpoints between blocks
/// `from` and `to`
async fn fetch_sorted_cached_checkpoints(
&self,
from: u32,
@ -39,8 +43,10 @@ pub trait OutboxIndexer: Indexer + Send + Sync + Debug {
/// Interface for InterchainGasPaymaster contract indexer.
#[async_trait]
#[auto_impl(Box, Arc)]
pub trait InterchainGasPaymasterIndexer: Indexer + Send + Sync + Debug {
/// Fetch list of gas payments between `from_block` and `to_block`, inclusive
/// Fetch list of gas payments between `from_block` and `to_block`,
/// inclusive
async fn fetch_gas_payments(
&self,
from_block: u32,

@ -1,10 +1,12 @@
use std::fmt::Debug;
use async_trait::async_trait;
use auto_impl::auto_impl;
use crate::AbacusContract;
/// Interface for the InterchainGasPaymaster chain contract.
/// Allows abstraction over different chains.
#[async_trait]
#[auto_impl(Box, Arc)]
pub trait InterchainGasPaymaster: AbacusContract + Send + Sync + Debug {}

@ -1,16 +1,9 @@
mod common;
mod encode;
mod inbox;
mod indexer;
mod interchain_gas;
mod message;
mod outbox;
mod validator_manager;
use std::collections::HashMap;
use std::error::Error as StdError;
use std::fmt::Debug;
use async_trait::async_trait;
use auto_impl::auto_impl;
use ethers::prelude::Selector;
use ethers::{
contract::ContractError,
@ -18,9 +11,6 @@ use ethers::{
providers::{Middleware, ProviderError},
};
use eyre::Result;
use std::error::Error as StdError;
use crate::{db::DbError, utils::domain_hash, AbacusError};
pub use common::*;
pub use encode::*;
@ -31,6 +21,17 @@ pub use message::*;
pub use outbox::*;
pub use validator_manager::*;
use crate::{db::DbError, utils::domain_hash, AbacusError};
mod common;
mod encode;
mod inbox;
mod indexer;
mod interchain_gas;
mod message;
mod outbox;
mod validator_manager;
/// The result of a transaction
#[derive(Debug, Clone, Copy)]
pub struct TxOutcome {
@ -89,6 +90,7 @@ where
/// Interface for a deployed contract.
/// This trait is intended to expose attributes of any contract, and
/// should not consider the purpose or implementation details of the contract.
#[auto_impl(Box, Arc)]
pub trait AbacusContract {
/// Return an identifier (not necessarily unique) for the chain this
/// contract is deployed to.
@ -97,6 +99,7 @@ pub trait AbacusContract {
/// Interface for attributes shared by Outbox and Inbox
#[async_trait]
#[auto_impl(Box, Arc)]
pub trait AbacusCommon: AbacusContract + Sync + Send + Debug {
/// Return the domain ID
fn local_domain(&self) -> u32;
@ -114,11 +117,13 @@ pub trait AbacusCommon: AbacusContract + Sync + Send + Debug {
}
/// Static contract ABI information.
#[auto_impl(Box, Arc)]
pub trait AbacusAbi {
/// Get a mapping from function selectors to human readable function names.
fn fn_map() -> HashMap<Selector, &'static str>;
/// Get a mapping from function selectors to owned human readable function names.
/// Get a mapping from function selectors to owned human readable function
/// names.
fn fn_map_owned() -> HashMap<Selector, String> {
Self::fn_map()
.into_iter()

@ -1,17 +1,20 @@
use std::convert::TryFrom;
use std::fmt::Debug;
use async_trait::async_trait;
use auto_impl::auto_impl;
use ethers::core::types::H256;
use eyre::Result;
use crate::{
traits::{ChainCommunicationError, TxOutcome},
AbacusCommon, Checkpoint, CommittedMessage, Message, OutboxState, RawCommittedMessage,
};
use async_trait::async_trait;
use ethers::core::types::H256;
use eyre::Result;
/// Interface for the Outbox chain contract. Allows abstraction over different
/// chains
#[async_trait]
#[auto_impl(Box, Arc)]
pub trait Outbox: AbacusCommon + Send + Sync + Debug {
/// Fetch the current state.
async fn state(&self) -> Result<OutboxState, ChainCommunicationError>;
@ -40,6 +43,7 @@ pub trait Outbox: AbacusCommon + Send + Sync + Debug {
/// Interface for retrieving event data emitted specifically by the outbox
#[async_trait]
#[auto_impl(Box, Arc)]
pub trait OutboxEvents: Outbox + Send + Sync + Debug {
/// Look up a message by its hash.
/// This should fetch events from the chain API

@ -1,6 +1,7 @@
use std::fmt::Debug;
use async_trait::async_trait;
use auto_impl::auto_impl;
use eyre::Result;
use crate::{
@ -11,6 +12,7 @@ use crate::{
/// Interface for an InboxValidatorManager
#[async_trait]
#[auto_impl(Box, Arc)]
pub trait InboxValidatorManager: Send + Sync + Debug {
/// Process a message with a proof against the provided signed checkpoint
async fn process(

@ -56,10 +56,6 @@ impl std::fmt::Debug for MockOutboxContract {
#[async_trait]
impl Outbox for MockOutboxContract {
async fn dispatch(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError> {
self._dispatch(message)
}
async fn state(&self) -> Result<OutboxState, ChainCommunicationError> {
self._state()
}
@ -68,6 +64,10 @@ impl Outbox for MockOutboxContract {
self._count()
}
async fn dispatch(&self, message: &Message) -> Result<TxOutcome, ChainCommunicationError> {
self._dispatch(message)
}
async fn cache_checkpoint(&self) -> Result<TxOutcome, ChainCommunicationError> {
self._cache_checkpoint()
}

@ -3,11 +3,10 @@ use std::time::Duration;
use eyre::Result;
use prometheus::{IntGauge, IntGaugeVec};
use tokio::{sync::watch::Sender, task::JoinHandle, time::sleep};
use tracing::{debug, info, info_span, instrument, instrument::Instrumented, Instrument};
use abacus_base::{MultisigCheckpointSyncer, Outboxes};
use abacus_core::{AbacusContract, MultisigSignedCheckpoint};
use abacus_base::MultisigCheckpointSyncer;
use abacus_core::{MultisigSignedCheckpoint, Outbox};
pub(crate) struct CheckpointFetcher {
polling_interval: u64,
@ -19,7 +18,7 @@ pub(crate) struct CheckpointFetcher {
impl CheckpointFetcher {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
outbox: Outboxes,
outbox: &dyn Outbox,
polling_interval: u64,
multisig_checkpoint_syncer: MultisigCheckpointSyncer,
signed_checkpoint_sender: Sender<Option<MultisigSignedCheckpoint>>,

@ -9,7 +9,7 @@ use tokio::{
};
use tracing::{debug, info_span, instrument, instrument::Instrumented, warn, Instrument};
use abacus_base::{CoreMetrics, InboxContracts, Outboxes};
use abacus_base::{CoreMetrics, InboxContracts};
use abacus_core::{
db::AbacusDB, AbacusCommon, AbacusContract, CommittedMessage, MultisigSignedCheckpoint, Outbox,
};
@ -20,7 +20,7 @@ use super::SubmitMessageArgs;
#[derive(Debug)]
pub(crate) struct MessageProcessor {
outbox: Outboxes,
outbox: Arc<dyn Outbox>,
db: AbacusDB,
inbox_contracts: InboxContracts,
whitelist: Arc<MatchingList>,
@ -35,7 +35,7 @@ pub(crate) struct MessageProcessor {
impl MessageProcessor {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
outbox: Outboxes,
outbox: Arc<dyn Outbox>,
db: AbacusDB,
inbox_contracts: InboxContracts,
whitelist: Arc<MatchingList>,
@ -74,16 +74,18 @@ impl MessageProcessor {
#[instrument(ret, err, skip(self), fields(inbox_name=self.inbox_contracts.inbox.chain_name(), local_domain=?self.inbox_contracts.inbox.local_domain()), level = "info")]
async fn main_loop(mut self) -> Result<()> {
// Ensure that there is at least one valid, known checkpoint before starting work loop.
// Ensure that there is at least one valid, known checkpoint before starting
// work loop.
loop {
self.ckpt_rx.changed().await?;
if self.ckpt_rx.borrow().clone().is_some() {
break;
}
}
// Forever, scan AbacusDB looking for new messages to send. When criteria are satisfied
// or the message is disqualified, push the message onto self.tx_msg and then continue
// the scan at the next outbox highest leaf index.
// Forever, scan AbacusDB looking for new messages to send. When criteria are
// satisfied or the message is disqualified, push the message onto
// self.tx_msg and then continue the scan at the next outbox highest
// leaf index.
loop {
self.tick().await?;
}
@ -123,12 +125,13 @@ impl MessageProcessor {
"Leaf in db without message idx: {}",
self.message_leaf_index
);
// Not clear what the best thing to do here is, but there is seemingly an existing
// race wherein an indexer might non-atomically write leaf info to rocksdb across a
// few records, so we might see the leaf status above, but not the message contents
// here. For now, optimistically yield and then re-enter the loop in hopes that
// the DB is now coherent.
// TODO(webbhorn): Why can't we yield here instead of sleep?
// Not clear what the best thing to do here is, but there is seemingly an
// existing race wherein an indexer might non-atomically write leaf
// info to rocksdb across a few records, so we might see the leaf
// status above, but not the message contents here. For now,
// optimistically yield and then re-enter the loop in hopes that the
// DB is now coherent. TODO(webbhorn): Why can't we yield here
// instead of sleep?
tokio::time::sleep(Duration::from_secs(1)).await;
return Ok(());
};
@ -171,8 +174,9 @@ impl MessageProcessor {
return Ok(());
}
// If validator hasn't published checkpoint covering self.message_leaf_index yet, wait
// until it has, before forwarding the message to the submitter channel.
// If validator hasn't published checkpoint covering self.message_leaf_index
// yet, wait until it has, before forwarding the message to the
// submitter channel.
let mut ckpt;
loop {
ckpt = self.ckpt_rx.borrow().clone();
@ -188,7 +192,8 @@ impl MessageProcessor {
let checkpoint = ckpt.unwrap();
assert!(checkpoint.checkpoint.index >= self.message_leaf_index);
// Include proof against checkpoint for message in the args provided to the submitter.
// Include proof against checkpoint for message in the args provided to the
// submitter.
if checkpoint.checkpoint.index >= self.prover_sync.count() {
self.prover_sync
.update_to_checkpoint(&checkpoint.checkpoint)
@ -226,7 +231,7 @@ impl MessageProcessor {
}
/// Spawn a task to update the outbox state gauge.
async fn metrics_loop(outbox_state_gauge: IntGauge, outbox: Outboxes) {
async fn metrics_loop(outbox_state_gauge: IntGauge, outbox: Arc<dyn Outbox>) {
let mut interval = tokio::time::interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {

@ -88,7 +88,7 @@ impl BaseAgent for Relayer {
let gas_payment_enforcer = Arc::new(GasPaymentEnforcer::new(
self.gas_payment_enforcement_policy.clone(),
self.outbox().db(),
self.outbox().db().clone(),
));
for (inbox_name, inbox_contracts) in inboxes {
@ -113,7 +113,7 @@ impl BaseAgent for Relayer {
tasks.push(self.run_outbox_sync(sync_metrics.clone()));
if let Some(paymaster) = self.interchain_gas_paymaster() {
tasks.push(self.run_interchain_gas_paymaster_sync(paymaster, sync_metrics));
tasks.push(self.run_interchain_gas_paymaster_sync(paymaster.clone(), sync_metrics));
} else {
info!("Interchain Gas Paymaster not provided, not running sync");
}
@ -134,7 +134,7 @@ impl Relayer {
fn run_interchain_gas_paymaster_sync(
&self,
paymaster: Arc<CachingInterchainGasPaymaster>,
paymaster: CachingInterchainGasPaymaster,
sync_metrics: ContractSyncMetrics,
) -> Instrumented<JoinHandle<Result<()>>> {
paymaster.sync(self.as_ref().indexer.clone(), sync_metrics)
@ -168,7 +168,7 @@ impl Relayer {
message_receiver,
self.outbox().local_domain(),
inbox_contracts,
self.outbox().db(),
self.outbox().db().clone(),
signer,
GelatoSubmitterMetrics::new(
&self.core.metrics,
@ -209,7 +209,7 @@ impl Relayer {
let serial_submitter = SerialSubmitter::new(
msg_receive,
inbox_contracts.clone(),
self.outbox().db(),
self.outbox().db().clone(),
SerialSubmitterMetrics::new(
&self.core.metrics,
outbox_name,
@ -221,8 +221,8 @@ impl Relayer {
}
};
let message_processor = MessageProcessor::new(
outbox,
self.outbox().db(),
outbox.clone(),
self.outbox().db().clone(),
inbox_contracts,
self.whitelist.clone(),
self.blacklist.clone(),

@ -15,7 +15,7 @@ pub(crate) struct ValidatorSubmitter {
interval: u64,
reorg_period: u64,
signer: Arc<Signers>,
outbox: Arc<CachingOutbox>,
outbox: CachingOutbox,
checkpoint_syncer: Arc<CheckpointSyncers>,
metrics: ValidatorSubmitterMetrics,
}
@ -24,7 +24,7 @@ impl ValidatorSubmitter {
pub(crate) fn new(
interval: u64,
reorg_period: u64,
outbox: Arc<CachingOutbox>,
outbox: CachingOutbox,
signer: Arc<Signers>,
checkpoint_syncer: Arc<CheckpointSyncers>,
metrics: ValidatorSubmitterMetrics,
@ -54,7 +54,7 @@ impl ValidatorSubmitter {
}
/// Spawn a task to update the outbox state gauge.
async fn metrics_loop(outbox_state_gauge: IntGauge, outbox: Arc<CachingOutbox>) {
async fn metrics_loop(outbox_state_gauge: IntGauge, outbox: CachingOutbox) {
let mut interval = tokio::time::interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {

@ -79,7 +79,7 @@ impl BaseAgent for Validator {
let submit = ValidatorSubmitter::new(
self.interval,
self.reorg_period,
self.outbox(),
self.outbox().clone(),
self.signer.clone(),
self.checkpoint_syncer.clone(),
ValidatorSubmitterMetrics::new(&self.core.metrics, self.outbox().chain_name()),

Loading…
Cancel
Save