Implement single signer task in validator (#2342)

### Description

Implements singleton signer which allows multiple tasks to use a shared
signer that is signing on a single task and migrates validator to use a
singleton KMS signer

### Drive-by changes

Modifies base agent to allow self to be mutable in run

### Related issues

- Fixes error from sharing KMS signer connection across validator tasks
https://github.com/rusoto/rusoto/issues/1766

### Backward compatibility

Yes

### Testing

E2E Tests

---------

Co-authored-by: Mattie Conover <git@mconover.dev>
pull/2374/head agents-2023-06-08
Yorke Rhodes 1 year ago committed by GitHub
parent eb17a9af15
commit a93daec7a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      rust/agents/relayer/src/relayer.rs
  2. 2
      rust/agents/scraper/src/agent.rs
  3. 14
      rust/agents/validator/src/submit.rs
  4. 47
      rust/agents/validator/src/validator.rs
  5. 5
      rust/chains/hyperlane-ethereum/src/lib.rs
  6. 93
      rust/chains/hyperlane-ethereum/src/singleton_signer.rs
  7. 2
      rust/hyperlane-base/src/agent.rs

@ -234,7 +234,7 @@ impl BaseAgent for Relayer {
}
#[allow(clippy::async_yields_async)]
async fn run(&self) -> Instrumented<JoinHandle<Result<()>>> {
async fn run(self) -> Instrumented<JoinHandle<Result<()>>> {
let mut tasks = vec![];
// send channels by destination chain

@ -152,7 +152,7 @@ impl BaseAgent for Scraper {
}
#[allow(clippy::async_yields_async)]
async fn run(&self) -> Instrumented<JoinHandle<eyre::Result<()>>> {
async fn run(self) -> Instrumented<JoinHandle<eyre::Result<()>>> {
let mut tasks = Vec::with_capacity(self.scrapers.len());
for domain in self.scrapers.keys() {
tasks.push(self.scrape(*domain).await);

@ -4,25 +4,23 @@ use std::time::{Duration, Instant};
use std::vec;
use eyre::Result;
use hyperlane_base::db::HyperlaneRocksDB;
use hyperlane_core::accumulator::incremental::IncrementalMerkle;
use prometheus::IntGauge;
use tokio::time::sleep;
use tracing::instrument;
use tracing::{debug, info};
use hyperlane_base::{CheckpointSyncer, CoreMetrics};
use hyperlane_base::{db::HyperlaneRocksDB, CheckpointSyncer, CoreMetrics};
use hyperlane_core::{
Checkpoint, CheckpointWithMessageId, HyperlaneChain, HyperlaneContract, HyperlaneDomain,
HyperlaneSigner, HyperlaneSignerExt, Mailbox,
accumulator::incremental::IncrementalMerkle, Checkpoint, CheckpointWithMessageId,
HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneSignerExt, Mailbox,
};
use hyperlane_ethereum::SingletonSignerHandle;
#[derive(Clone)]
pub(crate) struct ValidatorSubmitter {
interval: Duration,
reorg_period: Option<NonZeroU64>,
signer: Arc<dyn HyperlaneSigner>,
signer: SingletonSignerHandle,
mailbox: Arc<dyn Mailbox>,
checkpoint_syncer: Arc<dyn CheckpointSyncer>,
message_db: HyperlaneRocksDB,
@ -34,7 +32,7 @@ impl ValidatorSubmitter {
interval: Duration,
reorg_period: u64,
mailbox: Arc<dyn Mailbox>,
signer: Arc<dyn HyperlaneSigner>,
signer: SingletonSignerHandle,
checkpoint_syncer: Arc<dyn CheckpointSyncer>,
message_db: HyperlaneRocksDB,
metrics: ValidatorSubmitterMetrics,

@ -1,24 +1,22 @@
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use eyre::Result;
use hyperlane_base::db::HyperlaneRocksDB;
use hyperlane_base::MessageContractSync;
use hyperlane_core::accumulator::incremental::IncrementalMerkle;
use std::num::NonZeroU64;
use tokio::{task::JoinHandle, time::sleep};
use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument};
use hyperlane_base::{
db::DB, run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics,
HyperlaneAgentCore,
db::{HyperlaneRocksDB, DB},
run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore,
MessageContractSync,
};
use hyperlane_core::{
Announcement, HyperlaneChain, HyperlaneContract, HyperlaneDomain, HyperlaneSigner,
HyperlaneSignerExt, Mailbox, ValidatorAnnounce, H256, U256,
accumulator::incremental::IncrementalMerkle, Announcement, HyperlaneChain, HyperlaneContract,
HyperlaneDomain, HyperlaneSigner, HyperlaneSignerExt, Mailbox, ValidatorAnnounce, H256, U256,
};
use hyperlane_ethereum::{SingletonSigner, SingletonSignerHandle};
use crate::{
settings::ValidatorSettings, submit::ValidatorSubmitter, submit::ValidatorSubmitterMetrics,
@ -33,7 +31,9 @@ pub struct Validator {
message_sync: Arc<MessageContractSync>,
mailbox: Arc<dyn Mailbox>,
validator_announce: Arc<dyn ValidatorAnnounce>,
signer: Arc<dyn HyperlaneSigner>,
signer: SingletonSignerHandle,
// temporary holder until `run` is called
signer_instance: Option<Box<SingletonSigner>>,
reorg_period: u64,
interval: Duration,
checkpoint_syncer: Arc<dyn CheckpointSyncer>,
@ -58,12 +58,9 @@ impl BaseAgent for Validator {
let db = DB::from_path(&settings.db)?;
let msg_db = HyperlaneRocksDB::new(&settings.origin_chain, db);
let signer = settings
.validator
// Intentionally using hyperlane_ethereum for the validator's signer
.build::<hyperlane_ethereum::Signers>()
.await
.map(|validator| Arc::new(validator) as Arc<dyn HyperlaneSigner>)?;
// Intentionally using hyperlane_ethereum for the validator's signer
let (signer_instance, signer) = SingletonSigner::new(settings.validator.build().await?);
let core = settings.build_hyperlane_core(metrics.clone());
let checkpoint_syncer = settings.checkpoint_syncer.build(None)?.into();
@ -95,6 +92,7 @@ impl BaseAgent for Validator {
message_sync,
validator_announce: validator_announce.into(),
signer,
signer_instance: Some(Box::new(signer_instance)),
reorg_period: settings.reorg_period,
interval: settings.interval,
checkpoint_syncer,
@ -102,7 +100,20 @@ impl BaseAgent for Validator {
}
#[allow(clippy::async_yields_async)]
async fn run(&self) -> Instrumented<JoinHandle<Result<()>>> {
async fn run(mut self) -> Instrumented<JoinHandle<Result<()>>> {
let mut tasks = vec![];
if let Some(signer_instance) = self.signer_instance.take() {
tasks.push(
tokio::spawn(async move {
signer_instance.run().await;
Ok(())
})
.instrument(info_span!("SingletonSigner")),
);
}
// announce the validator after spawning the signer task
self.announce().await.expect("Failed to announce validator");
let reorg_period = NonZeroU64::new(self.reorg_period);
@ -120,8 +131,6 @@ impl BaseAgent for Validator {
sleep(self.interval).await;
}
let mut tasks = vec![];
tasks.push(self.run_message_sync().await);
for checkpoint_sync_task in self.run_checkpoint_submitters().await {
tasks.push(checkpoint_sync_task);

@ -11,7 +11,7 @@ use ethers::prelude::{abi, Lazy, Middleware};
#[cfg(not(doctest))]
pub use self::{
config::*, interchain_gas::*, interchain_security_module::*, mailbox::*, multisig_ism::*,
provider::*, routing_ism::*, rpc_clients::*, signers::*, trait_builder::*,
provider::*, routing_ism::*, rpc_clients::*, signers::*, singleton_signer::*, trait_builder::*,
validator_announce::*,
};
@ -58,6 +58,9 @@ mod rpc_clients;
mod signers;
#[cfg(not(doctest))]
mod singleton_signer;
mod config;
fn extract_fn_map(abi: &'static Lazy<abi::Abi>) -> HashMap<Vec<u8>, &'static str> {

@ -0,0 +1,93 @@
use std::fmt;
use async_trait::async_trait;
use ethers::core::types::Signature;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use tracing::warn;
use hyperlane_core::{HyperlaneSigner, HyperlaneSignerError, H160, H256};
use crate::Signers;
/// A callback to send the result of a signing operation
type Callback = oneshot::Sender<Result<Signature, HyperlaneSignerError>>;
/// A hash that needs to be signed with a callback to send the result
type SignTask = (H256, Callback);
/// A wrapper around a signer that uses channels to ensure that only one call is
/// made at a time. Mostly useful for the AWS signers.
pub struct SingletonSigner {
inner: Signers,
rx: mpsc::UnboundedReceiver<SignTask>,
}
impl fmt::Debug for SingletonSigner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("SingletonSigner").field(&self.inner).finish()
}
}
/// A `HyperlaneSigner` which grants access to a singleton signer via a channel.
#[derive(Clone)]
pub struct SingletonSignerHandle {
address: H160,
tx: mpsc::UnboundedSender<SignTask>,
}
impl fmt::Debug for SingletonSignerHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("SingletonSignerHandle")
.field(&self.address)
.finish()
}
}
#[async_trait]
impl HyperlaneSigner for SingletonSignerHandle {
fn eth_address(&self) -> H160 {
self.address
}
async fn sign_hash(&self, hash: &H256) -> Result<Signature, HyperlaneSignerError> {
let (tx, rx) = oneshot::channel();
let task = (*hash, tx);
self.tx.send(task).map_err(SingletonSignerError::from)?;
rx.await.map_err(SingletonSignerError::from)?
}
}
impl SingletonSigner {
/// Create a new singleton signer
pub fn new(inner: Signers) -> (Self, SingletonSignerHandle) {
let (tx, rx) = mpsc::unbounded_channel::<SignTask>();
let address = inner.eth_address();
(Self { inner, rx }, SingletonSignerHandle { address, tx })
}
/// Run this signer's event loop.
pub async fn run(mut self) {
while let Some((hash, tx)) = self.rx.recv().await {
if tx.send(self.inner.sign_hash(&hash).await).is_err() {
warn!(
"Failed to send signature back to the signer handle because the channel was closed"
);
}
}
}
}
/// An error incurred by the SingletonSigner signer
#[derive(Error, Debug)]
enum SingletonSignerError {
#[error("Error sending task to singleton signer {0}")]
ChannelSendError(#[from] mpsc::error::SendError<SignTask>),
#[error("Error receiving response from singleton signer {0}")]
ChannelRecvError(#[from] oneshot::error::RecvError),
}
impl From<SingletonSignerError> for HyperlaneSignerError {
fn from(e: SingletonSignerError) -> Self {
Self::from(Box::new(e) as Box<_>)
}
}

@ -43,7 +43,7 @@ pub trait BaseAgent: Send + Sync + Debug {
/// Start running this agent.
#[allow(clippy::async_yields_async)]
async fn run(&self) -> Instrumented<JoinHandle<Result<()>>>;
async fn run(self) -> Instrumented<JoinHandle<Result<()>>>;
}
/// Call this from `main` to fully initialize and run the agent for its entire

Loading…
Cancel
Save