Relayer whitelist (#570)

* Update ABIs, compiles. Removed Inbox indexer and cached checkpoints from the Inbox

* Rm checkpoint indexing test

* Update InboxValidatorManager, just need to connect the pieces now

* Add channels

* So close

* Fix InboxValidatorManager deploy

* Make message processor slightly more readable

* Create loop-control crate

* Try to bring some sanity to message processor

* Fix bug where prover_sync wasn't in line with latest_signed_checkpoint

* Rm immediate message processing, clean up

* rm abacus-cli

* more cleanup

* rename / rm some settings

* TS renames / rms

* Lower some processing failure logs to info

* Checkpoint fetcher doesn't need the CommittedMessages

* Hardcode kathy dispatching

* Move to watch channel

* I'm sorry clippy

* nits

* rm some nonce related stuff

* more refactoring

* cleanup

* Add whitelist settings

* Whitelist filtering

* fix docker build

* use relayed terminology

* Change whitelist tuple order

* naming change

* move whitelist check to before fetching proof

* Update span event

* make messages singular

* Update error

* minor cleanup

* Update configs in scripts

* Whitelist testing and parsing fixes

Co-authored-by: Trevor Porter <trkporter@ucdavis.edu>
pull/586/head
Mattie Conover 2 years ago committed by GitHub
parent bcf638ff4f
commit 4d9603c868
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      rust/Cargo.lock
  2. 3
      rust/Cargo.toml
  3. 1
      rust/Dockerfile
  4. 3
      rust/agents/relayer/Cargo.toml
  5. 376
      rust/agents/relayer/src/message_processor.rs
  6. 52
      rust/agents/relayer/src/relayer.rs
  7. 6
      rust/agents/relayer/src/settings/mod.rs
  8. 311
      rust/agents/relayer/src/settings/whitelist.rs
  9. 1
      rust/run-locally.sh
  10. 7
      rust/utils/loop-control/Cargo.toml
  11. 33
      rust/utils/loop-control/src/lib.rs
  12. 34
      rust/utils/loop-control/tests/integration_tests.rs
  13. 2
      typescript/infra/config/environments/dev/agent.ts
  14. 2
      typescript/infra/config/environments/test/agent.ts
  15. 2
      typescript/infra/config/environments/testnet/agent.ts
  16. 2
      typescript/infra/config/environments/testnet2/agent.ts
  17. 2
      typescript/infra/src/agents/index.ts
  18. 26
      typescript/infra/src/config/agent.ts

5
rust/Cargo.lock generated

@ -2132,6 +2132,10 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "loop-control"
version = "0.1.0"
[[package]]
name = "maplit"
version = "1.0.2"
@ -3140,6 +3144,7 @@ dependencies = [
"ethers",
"eyre",
"futures-util",
"loop-control",
"prometheus",
"serde 1.0.130",
"serde_json",

@ -10,5 +10,6 @@ members = [
"agents/kathy",
"agents/validator",
"agents/relayer",
"ethers-prometheus"
"ethers-prometheus",
"utils/loop-control"
]

@ -16,6 +16,7 @@ COPY abacus-base ./abacus-base
COPY abacus-core ./abacus-core
COPY abacus-test ./abacus-test
COPY ethers-prometheus ./ethers-prometheus
COPY utils ./utils
COPY Cargo.toml .
COPY Cargo.lock .

@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
tokio = { version = "1", features = ["rt", "macros"] }
config = "0.10"
serde = "1.0"
serde = {version = "1.0", features = ["derive"]}
serde_json = { version = "1.0", default-features = false }
ethers = { git = "https://github.com/gakonst/ethers-rs", branch = "master" }
thiserror = { version = "1.0.22", default-features = false }
@ -20,6 +20,7 @@ tracing-subscriber = "0.3"
abacus-core = { path = "../../abacus-core" }
abacus-base = { path = "../../abacus-base" }
loop-control = { path = "../../utils/loop-control" }
prometheus = "0.13"

@ -1,14 +1,10 @@
use std::sync::Arc;
use std::{
cmp::Reverse,
collections::BinaryHeap,
time::{Duration, Instant},
};
use abacus_base::{InboxContracts, Outboxes};
use abacus_core::{
db::AbacusDB, AbacusCommon, AbacusContract, CommittedMessage, Inbox, InboxValidatorManager,
MessageStatus, MultisigSignedCheckpoint,
};
use eyre::{bail, Result};
use prometheus::{IntGauge, IntGaugeVec};
use tokio::{sync::watch::Receiver, task::JoinHandle, time::sleep};
@ -16,7 +12,16 @@ use tracing::{
debug, error, info, info_span, instrument, instrument::Instrumented, warn, Instrument,
};
use abacus_base::{InboxContracts, Outboxes};
use abacus_core::{
db::AbacusDB, AbacusCommon, AbacusContract, CommittedMessage, Inbox, InboxValidatorManager,
MessageStatus, MultisigSignedCheckpoint,
};
use loop_control::LoopControl::{Continue, Flow};
use loop_control::{loop_ctrl, LoopControl};
use crate::merkle_tree_builder::MerkleTreeBuilder;
use crate::settings::whitelist::Whitelist;
pub(crate) struct MessageProcessor {
max_retries: u32,
@ -25,6 +30,7 @@ pub(crate) struct MessageProcessor {
prover_sync: MerkleTreeBuilder,
retry_queue: BinaryHeap<MessageToRetry>,
signed_checkpoint_receiver: Receiver<Option<MultisigSignedCheckpoint>>,
whitelist: Arc<Whitelist>,
processor_loop_gauge: IntGauge,
processed_gauge: IntGauge,
retry_queue_length_gauge: IntGauge,
@ -40,6 +46,7 @@ struct MessageToRetry {
#[derive(Debug)]
enum MessageProcessingStatus {
NotDestinedForInbox,
NotWhitelisted,
NotYetCheckpointed,
Processed,
Error,
@ -53,6 +60,7 @@ impl MessageProcessor {
db: AbacusDB,
inbox_contracts: InboxContracts,
signed_checkpoint_receiver: Receiver<Option<MultisigSignedCheckpoint>>,
whitelist: Arc<Whitelist>,
leaf_index_gauge: IntGaugeVec,
retry_queue_length: IntGaugeVec,
) -> Self {
@ -74,6 +82,7 @@ impl MessageProcessor {
db,
inbox_contracts,
retry_queue: BinaryHeap::new(),
whitelist,
signed_checkpoint_receiver,
processor_loop_gauge,
processed_gauge,
@ -87,77 +96,79 @@ impl MessageProcessor {
latest_signed_checkpoint: &MultisigSignedCheckpoint,
message_leaf_index: u32,
) -> Result<MessageProcessingStatus> {
match self
let message = if let Some(m) = self
.db
.message_by_leaf_index(message_leaf_index)?
.map(CommittedMessage::try_from)
.transpose()?
{
Some(message) => {
let leaf = message.to_leaf();
if message.message.destination != self.inbox_contracts.inbox.local_domain() {
return Ok(MessageProcessingStatus::NotDestinedForInbox);
m
} else {
// Should not get here
bail!("Somehow MessageProcessor get the leaf despite waiting for it");
};
let leaf = message.to_leaf();
if message.message.destination != self.inbox_contracts.inbox.local_domain() {
return Ok(MessageProcessingStatus::NotDestinedForInbox);
}
match self.inbox_contracts.inbox.message_status(leaf).await? {
MessageStatus::None => {
if latest_signed_checkpoint.checkpoint.index >= self.prover_sync.count() {
self.prover_sync
.update_to_checkpoint(&latest_signed_checkpoint.checkpoint)
.await?;
}
match self.inbox_contracts.inbox.message_status(leaf).await? {
MessageStatus::None => {
if latest_signed_checkpoint.checkpoint.index >= self.prover_sync.count() {
self.prover_sync
.update_to_checkpoint(&latest_signed_checkpoint.checkpoint)
.await?;
}
// prover_sync should always be in sync with latest_signed_checkpoint
assert_eq!(
latest_signed_checkpoint.checkpoint.index + 1,
self.prover_sync.count()
);
// prover_sync should always be in sync with latest_signed_checkpoint
assert_eq!(
latest_signed_checkpoint.checkpoint.index + 1,
self.prover_sync.count()
);
if message_leaf_index > latest_signed_checkpoint.checkpoint.index {
return Ok(MessageProcessingStatus::NotYetCheckpointed);
}
if !self.whitelist.msg_matches(&message.message) {
return Ok(MessageProcessingStatus::NotWhitelisted);
}
if message_leaf_index > latest_signed_checkpoint.checkpoint.index {
return Ok(MessageProcessingStatus::NotYetCheckpointed);
match self.prover_sync.get_proof(message_leaf_index) {
Ok(proof) => match self
.inbox_contracts
.validator_manager
.process(latest_signed_checkpoint, &message.message, &proof)
.await
{
Ok(outcome) => {
info!(
leaf_index = message_leaf_index,
hash = ?outcome.txid,
"Message successfully processed"
);
self.db.mark_leaf_as_processed(message_leaf_index)?;
Ok(MessageProcessingStatus::Processed)
}
match self.prover_sync.get_proof(message_leaf_index) {
Ok(proof) => match self
.inbox_contracts
.validator_manager
.process(latest_signed_checkpoint, &message.message, &proof)
.await
{
Ok(outcome) => {
info!(
leaf_index = message_leaf_index,
hash = ?outcome.txid,
"Message successfully processed"
);
self.db.mark_leaf_as_processed(message_leaf_index)?;
Ok(MessageProcessingStatus::Processed)
}
Err(err) => {
info!(leaf_index = message_leaf_index, error=?err, "Message failed to process, enqueuing for retry");
Ok(MessageProcessingStatus::Error)
}
},
Err(err) => {
error!(error=?err, "Unable to fetch proof");
bail!("Unable to fetch proof");
}
Err(err) => {
info!(leaf_index = message_leaf_index, error=?err, "Message failed to process, enqueuing for retry");
Ok(MessageProcessingStatus::Error)
}
}
MessageStatus::Processed => {
debug!(
leaf_index = message_leaf_index,
domain = self.inbox_contracts.inbox.local_domain(),
"Message already processed"
);
self.db.mark_leaf_as_processed(message_leaf_index)?;
Ok(MessageProcessingStatus::Processed)
},
Err(err) => {
error!(error=?err, "Unable to fetch proof");
bail!("Unable to fetch proof");
}
}
}
None => {
// Should not get here
bail!("Somehow MessageProcessor get the leaf despite waiting for it");
MessageStatus::Processed => {
debug!(
leaf_index = message_leaf_index,
domain = self.inbox_contracts.inbox.local_domain(),
"Message already processed"
);
self.db.mark_leaf_as_processed(message_leaf_index)?;
Ok(MessageProcessingStatus::Processed)
}
}
}
@ -165,7 +176,7 @@ impl MessageProcessor {
/// Read a signed checkpoint that the channel may have received without blocking.
/// Of current_latest_signed_checkpoint and the signed checkpoint received,
/// the one with the latest index is returned.
fn get_latest_signed_checkpoint(
fn get_updated_latest_signed_checkpoint(
&mut self,
current_latest_signed_checkpoint: MultisigSignedCheckpoint,
) -> Result<MultisigSignedCheckpoint> {
@ -210,11 +221,12 @@ impl MessageProcessor {
loop {
// Get latest signed checkpoint, non-blocking
latest_signed_checkpoint =
self.get_latest_signed_checkpoint(latest_signed_checkpoint)?;
self.get_updated_latest_signed_checkpoint(latest_signed_checkpoint)?;
self.processor_loop_gauge.set(message_leaf_index as i64);
self.retry_queue_length_gauge
.set(self.retry_queue.len() as i64);
if self
.db
.retrieve_leaf_processing_status(message_leaf_index)?
@ -223,120 +235,146 @@ impl MessageProcessor {
message_leaf_index += 1;
continue;
}
// Sleep to not fire too many view calls in a short duration
sleep(Duration::from_millis(20)).await;
match self.db.leaf_by_leaf_index(message_leaf_index)? {
Some(_) => {
// We have unseen messages to process
if self.db.leaf_by_leaf_index(message_leaf_index)?.is_some() {
message_leaf_index = self
.process_fresh_leaf(&mut latest_signed_checkpoint, message_leaf_index)
.await?;
} else {
loop_ctrl!(
self.retry_processing_message(&mut latest_signed_checkpoint)
.await?
);
}
}
// will only reach this if we break
Ok(())
}
/// Part of main loop
///
/// - `returns` the new message leaf index.
async fn process_fresh_leaf(
&mut self,
latest_signed_checkpoint: &mut MultisigSignedCheckpoint,
message_leaf_index: u32,
) -> Result<u32> {
// We have unseen messages to process
info!(
destination = self.inbox_contracts.inbox.local_domain(),
leaf_index = message_leaf_index,
"Evaluating fresh leaf for processing"
);
let new_leaf = match self
.try_processing_message(latest_signed_checkpoint, message_leaf_index)
.await?
{
MessageProcessingStatus::Processed => {
self.processed_gauge.set(message_leaf_index as i64);
message_leaf_index + 1
}
MessageProcessingStatus::NotYetCheckpointed => {
// Do nothing. We should allow the backlog to be evaluated
// and will eventually learn about a new signed checkpoint.
message_leaf_index
}
MessageProcessingStatus::NotDestinedForInbox
| MessageProcessingStatus::NotWhitelisted => message_leaf_index + 1,
MessageProcessingStatus::Error => {
warn!(
destination = self.inbox_contracts.inbox.local_domain(),
leaf_index = message_leaf_index,
"Message could not be processed, queue for retry"
);
self.retry_queue.push(MessageToRetry {
leaf_index: message_leaf_index,
time_to_retry: Reverse(Instant::now()),
retries: 0,
});
message_leaf_index + 1
}
};
Ok(new_leaf)
}
/// Part of main loop
async fn retry_processing_message(
&mut self,
latest_signed_checkpoint: &mut MultisigSignedCheckpoint,
) -> Result<LoopControl> {
// See if we have messages to retry
if let Some(MessageToRetry { time_to_retry, .. }) = self.retry_queue.peek() {
// Since we use Reverse, we want time_to_retry to be smaller
if time_to_retry < &Reverse(Instant::now()) {
return Ok(Continue);
}
}
let MessageToRetry {
leaf_index,
retries,
..
} = if let Some(v) = self.retry_queue.pop() {
v
} else {
// Nothing to do, just sleep
sleep(Duration::from_secs(1)).await;
return Ok(Flow);
};
info!(
destination = self.inbox_contracts.inbox.local_domain(),
leaf_index = leaf_index,
retries = retries,
retry_queue_length = self.retry_queue.len(),
"Retry processing of message"
);
match self
.try_processing_message(latest_signed_checkpoint, leaf_index)
.await?
{
MessageProcessingStatus::NotDestinedForInbox
| MessageProcessingStatus::NotYetCheckpointed
| MessageProcessingStatus::NotWhitelisted => {
error!(
leaf_index = leaf_index,
"Somehow we tried to retry a message that cant be retried"
);
bail!("Somehow we tried to retry a message that cant be retried")
}
MessageProcessingStatus::Processed => {}
MessageProcessingStatus::Error => {
info!(
destination = self.inbox_contracts.inbox.local_domain(),
leaf_index = leaf_index,
retries = retries,
retry_queue_length = self.retry_queue.len(),
"Retry of message failed processing"
);
if retries >= self.max_retries {
info!(
destination = self.inbox_contracts.inbox.local_domain(),
leaf_index = message_leaf_index,
"Process fresh leaf"
leaf_index = leaf_index,
retries = retries,
retry_queue_length = self.retry_queue.len(),
"Maximum number of retries exceeded for processing message"
);
match self
.try_processing_message(&latest_signed_checkpoint, message_leaf_index)
.await?
{
MessageProcessingStatus::Processed => {
self.processed_gauge.set(message_leaf_index as i64);
message_leaf_index += 1
}
MessageProcessingStatus::NotYetCheckpointed => {
// Do nothing. We should allow the backlog to be evaluated
// and will eventually learn about a new signed checkpoint.
}
MessageProcessingStatus::NotDestinedForInbox => message_leaf_index += 1,
MessageProcessingStatus::Error => {
warn!(
destination = self.inbox_contracts.inbox.local_domain(),
leaf_index = message_leaf_index,
"Message could not be processed, queue for retry"
);
self.retry_queue.push(MessageToRetry {
leaf_index: message_leaf_index,
time_to_retry: Reverse(Instant::now()),
retries: 0,
});
message_leaf_index += 1;
}
}
}
None => {
// See if we have messages to retry
if let Some(MessageToRetry { time_to_retry, .. }) = self.retry_queue.peek() {
// Since we use Reverse, we want time_to_retry to be smaller
if time_to_retry < &Reverse(Instant::now()) {
continue;
}
}
match self.retry_queue.pop() {
Some(MessageToRetry {
leaf_index,
retries,
..
}) => {
info!(
destination = self.inbox_contracts.inbox.local_domain(),
leaf_index = leaf_index,
retries = retries,
retry_queue_length = self.retry_queue.len(),
"Retry processing of message"
);
match self
.try_processing_message(&latest_signed_checkpoint, leaf_index)
.await?
{
MessageProcessingStatus::NotDestinedForInbox
| MessageProcessingStatus::NotYetCheckpointed => {
error!(
leaf_index = leaf_index,
"Somehow we tried to retry a message that cant be retried"
);
bail!(
"Somehow we tried to retry a message that cant be retried"
)
}
MessageProcessingStatus::Processed => {}
MessageProcessingStatus::Error => {
info!(
destination = self.inbox_contracts.inbox.local_domain(),
leaf_index = leaf_index,
retries = retries,
retry_queue_length = self.retry_queue.len(),
"Retry of message failed processing"
);
if retries >= self.max_retries {
info!(
destination = self.inbox_contracts.inbox.local_domain(),
leaf_index = leaf_index,
retries = retries,
retry_queue_length = self.retry_queue.len(),
"Max retries exceeded for processing message, dropping"
);
continue;
}
let retries = retries + 1;
let time_to_retry = Reverse(
Instant::now()
+ Duration::from_secs(2u64.pow(retries as u32)),
);
self.retry_queue.push(MessageToRetry {
leaf_index,
time_to_retry,
retries,
});
}
}
}
None => {
// Nothing to do, just sleep
sleep(Duration::from_secs(1)).await;
}
}
return Ok(Continue);
}
let retries = retries + 1;
let time_to_retry =
Reverse(Instant::now() + Duration::from_secs(2u64.pow(retries as u32)));
self.retry_queue.push(MessageToRetry {
leaf_index,
time_to_retry,
retries,
});
}
}
Ok(Flow)
}
pub(crate) fn spawn(self) -> Instrumented<JoinHandle<Result<()>>> {

@ -1,24 +1,22 @@
use std::sync::Arc;
use abacus_core::MultisigSignedCheckpoint;
use async_trait::async_trait;
use eyre::{Context, Result};
use tokio::{
sync::watch::{channel, Receiver, Sender},
task::JoinHandle,
};
use tracing::{info, instrument::Instrumented, Instrument};
use abacus_base::{
AbacusAgentCore, Agent, CachingInterchainGasPaymaster, ContractSyncMetrics, InboxContracts,
MultisigCheckpointSyncer,
};
use abacus_core::MultisigSignedCheckpoint;
use crate::{
checkpoint_fetcher::CheckpointFetcher, message_processor::MessageProcessor,
settings::RelayerSettings as Settings,
};
use crate::checkpoint_fetcher::CheckpointFetcher;
use crate::settings::whitelist::Whitelist;
use crate::{message_processor::MessageProcessor, settings::RelayerSettings};
/// A relayer agent
#[derive(Debug)]
@ -27,6 +25,7 @@ pub struct Relayer {
max_processing_retries: u32,
multisig_checkpoint_syncer: MultisigCheckpointSyncer,
core: AbacusAgentCore,
whitelist: Arc<Whitelist>,
}
impl AsRef<AbacusAgentCore> for Relayer {
@ -35,30 +34,12 @@ impl AsRef<AbacusAgentCore> for Relayer {
}
}
#[allow(clippy::unit_arg)]
impl Relayer {
/// Instantiate a new relayer
pub fn new(
signed_checkpoint_polling_interval: u64,
max_processing_retries: u32,
multisig_checkpoint_syncer: MultisigCheckpointSyncer,
core: AbacusAgentCore,
) -> Self {
Self {
signed_checkpoint_polling_interval,
max_processing_retries,
multisig_checkpoint_syncer,
core,
}
}
}
#[async_trait]
#[allow(clippy::unit_arg)]
impl Agent for Relayer {
const AGENT_NAME: &'static str = "relayer";
type Settings = Settings;
type Settings = RelayerSettings;
async fn from_settings(settings: Self::Settings) -> Result<Self>
where
@ -67,18 +48,30 @@ impl Agent for Relayer {
let multisig_checkpoint_syncer: MultisigCheckpointSyncer = settings
.multisigcheckpointsyncer
.try_into_multisig_checkpoint_syncer()?;
Ok(Self::new(
let whitelist = Arc::new(
settings
.whitelist
.as_ref()
.map(|wl| serde_json::from_str(wl))
.transpose()
.expect("Invalid whitelist received")
.unwrap_or_default(),
);
info!(whitelist = %whitelist, "Whitelist configuration");
Ok(Self {
signed_checkpoint_polling_interval: settings
.signedcheckpointpollinginterval
.parse()
.unwrap_or(5),
settings.maxprocessingretries.parse().unwrap_or(10),
max_processing_retries: settings.maxprocessingretries.parse().unwrap_or(10),
multisig_checkpoint_syncer,
settings
core: settings
.as_ref()
.try_into_abacus_core(Self::AGENT_NAME)
.await?,
))
whitelist,
})
}
}
@ -126,6 +119,7 @@ impl Relayer {
db,
inbox_contracts,
signed_checkpoint_receiver,
self.whitelist.clone(),
self.core.metrics.last_known_message_leaf_index(),
self.core.metrics.retry_queue_length(),
);

@ -2,11 +2,15 @@
use abacus_base::decl_settings;
pub mod whitelist;
decl_settings!(Relayer {
/// The polling interval to check for new signed checkpoints in seconds
signedcheckpointpollinginterval: String,
/// The maxinmum number of times a processor will try to process a message
/// The maximum number of times a relayer will try to process a message
maxprocessingretries: String,
/// The multisig checkpoint syncer configuration
multisigcheckpointsyncer: abacus_base::MultisigCheckpointSyncerConf,
/// This is optional. See `Whitelist` for more.
whitelist: Option<String>,
});

@ -0,0 +1,311 @@
use std::fmt;
use std::fmt::{Display, Formatter};
use std::marker::PhantomData;
use std::num::ParseIntError;
use ethers::prelude::*;
use serde::de::{Error, SeqAccess, Visitor};
use serde::{Deserialize, Deserializer};
use abacus_core::AbacusMessage;
/// Whitelist defining which messages should be relayed. If no wishlist is provided ALL
/// messages will be relayed.
///
/// Valid options for each of the tuple elements are
/// - wildcard "*"
/// - single value in decimal or hex (must start with `0x`) format
/// - list of values in decimal or hex format
/// - defaults to wildcards
#[derive(Debug, Deserialize, Default, Clone)]
#[serde(transparent)]
pub struct Whitelist(Option<Vec<WhitelistElement>>);
#[derive(Debug, Clone, PartialEq)]
enum Filter<T> {
Wildcard,
Enumerated(Vec<T>),
}
impl<T> Default for Filter<T> {
fn default() -> Self {
Self::Wildcard
}
}
impl<T: PartialEq> Filter<T> {
fn matches(&self, v: &T) -> bool {
match self {
Filter::Wildcard => true,
Filter::Enumerated(list) => list.iter().any(|i| i == v),
}
}
}
impl<T: Display> Display for Filter<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::Wildcard => write!(f, "*"),
Self::Enumerated(l) if l.len() == 1 => write!(f, "{}", l[0]),
Self::Enumerated(l) => {
write!(f, "[")?;
for i in l {
write!(f, "{i},")?;
}
write!(f, "]")
}
}
}
}
#[derive(Deserialize)]
#[serde(untagged)]
enum StrOrInt<'a> {
Str(&'a str),
Int(u32),
}
impl TryFrom<StrOrInt<'_>> for u32 {
type Error = ParseIntError;
fn try_from(v: StrOrInt) -> Result<Self, Self::Error> {
match v {
StrOrInt::Str(s) => s.parse(),
StrOrInt::Int(i) => Ok(i),
}
}
}
struct FilterVisitor<T>(PhantomData<T>);
impl<'de> Visitor<'de> for FilterVisitor<u32> {
type Value = Filter<u32>;
fn expecting(&self, fmt: &mut Formatter) -> fmt::Result {
write!(fmt, "Expecting either a wildcard \"*\", decimal/hex value string, or list of decimal/hex value strings")
}
fn visit_u32<E>(self, v: u32) -> Result<Self::Value, E>
where
E: Error,
{
Ok(Self::Value::Enumerated(vec![v]))
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: Error,
{
if v <= u32::MAX as u64 {
Ok(Self::Value::Enumerated(vec![v as u32]))
} else {
Err(E::custom("Domain Id must fit within a u32 value"))
}
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: Error,
{
Ok(if v == "*" {
Self::Value::Wildcard
} else {
Self::Value::Enumerated(vec![v.parse::<u32>().map_err(to_serde_err)?])
})
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let mut values = Vec::new();
while let Some(i) = seq.next_element::<StrOrInt>()? {
values.push(i.try_into().map_err(to_serde_err)?);
}
Ok(Self::Value::Enumerated(values))
}
}
impl<'de> Visitor<'de> for FilterVisitor<H256> {
type Value = Filter<H256>;
fn expecting(&self, fmt: &mut Formatter) -> fmt::Result {
write!(
fmt,
"Expecting either a wildcard \"*\", hex address string, or list of hex address strings"
)
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: Error,
{
Ok(if v == "*" {
Self::Value::Wildcard
} else {
Self::Value::Enumerated(vec![parse_addr(v)?])
})
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: SeqAccess<'de>,
{
let mut values = Vec::new();
while let Some(i) = seq.next_element::<&str>()? {
values.push(parse_addr(i)?)
}
Ok(Self::Value::Enumerated(values))
}
}
impl<'de> Deserialize<'de> for Filter<u32> {
fn deserialize<D>(d: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
d.deserialize_any(FilterVisitor::<u32>(Default::default()))
}
}
impl<'de> Deserialize<'de> for Filter<H256> {
fn deserialize<D>(d: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
d.deserialize_any(FilterVisitor::<H256>(Default::default()))
}
}
#[derive(Debug, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "camelCase")]
struct WhitelistElement {
#[serde(default)]
source_domain: Filter<u32>,
#[serde(default)]
source_address: Filter<H256>,
#[serde(default)]
destination_domain: Filter<u32>,
#[serde(default)]
destination_address: Filter<H256>,
}
impl Display for WhitelistElement {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{{sourceDomain: {}, sourceAddress: {}, destinationDomain: {}, destinationAddress: {}}}", self.source_domain, self.source_address, self.destination_domain, self.destination_address)
}
}
impl Whitelist {
pub fn msg_matches(&self, msg: &AbacusMessage) -> bool {
self.matches(msg.origin, &msg.sender, msg.destination, &msg.recipient)
}
pub fn matches(
&self,
src_domain: u32,
src_addr: &H256,
dst_domain: u32,
dst_addr: &H256,
) -> bool {
if let Some(rules) = &self.0 {
rules.iter().any(|rule| {
rule.source_domain.matches(&src_domain)
&& rule.source_address.matches(src_addr)
&& rule.destination_domain.matches(&dst_domain)
&& rule.destination_address.matches(dst_addr)
})
} else {
// by default if there is no whitelist, allow everything
true
}
}
}
impl Display for Whitelist {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
if let Some(wl) = &self.0 {
write!(f, "[")?;
for i in wl {
write!(f, "{i},")?;
}
write!(f, "]")
} else {
write!(f, "null")
}
}
}
fn to_serde_err<IE: ToString, OE: Error>(e: IE) -> OE {
OE::custom(e.to_string())
}
fn parse_addr<E: Error>(addr_str: &str) -> Result<H256, E> {
if addr_str.len() <= 42 {
addr_str.parse::<H160>().map(H256::from)
} else {
addr_str.parse::<H256>()
}
.map_err(to_serde_err)
}
#[cfg(test)]
mod test {
use ethers::prelude::*;
use super::{Filter::*, Whitelist};
#[test]
fn basic_config() {
let whitelist: Whitelist = serde_json::from_str(r#"[{"sourceDomain": "*", "sourceAddress": "*", "destinationDomain": "*", "destinationAddress": "*"}, {}]"#).unwrap();
assert!(whitelist.0.is_some());
assert_eq!(whitelist.0.as_ref().unwrap().len(), 2);
let elem = &whitelist.0.as_ref().unwrap()[0];
assert_eq!(elem.destination_domain, Wildcard);
assert_eq!(elem.destination_address, Wildcard);
assert_eq!(elem.source_domain, Wildcard);
assert_eq!(elem.source_address, Wildcard);
let elem = &whitelist.0.as_ref().unwrap()[1];
assert_eq!(elem.destination_domain, Wildcard);
assert_eq!(elem.destination_address, Wildcard);
assert_eq!(elem.source_domain, Wildcard);
assert_eq!(elem.source_address, Wildcard);
}
#[test]
fn config_with_address() {
let whitelist: Whitelist = serde_json::from_str(r#"[{"sourceAddress": "0x9d4454B023096f34B160D6B654540c56A1F81688", "destinationAddress": "9d4454B023096f34B160D6B654540c56A1F81688"}]"#).unwrap();
assert!(whitelist.0.is_some());
assert_eq!(whitelist.0.as_ref().unwrap().len(), 1);
let elem = &whitelist.0.as_ref().unwrap()[0];
assert_eq!(elem.destination_domain, Wildcard);
assert_eq!(
elem.destination_address,
Enumerated(vec!["0x9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>()
.unwrap()
.into()])
);
assert_eq!(elem.source_domain, Wildcard);
assert_eq!(
elem.source_address,
Enumerated(vec!["0x9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>()
.unwrap()
.into()])
);
}
#[test]
fn config_with_multiple_domains() {
let whitelist: Whitelist =
serde_json::from_str(r#"[{"destinationDomain": ["13372", "13373"]}]"#).unwrap();
assert!(whitelist.0.is_some());
assert_eq!(whitelist.0.as_ref().unwrap().len(), 1);
let elem = &whitelist.0.as_ref().unwrap()[0];
assert_eq!(elem.destination_domain, Enumerated(vec![13372, 13373]));
assert_eq!(elem.destination_address, Wildcard);
assert_eq!(elem.source_domain, Wildcard);
assert_eq!(elem.source_address, Wildcard);
}
}

@ -42,6 +42,7 @@ function relayer {(
OPT_BASE_SIGNERS_TEST2_TYPE="hexKey" \
OPT_BASE_SIGNERS_TEST3_KEY="701b615bbdfb9de65240bc28bd21bbc0d996645a3dd57e7b12bc2bdf6f192c82" \
OPT_BASE_SIGNERS_TEST3_TYPE="hexKey" \
OPT_RELAYER_WHITELIST='[{"sourceAddress": "*", "destinationDomain": ["13372", "13373"], "destinationAddress": "*"}]' \
OPT_RELAYER_SIGNEDCHECKPOINTPOLLINGINTERVAL="5" \
OPT_RELAYER_MAXPROCESSINGRETRIES="5" \
OPT_RELAYER_MULTISIGCHECKPOINTSYNCER_THRESHOLD="1" \

@ -0,0 +1,7 @@
[package]
name = "loop-control"
version = "0.1.0"
edition = "2021"
description = "A simple library which provides an easy way to breakup logic within a loop without losing the ability to pass loop control logic back."
[dependencies]

@ -0,0 +1,33 @@
/// A loop control operation.
#[must_use]
#[derive(Debug, Copy, Clone)]
pub enum LoopControl {
/// No op, just flow through the rest of the loop normally
Flow,
/// Inject `continue` and run next loop iteration
Continue,
/// Inject `break` and end the loop
Break,
}
impl Default for LoopControl {
fn default() -> Self {
LoopControl::Flow
}
}
/// Handle a loop control operation. This must be called directly within a loop.
#[macro_export]
macro_rules! loop_ctrl {
($ctrl:expr) => {
match $ctrl {
::loop_control::LoopControl::Flow => {}
::loop_control::LoopControl::Continue => {
continue;
}
::loop_control::LoopControl::Break => {
break;
}
}
};
}

@ -0,0 +1,34 @@
use loop_control::{loop_ctrl, LoopControl::*};
#[test]
fn flows_loop() {
let mut i = 0;
for _ in 0..5 {
i += 1;
loop_ctrl!(Flow);
i += 1;
}
assert_eq!(i, 10);
}
#[test]
fn continues_loop() {
let mut i = 0;
for _ in 0..5 {
i += 1;
loop_ctrl!(Continue);
i += 1;
}
assert_eq!(i, 5);
}
#[test]
fn breaks_loop() {
let mut i = 0;
for _ in 0..5 {
i += 1;
loop_ctrl!(Break);
i += 1;
}
assert_eq!(i, 1);
}

@ -21,7 +21,7 @@ export const agent: AgentConfig<DevChains> = {
},
relayer: {
default: {
signedCheckpointPollingInteral: 5,
signedCheckpointPollingInterval: 5,
maxProcessingRetries: 10,
},
},

@ -21,7 +21,7 @@ export const agent: AgentConfig<TestChains> = {
},
relayer: {
default: {
signedCheckpointPollingInteral: 5,
signedCheckpointPollingInterval: 5,
maxProcessingRetries: 10,
},
},

@ -30,7 +30,7 @@ export const agent: AgentConfig<TestnetChains> = {
},
relayer: {
default: {
signedCheckpointPollingInteral: 5,
signedCheckpointPollingInterval: 5,
maxProcessingRetries: 10,
},
},

@ -30,7 +30,7 @@ export const agent: AgentConfig<TestnetChains> = {
},
relayer: {
default: {
signedCheckpointPollingInteral: 5,
signedCheckpointPollingInterval: 5,
maxProcessingRetries: 10,
},
},

@ -331,7 +331,7 @@ export async function runAgentHelmCommand<Chain extends ChainName>(
return execCmd(
`helm ${action} ${outboxChainName} ../../rust/helm/abacus-agent/ --create-namespace --namespace ${
agentConfig.namespace
} ${values.join(' ')} ${extraPipe}`,
} ${values.join(' ')} --debug --dry-run ${extraPipe}`,
{},
false,
true,

@ -85,12 +85,22 @@ export type ChainValidatorSets<Chain extends ChainName> = ChainMap<
// ===== Relayer Agent =====
// =================================
type Whitelist = WhitelistElement[];
interface WhitelistElement {
sourceDomain?: '*' | string | string[] | number | number[];
sourceAddress?: '*' | string | string[];
destinationDomain?: '*' | string | string[] | number | number[];
destinationAddress?: '*' | string | string[];
}
// Incomplete basic relayer agent config
interface BaseRelayerConfig {
// The polling interval to check for new signed checkpoints in seconds
signedCheckpointPollingInteral: number;
signedCheckpointPollingInterval: number;
// The maxinmum number of times a processor will try to process a message
maxProcessingRetries: number;
whitelist?: Whitelist;
}
// Per-chain relayer agent configs
@ -100,8 +110,9 @@ type ChainRelayerConfigs<Chain extends ChainName> = ChainOverridableConfig<
>;
// Full relayer agent config for a single chain
interface RelayerConfig extends BaseRelayerConfig {
interface RelayerConfig extends Omit<BaseRelayerConfig, 'whitelist'> {
multisigCheckpointSyncer: MultisigCheckpointSyncerConfig;
whitelist?: string;
}
// ===================================
@ -394,13 +405,20 @@ export class ChainAgentConfig<Chain extends ChainName> {
{},
);
return {
...baseConfig,
const obj: RelayerConfig = {
signedCheckpointPollingInterval:
baseConfig.signedCheckpointPollingInterval,
maxProcessingRetries: baseConfig.maxProcessingRetries,
multisigCheckpointSyncer: {
threshold: this.validatorSet.threshold,
checkpointSyncers,
},
};
if (baseConfig.whitelist) {
obj.whitelist = JSON.stringify(baseConfig.whitelist);
}
return obj;
}
get checkpointerEnabled() {

Loading…
Cancel
Save