Relayer blacklists (#807)

* blacklists

* fix not_matches logic to be more understandable

* WIP

* standardize src/dst

* move docs around

* Add tests for matches
pull/859/head
Mattie Conover 2 years ago committed by GitHub
parent a46f724c64
commit e5a72eaba2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      rust/agents/relayer/src/msg/processor.rs
  2. 31
      rust/agents/relayer/src/relayer.rs
  3. 213
      rust/agents/relayer/src/settings/matching_list.rs
  4. 8
      rust/agents/relayer/src/settings/mod.rs

@ -14,7 +14,7 @@ use abacus_core::{
db::AbacusDB, AbacusCommon, AbacusContract, CommittedMessage, MultisigSignedCheckpoint, Outbox, db::AbacusDB, AbacusCommon, AbacusContract, CommittedMessage, MultisigSignedCheckpoint, Outbox,
}; };
use crate::{merkle_tree_builder::MerkleTreeBuilder, settings::whitelist::Whitelist}; use crate::{merkle_tree_builder::MerkleTreeBuilder, settings::matching_list::MatchingList};
use super::SubmitMessageArgs; use super::SubmitMessageArgs;
@ -23,7 +23,8 @@ pub(crate) struct MessageProcessor {
outbox: Outboxes, outbox: Outboxes,
db: AbacusDB, db: AbacusDB,
inbox_contracts: InboxContracts, inbox_contracts: InboxContracts,
whitelist: Arc<Whitelist>, whitelist: Arc<MatchingList>,
blacklist: Arc<MatchingList>,
metrics: MessageProcessorMetrics, metrics: MessageProcessorMetrics,
tx_msg: mpsc::UnboundedSender<SubmitMessageArgs>, tx_msg: mpsc::UnboundedSender<SubmitMessageArgs>,
ckpt_rx: watch::Receiver<Option<MultisigSignedCheckpoint>>, ckpt_rx: watch::Receiver<Option<MultisigSignedCheckpoint>>,
@ -37,7 +38,8 @@ impl MessageProcessor {
outbox: Outboxes, outbox: Outboxes,
db: AbacusDB, db: AbacusDB,
inbox_contracts: InboxContracts, inbox_contracts: InboxContracts,
whitelist: Arc<Whitelist>, whitelist: Arc<MatchingList>,
blacklist: Arc<MatchingList>,
metrics: MessageProcessorMetrics, metrics: MessageProcessorMetrics,
tx_msg: mpsc::UnboundedSender<SubmitMessageArgs>, tx_msg: mpsc::UnboundedSender<SubmitMessageArgs>,
ckpt_rx: watch::Receiver<Option<MultisigSignedCheckpoint>>, ckpt_rx: watch::Receiver<Option<MultisigSignedCheckpoint>>,
@ -47,6 +49,7 @@ impl MessageProcessor {
db: db.clone(), db: db.clone(),
inbox_contracts, inbox_contracts,
whitelist, whitelist,
blacklist,
metrics, metrics,
tx_msg, tx_msg,
ckpt_rx, ckpt_rx,
@ -143,7 +146,7 @@ impl MessageProcessor {
} }
// Skip if not whitelisted. // Skip if not whitelisted.
if !self.whitelist.msg_matches(&message.message) { if !self.whitelist.msg_matches(&message.message, true) {
debug!( debug!(
inbox_name=?self.inbox_contracts.inbox.chain_name(), inbox_name=?self.inbox_contracts.inbox.chain_name(),
local_domain=?self.inbox_contracts.inbox.local_domain(), local_domain=?self.inbox_contracts.inbox.local_domain(),
@ -155,6 +158,19 @@ impl MessageProcessor {
return Ok(()); return Ok(());
} }
// skip if the message is blacklisted
if self.blacklist.msg_matches(&message.message, false) {
debug!(
inbox_name=?self.inbox_contracts.inbox.chain_name(),
local_domain=?self.inbox_contracts.inbox.local_domain(),
dst=?message.message.destination,
blacklist=?self.blacklist,
msg=?message,
"Message blacklisted, skipping idx {}", self.message_leaf_index);
self.message_leaf_index += 1;
return Ok(());
}
// If validator hasn't published checkpoint covering self.message_leaf_index yet, wait // If validator hasn't published checkpoint covering self.message_leaf_index yet, wait
// until it has, before forwarding the message to the submitter channel. // until it has, before forwarding the message to the submitter channel.
let mut ckpt; let mut ckpt;

@ -18,7 +18,7 @@ use abacus_core::{AbacusContract, MultisigSignedCheckpoint};
use crate::msg::gelato_submitter::GelatoSubmitter; use crate::msg::gelato_submitter::GelatoSubmitter;
use crate::msg::processor::{MessageProcessor, MessageProcessorMetrics}; use crate::msg::processor::{MessageProcessor, MessageProcessorMetrics};
use crate::msg::serial_submitter::SerialSubmitter; use crate::msg::serial_submitter::SerialSubmitter;
use crate::settings::whitelist::Whitelist; use crate::settings::matching_list::MatchingList;
use crate::settings::RelayerSettings; use crate::settings::RelayerSettings;
use crate::{checkpoint_fetcher::CheckpointFetcher, msg::serial_submitter::SerialSubmitterMetrics}; use crate::{checkpoint_fetcher::CheckpointFetcher, msg::serial_submitter::SerialSubmitterMetrics};
@ -28,7 +28,8 @@ pub struct Relayer {
signed_checkpoint_polling_interval: u64, signed_checkpoint_polling_interval: u64,
multisig_checkpoint_syncer: MultisigCheckpointSyncer, multisig_checkpoint_syncer: MultisigCheckpointSyncer,
core: AbacusAgentCore, core: AbacusAgentCore,
whitelist: Arc<Whitelist>, whitelist: Arc<MatchingList>,
blacklist: Arc<MatchingList>,
} }
impl AsRef<AbacusAgentCore> for Relayer { impl AsRef<AbacusAgentCore> for Relayer {
@ -51,16 +52,10 @@ impl Agent for Relayer {
let multisig_checkpoint_syncer: MultisigCheckpointSyncer = settings let multisig_checkpoint_syncer: MultisigCheckpointSyncer = settings
.multisigcheckpointsyncer .multisigcheckpointsyncer
.try_into_multisig_checkpoint_syncer()?; .try_into_multisig_checkpoint_syncer()?;
let whitelist = Arc::new(
settings let whitelist = parse_matching_list(&settings.whitelist);
.whitelist let blacklist = parse_matching_list(&settings.blacklist);
.as_ref() info!(whitelist = %whitelist, blacklist = %blacklist, "Whitelist configuration");
.map(|wl| serde_json::from_str(wl))
.transpose()
.expect("Invalid whitelist received")
.unwrap_or_default(),
);
info!(whitelist = %whitelist, "Whitelist configuration");
Ok(Self { Ok(Self {
signed_checkpoint_polling_interval: settings signed_checkpoint_polling_interval: settings
@ -73,6 +68,7 @@ impl Agent for Relayer {
.try_into_abacus_core(Self::AGENT_NAME, true) .try_into_abacus_core(Self::AGENT_NAME, true)
.await?, .await?,
whitelist, whitelist,
blacklist,
}) })
} }
} }
@ -152,6 +148,7 @@ impl Relayer {
self.outbox().db(), self.outbox().db(),
inbox_contracts, inbox_contracts,
self.whitelist.clone(), self.whitelist.clone(),
self.blacklist.clone(),
metrics, metrics,
new_messages_send_channel, new_messages_send_channel,
signed_checkpoint_receiver, signed_checkpoint_receiver,
@ -200,5 +197,15 @@ impl Relayer {
} }
} }
fn parse_matching_list(list: &Option<String>) -> Arc<MatchingList> {
Arc::new(
list.as_deref()
.map(serde_json::from_str)
.transpose()
.expect("Invalid matching list received")
.unwrap_or_default(),
)
}
#[cfg(test)] #[cfg(test)]
mod test {} mod test {}

@ -9,17 +9,17 @@ use serde::{Deserialize, Deserializer};
use abacus_core::AbacusMessage; use abacus_core::AbacusMessage;
/// Whitelist defining which messages should be relayed. If no wishlist is provided ALL /// Defines a set of patterns for determining if a message should or should not
/// messages will be relayed. /// be relayed. This is useful for determine if a message matches a given set or
/// rules.
/// ///
/// Valid options for each of the tuple elements are /// Valid options for each of the tuple elements are
/// - wildcard "*" /// - wildcard "*"
/// - single value in decimal or hex (must start with `0x`) format /// - single value in decimal or hex (must start with `0x`) format
/// - list of values in decimal or hex format /// - list of values in decimal or hex format
/// - defaults to wildcards
#[derive(Debug, Deserialize, Default, Clone)] #[derive(Debug, Deserialize, Default, Clone)]
#[serde(transparent)] #[serde(transparent)]
pub struct Whitelist(Option<Vec<WhitelistElement>>); pub struct MatchingList(Option<Vec<ListElement>>);
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
enum Filter<T> { enum Filter<T> {
@ -177,51 +177,71 @@ impl<'de> Deserialize<'de> for Filter<H256> {
} }
#[derive(Debug, Deserialize, Clone)] #[derive(Debug, Deserialize, Clone)]
#[serde(tag = "type", rename_all = "camelCase")] #[serde(tag = "type")]
struct WhitelistElement { struct ListElement {
#[serde(default)] #[serde(default, rename = "sourceDomain")]
source_domain: Filter<u32>, src_domain: Filter<u32>,
#[serde(default)] #[serde(default, rename = "sourceAddress")]
source_address: Filter<H256>, src_address: Filter<H256>,
#[serde(default)] #[serde(default, rename = "destinationDomain")]
destination_domain: Filter<u32>, dst_domain: Filter<u32>,
#[serde(default)] #[serde(default, rename = "destinationAddress")]
destination_address: Filter<H256>, dst_address: Filter<H256>,
} }
impl Display for WhitelistElement { impl Display for ListElement {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{{sourceDomain: {}, sourceAddress: {}, destinationDomain: {}, destinationAddress: {}}}", self.source_domain, self.source_address, self.destination_domain, self.destination_address) write!(f, "{{sourceDomain: {}, sourceAddress: {}, destinationDomain: {}, destinationAddress: {}}}", self.src_domain, self.src_address, self.dst_domain, self.dst_address)
} }
} }
impl Whitelist { #[derive(Copy, Clone, Debug)]
pub fn msg_matches(&self, msg: &AbacusMessage) -> bool { struct MatchInfo<'a> {
self.matches(msg.origin, &msg.sender, msg.destination, &msg.recipient)
}
pub fn matches(
&self,
src_domain: u32, src_domain: u32,
src_addr: &H256, src_addr: &'a H256,
dst_domain: u32, dst_domain: u32,
dst_addr: &H256, dst_addr: &'a H256,
) -> bool { }
impl<'a> From<&'a AbacusMessage> for MatchInfo<'a> {
fn from(msg: &'a AbacusMessage) -> Self {
Self {
src_domain: msg.origin,
src_addr: &msg.sender,
dst_domain: msg.destination,
dst_addr: &msg.recipient,
}
}
}
impl MatchingList {
/// Check if a message matches any of the rules.
/// - `default`: What to return if the the matching list is empty.
pub fn msg_matches(&self, msg: &AbacusMessage, default: bool) -> bool {
self.matches(msg.into(), default)
}
/// Check if a message matches any of the rules.
/// - `default`: What to return if the the matching list is empty.
fn matches(&self, info: MatchInfo, default: bool) -> bool {
if let Some(rules) = &self.0 { if let Some(rules) = &self.0 {
rules.iter().any(|rule| { matches_any_rule(rules.iter(), info)
rule.source_domain.matches(&src_domain)
&& rule.source_address.matches(src_addr)
&& rule.destination_domain.matches(&dst_domain)
&& rule.destination_address.matches(dst_addr)
})
} else { } else {
// by default if there is no whitelist, allow everything default
true
} }
} }
} }
impl Display for Whitelist { fn matches_any_rule<'a>(mut rules: impl Iterator<Item = &'a ListElement>, info: MatchInfo) -> bool {
rules.any(|rule| {
rule.src_domain.matches(&info.src_domain)
&& rule.src_address.matches(info.src_addr)
&& rule.dst_domain.matches(&info.dst_domain)
&& rule.dst_address.matches(info.dst_addr)
})
}
impl Display for MatchingList {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
if let Some(wl) = &self.0 { if let Some(wl) = &self.0 {
write!(f, "[")?; write!(f, "[")?;
@ -250,62 +270,129 @@ fn parse_addr<E: Error>(addr_str: &str) -> Result<H256, E> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::settings::matching_list::MatchInfo;
use ethers::prelude::*; use ethers::prelude::*;
use super::{Filter::*, Whitelist}; use super::{Filter::*, MatchingList};
#[test] #[test]
fn basic_config() { fn basic_config() {
let whitelist: Whitelist = serde_json::from_str(r#"[{"sourceDomain": "*", "sourceAddress": "*", "destinationDomain": "*", "destinationAddress": "*"}, {}]"#).unwrap(); let list: MatchingList = serde_json::from_str(r#"[{"sourceDomain": "*", "sourceAddress": "*", "destinationDomain": "*", "destinationAddress": "*"}, {}]"#).unwrap();
assert!(whitelist.0.is_some()); assert!(list.0.is_some());
assert_eq!(whitelist.0.as_ref().unwrap().len(), 2); assert_eq!(list.0.as_ref().unwrap().len(), 2);
let elem = &whitelist.0.as_ref().unwrap()[0]; let elem = &list.0.as_ref().unwrap()[0];
assert_eq!(elem.destination_domain, Wildcard); assert_eq!(elem.dst_domain, Wildcard);
assert_eq!(elem.destination_address, Wildcard); assert_eq!(elem.dst_address, Wildcard);
assert_eq!(elem.source_domain, Wildcard); assert_eq!(elem.src_domain, Wildcard);
assert_eq!(elem.source_address, Wildcard); assert_eq!(elem.src_address, Wildcard);
let elem = &whitelist.0.as_ref().unwrap()[1]; let elem = &list.0.as_ref().unwrap()[1];
assert_eq!(elem.destination_domain, Wildcard); assert_eq!(elem.dst_domain, Wildcard);
assert_eq!(elem.destination_address, Wildcard); assert_eq!(elem.dst_address, Wildcard);
assert_eq!(elem.source_domain, Wildcard); assert_eq!(elem.src_domain, Wildcard);
assert_eq!(elem.source_address, Wildcard); assert_eq!(elem.src_address, Wildcard);
assert!(list.matches(
MatchInfo {
src_domain: 0,
src_addr: &H256::default(),
dst_domain: 0,
dst_addr: &H256::default()
},
false
));
assert!(list.matches(
MatchInfo {
src_domain: 34,
src_addr: &"0x9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>()
.unwrap()
.into(),
dst_domain: 5456,
dst_addr: &H256::default()
},
false
))
} }
#[test] #[test]
fn config_with_address() { fn config_with_address() {
let whitelist: Whitelist = serde_json::from_str(r#"[{"sourceAddress": "0x9d4454B023096f34B160D6B654540c56A1F81688", "destinationAddress": "9d4454B023096f34B160D6B654540c56A1F81688"}]"#).unwrap(); let list: MatchingList = serde_json::from_str(r#"[{"sourceAddress": "0x9d4454B023096f34B160D6B654540c56A1F81688", "destinationAddress": "9d4454B023096f34B160D6B654540c56A1F81688"}]"#).unwrap();
assert!(whitelist.0.is_some()); assert!(list.0.is_some());
assert_eq!(whitelist.0.as_ref().unwrap().len(), 1); assert_eq!(list.0.as_ref().unwrap().len(), 1);
let elem = &whitelist.0.as_ref().unwrap()[0]; let elem = &list.0.as_ref().unwrap()[0];
assert_eq!(elem.destination_domain, Wildcard); assert_eq!(elem.dst_domain, Wildcard);
assert_eq!( assert_eq!(
elem.destination_address, elem.dst_address,
Enumerated(vec!["0x9d4454B023096f34B160D6B654540c56A1F81688" Enumerated(vec!["0x9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>() .parse::<H160>()
.unwrap() .unwrap()
.into()]) .into()])
); );
assert_eq!(elem.source_domain, Wildcard); assert_eq!(elem.src_domain, Wildcard);
assert_eq!( assert_eq!(
elem.source_address, elem.src_address,
Enumerated(vec!["0x9d4454B023096f34B160D6B654540c56A1F81688" Enumerated(vec!["0x9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>() .parse::<H160>()
.unwrap() .unwrap()
.into()]) .into()])
); );
assert!(list.matches(
MatchInfo {
src_domain: 34,
src_addr: &"0x9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>()
.unwrap()
.into(),
dst_domain: 5456,
dst_addr: &"9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>()
.unwrap()
.into()
},
false
));
assert!(!list.matches(
MatchInfo {
src_domain: 34,
src_addr: &"0x9d4454B023096f34B160D6B654540c56A1F81688"
.parse::<H160>()
.unwrap()
.into(),
dst_domain: 5456,
dst_addr: &H256::default()
},
false
));
} }
#[test] #[test]
fn config_with_multiple_domains() { fn config_with_multiple_domains() {
let whitelist: Whitelist = let whitelist: MatchingList =
serde_json::from_str(r#"[{"destinationDomain": ["13372", "13373"]}]"#).unwrap(); serde_json::from_str(r#"[{"destinationDomain": ["13372", "13373"]}]"#).unwrap();
assert!(whitelist.0.is_some()); assert!(whitelist.0.is_some());
assert_eq!(whitelist.0.as_ref().unwrap().len(), 1); assert_eq!(whitelist.0.as_ref().unwrap().len(), 1);
let elem = &whitelist.0.as_ref().unwrap()[0]; let elem = &whitelist.0.as_ref().unwrap()[0];
assert_eq!(elem.destination_domain, Enumerated(vec![13372, 13373])); assert_eq!(elem.dst_domain, Enumerated(vec![13372, 13373]));
assert_eq!(elem.destination_address, Wildcard); assert_eq!(elem.dst_address, Wildcard);
assert_eq!(elem.source_domain, Wildcard); assert_eq!(elem.src_domain, Wildcard);
assert_eq!(elem.source_address, Wildcard); assert_eq!(elem.src_address, Wildcard);
}
#[test]
fn matches_empty_list() {
let info = MatchInfo {
src_domain: 0,
src_addr: &H256::default(),
dst_domain: 0,
dst_addr: &H256::default(),
};
// whitelist use
assert!(MatchingList(None).matches(info, true));
// blacklist use
assert!(!MatchingList(None).matches(info, false));
} }
} }

@ -2,7 +2,7 @@
use abacus_base::decl_settings; use abacus_base::decl_settings;
pub mod whitelist; pub mod matching_list;
decl_settings!(Relayer { decl_settings!(Relayer {
/// The polling interval to check for new signed checkpoints in seconds /// The polling interval to check for new signed checkpoints in seconds
@ -11,6 +11,10 @@ decl_settings!(Relayer {
maxprocessingretries: String, maxprocessingretries: String,
/// The multisig checkpoint syncer configuration /// The multisig checkpoint syncer configuration
multisigcheckpointsyncer: abacus_base::MultisigCheckpointSyncerConf, multisigcheckpointsyncer: abacus_base::MultisigCheckpointSyncerConf,
/// This is optional. See `Whitelist` for more. /// This is optional. If no whitelist is provided ALL messages will be considered on the
/// whitelist.
whitelist: Option<String>, whitelist: Option<String>,
/// This is optional. If no blacklist is provided ALL will be considered to not be on
/// the blacklist.
blacklist: Option<String>,
}); });

Loading…
Cancel
Save