Add relayer scaffold (#225)

* Add relayer scaffold

* Lint

* Add Outbox indexing to relayer and determine messages destined for inbox (#227)

* Add Outbox indexing to relayer and determine messages destined for inbox

* Lint

* PR review

* Support message processing in relayer (#232)

* Support message processing in relayer

* Lint

* lint

* PR review

* Add clarifying comment

* Be more specific between polling_interval and submission_latency
Nam Chu Hoai committed by GitHub
parent 9a7de7f117
commit d3fbd1f9f9
Files changed (lines changed per file):

  139  rust/abacus-base/src/contract_sync/mod.rs
    9  rust/abacus-base/src/inbox.rs
    7  rust/abacus-core/src/db/abacus_db.rs
    6  rust/abacus-core/src/traits/inbox.rs
   33  rust/agents/processor/src/prover_sync.rs
  162  rust/agents/relayer/src/checkpoint_relayer.rs
    7  rust/agents/relayer/src/main.rs
  176  rust/agents/relayer/src/prover.rs
  184  rust/agents/relayer/src/relayer.rs
    6  rust/agents/relayer/src/settings.rs
  226  rust/agents/relayer/src/tip_prover.rs

rust/abacus-base/src/contract_sync/mod.rs
@@ -378,15 +378,150 @@ impl<I> ContractSync<I>
where
I: OutboxIndexer + 'static,
{
/// TODO: Not implemented
/// Sync outbox messages
pub fn sync_outbox_messages(
&self,
) -> Instrumented<tokio::task::JoinHandle<color_eyre::Result<()>>> {
let span = info_span!("MessageContractSync");
let db = self.db.clone();
let indexer = self.indexer.clone();
let indexed_height = self.metrics.indexed_height.clone().with_label_values(&[
MESSAGES_LABEL,
&self.contract_name,
&self.agent_name,
]);
let stored_messages = self.metrics.stored_events.clone().with_label_values(&[
MESSAGES_LABEL,
&self.contract_name,
&self.agent_name,
]);
let missed_messages = self.metrics.missed_events.clone().with_label_values(&[
MESSAGES_LABEL,
&self.contract_name,
&self.agent_name,
]);
let message_leaf_index = self.metrics.message_leaf_index.clone();
let config_from = self.index_settings.from();
let chunk_size = self.index_settings.chunk_size();
tokio::spawn(async move {
let mut from = db
.retrieve_message_latest_block_end()
.map_or_else(|| config_from, |h| h + 1);
let mut finding_missing = false;
let mut realized_missing_start_block = 0;
let mut realized_missing_end_block = 0;
let mut exponential = 0;
info!(from = from, "[Messages]: resuming indexer from {}", from);
// Set the metrics with the latest known leaf index
if let Ok(Some(idx)) = db.retrieve_latest_leaf_index() {
if let Some(gauge) = message_leaf_index.as_ref() {
gauge.set(idx as i64);
}
}
loop {
sleep(Duration::from_secs(1)).await;
indexed_height.set(from as i64);
// If we were searching for missing message and have reached
// original missing start block, turn off finding_missing and
// TRY to resume normal indexing
if finding_missing && from >= realized_missing_start_block {
info!("Turning off finding_missing mode");
finding_missing = false;
}
// If we have passed the end block of the missing message, we
// have found the message and can reset variables
if from > realized_missing_end_block && realized_missing_end_block != 0 {
missed_messages.inc();
exponential = 0;
realized_missing_start_block = 0;
realized_missing_end_block = 0;
}
let tip = indexer.get_block_number().await?;
if tip <= from {
// TODO: Make this configurable
// Sleep if caught up to tip
sleep(Duration::from_secs(1)).await;
continue;
}
let candidate = from + chunk_size;
let to = min(tip, candidate);
info!(
from = from,
to = to,
"[Messages]: indexing block heights {}...{}",
from,
to
);
let sorted_messages = indexer.fetch_sorted_messages(from, to).await?;
// If no messages found, update last seen block and next height
// and continue
if sorted_messages.is_empty() {
db.store_message_latest_block_end(to)?;
from = to + 1;
continue;
}
// If messages found, check that list is valid
let last_leaf_index: OptLatestLeafIndex = db.retrieve_latest_leaf_index()?.into();
match &last_leaf_index.valid_continuation(&sorted_messages) {
ListValidity::Valid => {
// Store messages
let max_leaf_index_of_batch = db.store_messages(&sorted_messages)?;
// Report amount of messages stored into db
stored_messages.add(sorted_messages.len().try_into()?);
// Report latest leaf index to gauge
if let Some(gauge) = message_leaf_index.as_ref() {
gauge.set(max_leaf_index_of_batch as i64);
}
// Move forward next height
db.store_message_latest_block_end(to)?;
from = to + 1;
}
ListValidity::Invalid => {
if finding_missing {
from = to + 1;
} else {
warn!(
last_leaf_index = ?last_leaf_index,
start_block = from,
end_block = to,
"[Messages]: RPC failed to find message(s) between blocks {}...{}. Last seen leaf index: {:?}. Activating finding_missing mode.",
from,
to,
last_leaf_index,
);
// Turn on finding_missing mode
finding_missing = true;
realized_missing_start_block = from;
realized_missing_end_block = to;
from = realized_missing_start_block - (chunk_size * 2u32.pow(exponential as u32));
exponential += 1;
}
}
ListValidity::Empty => unreachable!("Tried to validate empty list of messages"),
};
}
})
.instrument(span)
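(Sketch, not part of the diff: when `valid_continuation` fails, the loop above rewinds `from` by an exponentially growing window before re-scanning. The helper name and the saturating guard below are ours; the loop subtracts directly.)

/// Illustrative rewind arithmetic for `finding_missing` mode: each
/// consecutive miss doubles how far back the indexer re-scans.
fn rewind_start(missing_start_block: u32, chunk_size: u32, exponential: u32) -> u32 {
    // Mirrors `realized_missing_start_block - (chunk_size * 2u32.pow(exponential))`,
    // with a saturating guard added so the sketch cannot underflow.
    missing_start_block.saturating_sub(chunk_size * 2u32.pow(exponential))
}

fn main() {
    // chunk_size = 100, gap first detected at block 5_000:
    assert_eq!(rewind_start(5_000, 100, 0), 4_900); // first retry
    assert_eq!(rewind_start(5_000, 100, 1), 4_800); // second retry
    assert_eq!(rewind_start(5_000, 100, 2), 4_600); // third retry
}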

rust/abacus-base/src/inbox.rs
@@ -88,6 +88,15 @@ impl Inbox for CachingInbox
self.inbox.process(message).await
}
/// Prove a leaf in the inbox and then process its message
async fn prove_and_process(
&self,
message: &AbacusMessage,
proof: &Proof,
) -> Result<TxOutcome, ChainCommunicationError> {
self.inbox.prove_and_process(message, proof).await
}
async fn message_status(&self, leaf: H256) -> Result<MessageStatus, ChainCommunicationError> {
self.inbox.message_status(leaf).await
}

rust/abacus-core/src/db/abacus_db.rs
@@ -355,15 +355,12 @@ impl AbacusDB {
// TODO(james): this is a quick-fix for the prover_sync and I don't like it
/// poll db every 100 milliseconds waiting for a leaf.
pub fn wait_for_leaf(
&self,
leaf_index: u32,
) -> impl Future<Output = Result<Option<H256>, DbError>> {
pub fn wait_for_leaf(&self, leaf_index: u32) -> impl Future<Output = Result<H256, DbError>> {
let slf = self.clone();
async move {
loop {
if let Some(leaf) = slf.leaf_by_leaf_index(leaf_index)? {
return Ok(Some(leaf));
return Ok(leaf);
}
sleep(Duration::from_millis(100)).await
}
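(Sketch, not part of the diff: after this change `wait_for_leaf` resolves only once the leaf exists, so call sites no longer unwrap an `Option`. A hypothetical call, assuming an open `AbacusDB` handle named `db`:)

// Blocks (polling every 100ms) until leaf index 42 appears in the db.
let leaf: H256 = db.wait_for_leaf(42).await?;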

rust/abacus-core/src/traits/inbox.rs
@@ -25,11 +25,7 @@ pub trait Inbox: AbacusCommon + Send + Sync + std::fmt::Debug {
&self,
message: &AbacusMessage,
proof: &Proof,
) -> Result<TxOutcome, ChainCommunicationError> {
self.prove(proof).await?;
Ok(self.process(message).await?)
}
) -> Result<TxOutcome, ChainCommunicationError>;
/// Fetch the status of a message
async fn message_status(&self, leaf: H256) -> Result<MessageStatus, ChainCommunicationError>;
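(For reference, the default body deleted here just chained the two existing trait calls; a free-function sketch of that behaviour, not the trait's actual code:)

// Submit the merkle proof first, then process the now-proven message.
async fn prove_then_process<I: Inbox>(
    inbox: &I,
    message: &AbacusMessage,
    proof: &Proof,
) -> Result<TxOutcome, ChainCommunicationError> {
    inbox.prove(proof).await?;
    inbox.process(message).await
}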

rust/agents/processor/src/prover_sync.rs
@@ -155,10 +155,7 @@ impl ProverSync {
for i in range {
let leaf = self.db.wait_for_leaf(i as u32).await?;
if leaf.is_none() {
break;
}
leaves.push(leaf.unwrap());
leaves.push(leaf);
}
Ok(leaves)
@@ -262,22 +259,18 @@ impl ProverSync {
// As we fill the incremental merkle, its tree_size will always be
// equal to the index of the next leaf we want (e.g. if tree_size
// is 3, we want the 4th leaf, which is at index 3)
if let Some(leaf) = self.db.wait_for_leaf(tree_size as u32).await? {
info!(
index = tree_size,
leaf = ?leaf,
"Leaf at index {} is {}",
tree_size,
leaf
);
incremental.ingest(leaf);
leaves.push(leaf);
current_root = incremental.root();
} else {
// break on no leaf (warn: path never taken)
current_root = incremental.root();
break;
}
let leaf = self.db.wait_for_leaf(tree_size as u32).await?;
info!(
index = tree_size,
leaf = ?leaf,
"Leaf at index {} is {}",
tree_size,
leaf
);
incremental.ingest(leaf);
leaves.push(leaf);
current_root = incremental.root();
tree_size = incremental.count();
}

rust/agents/relayer/src/checkpoint_relayer.rs
@@ -0,0 +1,162 @@
use std::{sync::Arc, time::Duration};
use abacus_base::{CachingInbox, CheckpointSyncer, LocalStorage};
use abacus_core::{db::AbacusDB, AbacusCommon, CommittedMessage, Inbox};
use color_eyre::Result;
use tokio::{task::JoinHandle, time::sleep};
use tracing::{debug, info_span, instrument::Instrumented, Instrument};
use crate::tip_prover::{MessageBatch, TipProver};
pub(crate) struct CheckpointRelayer {
polling_interval: u64,
/// The minimum latency in seconds between two relayed checkpoints on the inbox
submission_latency: u64,
db: AbacusDB,
inbox: Arc<CachingInbox>,
prover_sync: TipProver,
}
impl CheckpointRelayer {
pub(crate) fn new(
polling_interval: u64,
submission_latency: u64,
db: AbacusDB,
inbox: Arc<CachingInbox>,
) -> Self {
Self {
polling_interval,
submission_latency,
prover_sync: TipProver::from_disk(db.clone()),
db,
inbox,
}
}
/// Only gets the messages destined for the Relayer's inbox
async fn get_messages_between(
&self,
from_leaf_index: u32,
to_leaf_index: u32,
) -> Result<Option<Vec<CommittedMessage>>> {
let mut messages: Vec<CommittedMessage> = vec![];
let mut current_leaf_index = from_leaf_index;
while current_leaf_index <= to_leaf_index {
// Relies on the indexer finding this message eventually
self.db.wait_for_leaf(current_leaf_index).await?;
let maybe_message = self
.db
.message_by_leaf_index(current_leaf_index)?
.map(CommittedMessage::try_from)
.transpose()?;
match maybe_message {
Some(message) => {
if message.message.destination == self.inbox.local_domain() {
messages.push(message);
}
}
// This should never happen, but if it does, retry the range
None => return Ok(None),
}
current_leaf_index += 1
}
Ok(Some(messages))
}
// Returns the newest "current" checkpoint index
async fn submit_checkpoint_and_messages(
&mut self,
local_storage: &LocalStorage,
onchain_checkpoint_index: u32,
signed_checkpoint_index: u32,
messages: Vec<CommittedMessage>,
) -> Result<u32> {
// If the checkpoint storage is inconsistent, then this arm won't match
// and it will cause us to have skipped this message batch
if let Some(latest_signed_checkpoint) = local_storage
.fetch_checkpoint(signed_checkpoint_index)
.await?
{
let batch = MessageBatch::new(
messages,
onchain_checkpoint_index,
latest_signed_checkpoint.clone(),
);
self.prover_sync.update_from_batch(&batch)?;
self.inbox
.submit_checkpoint(&latest_signed_checkpoint)
.await?;
// TODO: sign in parallel
for message in &batch.messages {
if let Some(proof) = self.db.proof_by_leaf_index(message.leaf_index)? {
self.inbox
.prove_and_process(&message.message, &proof)
.await?;
}
}
// Sleep latency period after submission
sleep(Duration::from_secs(self.submission_latency)).await;
Ok(latest_signed_checkpoint.checkpoint.index)
} else {
Ok(onchain_checkpoint_index)
}
}
pub(crate) fn spawn(mut self) -> Instrumented<JoinHandle<Result<()>>> {
let span = info_span!("CheckpointRelayer");
let local_storage = LocalStorage {
path: "/tmp/validatorsignatures".to_string(),
};
tokio::spawn(async move {
let latest_inbox_checkpoint = self.inbox.latest_checkpoint(None).await?;
let mut onchain_checkpoint_index = latest_inbox_checkpoint.index;
// Checkpoints are 1-indexed, while leaves are 0-indexed
let mut next_inbox_leaf_index = onchain_checkpoint_index;
loop {
sleep(Duration::from_secs(self.polling_interval)).await;
if let Some(signed_checkpoint_index) = local_storage.latest_index().await? {
if signed_checkpoint_index <= onchain_checkpoint_index {
debug!(
onchain = onchain_checkpoint_index,
signed = signed_checkpoint_index,
"Signed checkpoint matches known checkpoint on-chain, continue"
);
continue;
}
match self
.get_messages_between(next_inbox_leaf_index, signed_checkpoint_index)
.await?
{
None => debug!("Couldn't fetch the relevant messages, retry this range"),
Some(messages) if messages.is_empty() => {
next_inbox_leaf_index = signed_checkpoint_index;
debug!("New checkpoint does not include messages for inbox")
}
Some(messages) => {
next_inbox_leaf_index = signed_checkpoint_index;
debug!(
len = messages.len(),
"Signed checkpoint allows for processing of new messages"
);
onchain_checkpoint_index = self
.submit_checkpoint_and_messages(
&local_storage,
onchain_checkpoint_index,
signed_checkpoint_index,
messages,
)
.await?;
}
}
}
}
})
.instrument(span)
}
}
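(Sketch, not part of the diff: the off-by-one bookkeeping in `spawn`, spelled out with invented numbers. Since checkpoints are 1-indexed over 0-indexed leaves, a checkpoint with index N covers leaves 0..N, so N itself is the first leaf not yet relayed.)

// Checkpoint index 4 commits to leaves 0..=3 ...
let onchain_checkpoint_index: u32 = 4;
let next_inbox_leaf_index = onchain_checkpoint_index; // ... so leaf 4 is next.
// If a signed checkpoint with index 7 then appears, `get_messages_between`
// walks leaves 4..=7 (its loop is `while current_leaf_index <= to_leaf_index`)
// and keeps only messages whose destination is this inbox's local domain.
assert_eq!(next_inbox_leaf_index, 4);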

rust/agents/relayer/src/main.rs
@@ -7,12 +7,15 @@
#![warn(missing_docs)]
#![warn(unused_extern_crates)]
mod checkpoint_relayer;
mod prover;
mod relayer;
mod settings;
mod tip_prover;
use color_eyre::Result;
use abacus_base::AbacusAgent;
use abacus_base::Agent;
use crate::{relayer::Relayer, settings::RelayerSettings as Settings};
@@ -30,7 +33,7 @@ async fn _main() -> Result<()> {
let _ = agent.metrics().run_http_server();
agent.run_all().await??;
agent.run().await??;
Ok(())
}

rust/agents/relayer/src/prover.rs
@@ -0,0 +1,176 @@
//! Prover process: generate proofs in the tree.
//!
//! Struct responsible for syncing Prover
use ethers::core::types::H256;
use abacus_core::accumulator::{
merkle::{merkle_root_from_branch, MerkleTree, MerkleTreeError, Proof},
TREE_DEPTH,
};
/// A depth-32 sparse Merkle tree capable of producing proofs for arbitrary
/// elements.
#[derive(Debug)]
pub struct Prover {
count: usize,
tree: MerkleTree,
}
/// Prover Errors
#[derive(Debug, thiserror::Error)]
pub enum ProverError {
/// Index is above tree max size
#[error("Requested proof for index above u32::MAX: {0}")]
IndexTooHigh(usize),
/// Requested proof for a zero element
#[error("Requested proof for a zero element. Requested: {index}. Tree has: {count}")]
ZeroProof {
/// The index requested
index: usize,
/// The number of leaves
count: usize,
},
/// Bubbled up from underlying
#[error(transparent)]
MerkleTreeError(#[from] MerkleTreeError),
/// Failed proof verification
#[error("Proof verification failed. Root is {expected}, produced is {actual}")]
#[allow(dead_code)]
VerificationFailed {
/// The expected root (this tree's current root)
expected: H256,
/// The root produced by branch evaluation
actual: H256,
},
}
impl Default for Prover {
fn default() -> Self {
let full = MerkleTree::create(&[], TREE_DEPTH);
Self {
count: 0,
tree: full,
}
}
}
impl Prover {
/// Push a leaf to the tree. Appends it to the first unoccupied slot
///
/// This will fail if the underlying tree is full.
pub fn ingest(&mut self, element: H256) -> Result<H256, ProverError> {
self.count += 1;
self.tree.push_leaf(element, TREE_DEPTH)?;
Ok(self.tree.hash())
}
/// Return the current root hash of the tree
pub fn root(&self) -> H256 {
self.tree.hash()
}
/// Return the number of leaves that have been ingested
pub fn count(&self) -> usize {
self.count
}
/// Create a proof of a leaf in this tree.
///
/// Note, if the tree ingests more leaves, the root will need to be recalculated.
pub fn prove(&self, index: usize) -> Result<Proof, ProverError> {
if index > u32::MAX as usize {
return Err(ProverError::IndexTooHigh(index));
}
let count = self.count();
if index >= count {
return Err(ProverError::ZeroProof { index, count });
}
let (leaf, hashes) = self.tree.generate_proof(index, TREE_DEPTH);
let mut path = [H256::zero(); 32];
path.copy_from_slice(&hashes[..32]);
Ok(Proof { leaf, index, path })
}
/// Verify a proof against this tree's root.
#[allow(dead_code)]
pub fn verify(&self, proof: &Proof) -> Result<(), ProverError> {
let actual = merkle_root_from_branch(proof.leaf, &proof.path, TREE_DEPTH, proof.index);
let expected = self.root();
if expected == actual {
Ok(())
} else {
Err(ProverError::VerificationFailed { expected, actual })
}
}
}
impl<T> From<T> for Prover
where
T: AsRef<[H256]>,
{
fn from(t: T) -> Self {
let slice = t.as_ref();
Self {
count: slice.len(),
tree: MerkleTree::create(slice, TREE_DEPTH),
}
}
}
impl std::iter::FromIterator<H256> for Prover {
/// Will panic if the tree fills
fn from_iter<I: IntoIterator<Item = H256>>(iter: I) -> Self {
let mut prover = Self::default();
prover.extend(iter);
prover
}
}
impl std::iter::Extend<H256> for Prover {
/// Will panic if the tree fills
fn extend<I: IntoIterator<Item = H256>>(&mut self, iter: I) {
for i in iter {
self.ingest(i).expect("!tree full");
}
}
}
#[cfg(test)]
mod test {
use super::*;
use abacus_core::test_utils;
use ethers::utils::hash_message;
#[test]
fn it_produces_and_verifies_proofs() {
let test_cases = test_utils::load_merkle_test_json();
for test_case in test_cases.iter() {
let mut tree = Prover::default();
// insert the leaves
for leaf in test_case.leaves.iter() {
let hashed_leaf = hash_message(leaf);
tree.ingest(hashed_leaf).unwrap();
}
// assert the tree has the proper leaf count
assert_eq!(tree.count(), test_case.leaves.len());
// assert the tree generates the proper root
let root = tree.root(); // root is type H256
assert_eq!(root, test_case.expected_root);
for n in 0..test_case.leaves.len() {
// assert the tree generates the proper proof for this leaf
let proof = tree.prove(n).unwrap();
assert_eq!(proof, test_case.proofs[n]);
// check that the tree can verify the proof for this leaf
tree.verify(&proof).unwrap();
}
}
}
}
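(A hypothetical round-trip through the API above; the leaf value is invented:)

use ethers::core::types::H256;

let mut prover = Prover::default();
// `ingest` appends the leaf and returns the new root.
let root = prover.ingest(H256::repeat_byte(0xab)).expect("!tree full");
assert_eq!(prover.count(), 1);
assert_eq!(root, prover.root());
// Prove and verify the leaf we just inserted (index 0).
let proof = prover.prove(0).expect("index in range");
prover.verify(&proof).expect("root matches");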

rust/agents/relayer/src/relayer.rs
@@ -1,119 +1,24 @@
use async_trait::async_trait;
use color_eyre::{eyre::bail, Result};
use std::{sync::Arc, time::Duration};
use tokio::{sync::Mutex, task::JoinHandle, time::sleep};
use tracing::{info, instrument::Instrumented, Instrument};
use color_eyre::{eyre::Context, Result};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::{instrument::Instrumented, Instrument};
use abacus_base::{AbacusAgent, AgentCore, CachingHome, CachingReplica};
use abacus_core::{Common, CommonEvents};
use abacus_base::{AbacusAgentCore, Agent, CachingInbox, ContractSyncMetrics};
use crate::settings::RelayerSettings as Settings;
const AGENT_NAME: &str = "relayer";
#[derive(Debug)]
struct UpdatePoller {
duration: Duration,
home: Arc<CachingHome>,
replica: Arc<CachingReplica>,
semaphore: Mutex<()>,
updates_relayed_count: Arc<prometheus::IntCounterVec>,
}
impl std::fmt::Display for UpdatePoller {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"UpdatePoller: {{ home: {:?}, replica: {:?} }}",
self.home, self.replica
)
}
}
impl UpdatePoller {
fn new(
home: Arc<CachingHome>,
replica: Arc<CachingReplica>,
duration: u64,
updates_relayed_count: Arc<prometheus::IntCounterVec>,
) -> Self {
Self {
home,
replica,
duration: Duration::from_secs(duration),
semaphore: Mutex::new(()),
updates_relayed_count,
}
}
#[tracing::instrument(err, skip(self), fields(self = %self))]
async fn poll_and_relay_update(&self) -> Result<()> {
// Get replica's current root.
let old_root = self.replica.committed_root().await?;
info!(
"Replica {} latest root is: {}",
self.replica.name(),
old_root
);
// Check for first signed update building off of the replica's current root
let signed_update_opt = self.home.signed_update_by_old_root(old_root).await?;
// If signed update exists for replica's committed root, try to
// relay
if let Some(signed_update) = signed_update_opt {
info!(
"Update for replica {}. Root {} to {}",
self.replica.name(),
&signed_update.update.previous_root,
&signed_update.update.new_root,
);
// Attempt to acquire lock for submitting tx
let lock = self.semaphore.try_lock();
if lock.is_err() {
return Ok(()); // tx in flight. just do nothing
}
// Relay update and increment counters if tx successful
if self.replica.update(&signed_update).await.is_ok() {
self.updates_relayed_count
.with_label_values(&[self.home.name(), self.replica.name(), AGENT_NAME])
.inc();
}
// lock dropped here
} else {
info!(
"No update. Current root for replica {} is {}",
self.replica.name(),
old_root
);
}
Ok(())
}
fn spawn(self) -> JoinHandle<Result<()>> {
tokio::spawn(async move {
loop {
self.poll_and_relay_update().await?;
sleep(self.duration).await;
}
})
}
}
use crate::{checkpoint_relayer::CheckpointRelayer, settings::RelayerSettings as Settings};
/// A relayer agent
#[derive(Debug)]
pub struct Relayer {
duration: u64,
core: AgentCore,
polling_interval: u64,
submission_latency: u64,
core: AbacusAgentCore,
updates_relayed_count: Arc<prometheus::IntCounterVec>,
}
impl AsRef<AgentCore> for Relayer {
fn as_ref(&self) -> &AgentCore {
impl AsRef<AbacusAgentCore> for Relayer {
fn as_ref(&self) -> &AbacusAgentCore {
&self.core
}
}
@@ -121,7 +26,7 @@ impl AsRef<AgentCore> for Relayer {
#[allow(clippy::unit_arg)]
impl Relayer {
/// Instantiate a new relayer
pub fn new(duration: u64, core: AgentCore) -> Self {
pub fn new(polling_interval: u64, submission_latency: u64, core: AbacusAgentCore) -> Self {
let updates_relayed_count = Arc::new(
core.metrics
.new_int_counter(
@@ -133,7 +38,8 @@ impl Relayer {
);
Self {
duration,
polling_interval,
submission_latency,
core,
updates_relayed_count,
}
@@ -142,7 +48,7 @@ impl Relayer {
#[async_trait]
#[allow(clippy::unit_arg)]
impl AbacusAgent for Relayer {
impl Agent for Relayer {
const AGENT_NAME: &'static str = "relayer";
type Settings = Settings;
@@ -152,31 +58,53 @@ impl AbacusAgent for Relayer {
Self: Sized,
{
Ok(Self::new(
settings.interval.parse().expect("invalid uint"),
settings.as_ref().try_into_core("relayer").await?,
settings.pollinginterval.parse().unwrap_or(5),
settings.submissionlatency.parse().expect("invalid uint"),
settings
.as_ref()
.try_into_abacus_core(Self::AGENT_NAME)
.await?,
))
}
}
#[tracing::instrument]
fn run(&self, name: &str) -> Instrumented<JoinHandle<Result<()>>> {
let replica_opt = self.replica_by_name(name);
let home = self.home();
let updates_relayed_count = self.updates_relayed_count.clone();
impl Relayer {
fn run_contract_sync(&self) -> Instrumented<JoinHandle<Result<()>>> {
let sync_metrics = ContractSyncMetrics::new(self.metrics(), None);
let sync = self.outbox().sync(
Self::AGENT_NAME.to_string(),
self.as_ref().indexer.clone(),
sync_metrics,
);
sync
}
fn run_inbox(&self, inbox: Arc<CachingInbox>) -> Instrumented<JoinHandle<Result<()>>> {
let db = self.outbox().db();
let submit =
CheckpointRelayer::new(self.polling_interval, self.submission_latency, db, inbox);
self.run_all(vec![submit.spawn()])
}
let name = name.to_owned();
let duration = self.duration;
fn wrap_inbox_run(
&self,
inbox_name: &str,
inbox: Arc<CachingInbox>,
) -> Instrumented<JoinHandle<Result<()>>> {
let m = format!("Task for inbox named {} failed", inbox_name);
let handle = self.run_inbox(inbox).in_current_span();
let fut = async move { handle.await?.wrap_err(m) };
tokio::spawn(async move {
if replica_opt.is_none() {
bail!("No replica named {}", name);
}
let replica = replica_opt.unwrap();
tokio::spawn(fut).in_current_span()
}
let update_poller =
UpdatePoller::new(home, replica.clone(), duration, updates_relayed_count);
update_poller.spawn().await?
})
.in_current_span()
pub fn run(&self) -> Instrumented<JoinHandle<Result<()>>> {
let mut inbox_tasks: Vec<Instrumented<JoinHandle<Result<()>>>> = self
.inboxes()
.iter()
.map(|(inbox_name, inbox)| self.wrap_inbox_run(inbox_name, inbox.clone()))
.collect();
inbox_tasks.push(self.run_contract_sync());
self.run_all(inbox_tasks)
}
}

rust/agents/relayer/src/settings.rs
@@ -3,6 +3,8 @@
use abacus_base::decl_settings;
decl_settings!(Relayer {
/// The polling interval (in seconds)
interval: String,
/// The polling interval to check for new checkpoints in seconds
pollinginterval: String,
/// The minimum latency in seconds between two relayed checkpoints on the inbox
submissionlatency: String,
});
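(Sketch: both settings arrive as strings and are parsed in `Relayer::from_settings`; the numbers below are invented examples.)

// pollinginterval: how often to look for a newer signed checkpoint.
let polling_interval: u64 = "5".parse().unwrap_or(5); // seconds
// submissionlatency: minimum pause after relaying a checkpoint to the inbox.
let submission_latency: u64 = "60".parse().expect("invalid uint"); // seconds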

rust/agents/relayer/src/tip_prover.rs
@@ -0,0 +1,226 @@
use crate::prover::{Prover, ProverError};
use abacus_core::{
accumulator::incremental::IncrementalMerkle,
db::{AbacusDB, DbError},
ChainCommunicationError, CommittedMessage, SignedCheckpoint,
};
use color_eyre::eyre::Result;
use ethers::core::types::H256;
use std::fmt::Display;
use tracing::{debug, error, info, instrument};
/// Struct to update prover
pub struct MessageBatch {
/// Messages
pub messages: Vec<CommittedMessage>,
current_checkpoint_index: u32,
signed_target_checkpoint: SignedCheckpoint,
}
impl MessageBatch {
pub fn new(
messages: Vec<CommittedMessage>,
current_checkpoint_index: u32,
signed_target_checkpoint: SignedCheckpoint,
) -> Self {
Self {
messages,
current_checkpoint_index,
signed_target_checkpoint,
}
}
}
/// Struct to sync prover.
#[derive(Debug)]
pub struct TipProver {
db: AbacusDB,
prover: Prover,
incremental: IncrementalMerkle,
}
impl Display for TipProver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TipProver {{ ")?;
write!(
f,
"incremental: {{ root: {:?}, size: {} }}, ",
self.incremental.root(),
self.incremental.count()
)?;
write!(
f,
"prover: {{ root: {:?}, size: {} }} ",
self.prover.root(),
self.prover.count()
)?;
write!(f, "}}")?;
Ok(())
}
}
/// TipProver errors
#[derive(Debug, thiserror::Error)]
pub enum TipProverError {
/// Local tree up-to-date but root does not match signed checkpoint
#[error("Local tree up-to-date but root does not match checkpoint. Local root: {prover_root}. checkpoint root: {checkpoint_root}. WARNING: this could indicate malicious validator and/or long reorganization process!")]
MismatchedRoots {
/// Root of prover's local merkle tree
prover_root: H256,
/// Root of the incremental merkle tree
incremental_root: H256,
/// New root contained in signed checkpoint
checkpoint_root: H256,
},
/// Leaf index was not found in DB, despite batch providing messages after
#[error("Leaf index was not found {leaf_index:?}")]
UnavailableLeaf {
/// Root of prover's local merkle tree
leaf_index: u32,
},
/// TipProver attempts Prover operation and receives ProverError
#[error(transparent)]
ProverError(#[from] ProverError),
/// TipProver receives ChainCommunicationError from chain API
#[error(transparent)]
ChainCommunicationError(#[from] ChainCommunicationError),
/// DB Error
#[error("{0}")]
DbError(#[from] DbError),
}
impl TipProver {
fn store_proof(&self, leaf_index: u32) -> Result<(), TipProverError> {
match self.prover.prove(leaf_index as usize) {
Ok(proof) => {
self.db.store_proof(leaf_index, &proof)?;
info!(
leaf_index,
root = ?self.prover.root(),
"Storing proof for leaf {}",
leaf_index
);
Ok(())
}
// ignore the storage request if it's out of range (e.g. leaves
// up-to-date but no update containing leaves produced yet)
Err(ProverError::ZeroProof { index: _, count: _ }) => Ok(()),
// bubble up any other errors
Err(e) => Err(e.into()),
}
}
/// Given rocksdb handle `db` containing merkle tree leaves,
/// instantiates new `TipProver` and fills prover's merkle tree
#[instrument(level = "debug", skip(db))]
pub fn from_disk(db: AbacusDB) -> Self {
// Ingest all leaves in db into prover tree
let mut prover = Prover::default();
let mut incremental = IncrementalMerkle::default();
if let Some(root) = db.retrieve_latest_root().expect("db error") {
for i in 0.. {
match db.leaf_by_leaf_index(i) {
Ok(Some(leaf)) => {
debug!(leaf_index = i, "Ingesting leaf from_disk");
prover.ingest(leaf).expect("!tree full");
incremental.ingest(leaf);
assert_eq!(prover.root(), incremental.root());
if prover.root() == root {
break;
}
}
Ok(None) => break,
Err(e) => {
error!(error = %e, "Error in TipProver::from_disk");
panic!("Error in TipProver::from_disk");
}
}
}
info!(target_latest_root = ?root, root = ?incremental.root(), "Reloaded TipProver from disk");
}
let sync = Self {
prover,
incremental,
db,
};
// Ensure proofs exist for all leaves
for i in 0..sync.prover.count() as u32 {
match (
sync.db.leaf_by_leaf_index(i).expect("db error"),
sync.db.proof_by_leaf_index(i).expect("db error"),
) {
(Some(_), None) => sync.store_proof(i).expect("db error"),
(None, _) => break,
_ => {}
}
}
sync
}
fn ingest_leaf_index(&mut self, leaf_index: u32) -> Result<(), TipProverError> {
match self.db.leaf_by_leaf_index(leaf_index) {
Ok(Some(leaf)) => {
debug!(leaf_index = leaf_index, "Ingesting leaf update_from_batch");
self.prover.ingest(leaf).expect("!tree full");
self.incremental.ingest(leaf);
assert_eq!(self.prover.root(), self.incremental.root());
Ok(())
}
Ok(None) => {
error!("We should not arrive here");
Err(TipProverError::UnavailableLeaf { leaf_index })
}
Err(e) => Err(e.into()),
}
}
/// Update the prover with a message batch
pub fn update_from_batch(&mut self, batch: &MessageBatch) -> Result<(), TipProverError> {
// TODO: If we are ahead already, something went wrong
// if we are somehow behind the current index, prove until then
for i in (self.prover.count() as u32)..batch.current_checkpoint_index + 1 {
self.ingest_leaf_index(i)?;
}
info!(
count = self.prover.count(),
"update_from_batch fast forward"
);
// prove until the target (checkpoints are 1-indexed)
for i in
(batch.current_checkpoint_index + 1)..batch.signed_target_checkpoint.checkpoint.index
{
self.ingest_leaf_index(i)?;
}
let prover_root = self.prover.root();
let incremental_root = self.incremental.root();
let checkpoint_root = batch.signed_target_checkpoint.checkpoint.root;
if prover_root != incremental_root || prover_root != checkpoint_root {
return Err(TipProverError::MismatchedRoots {
prover_root,
incremental_root,
checkpoint_root,
});
}
info!(
count = self.prover.count(),
"update_from_batch batch proving"
);
// store proofs in DB
for message in &batch.messages {
self.store_proof(message.leaf_index)?;
}
// TODO: push proofs to S3
Ok(())
}
}
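(Sketch, not part of the diff: the two loops in `update_from_batch` split the work into a fast-forward up to the on-chain checkpoint and a proving pass up to, but excluding, the 1-indexed target. With invented numbers:)

let count = 3u32;                    // self.prover.count(): leaves 0..=2 ingested
let current_checkpoint_index = 4u32; // batch.current_checkpoint_index
let target_index = 7u32;             // signed_target_checkpoint.checkpoint.index
// First loop ingests leaves 3 and 4:
assert_eq!((count..current_checkpoint_index + 1).collect::<Vec<_>>(), vec![3, 4]);
// Second loop ingests leaves 5 and 6; afterwards the prover holds 7 leaves
// and its root must equal both the incremental root and the checkpoint root.
assert_eq!((current_checkpoint_index + 1..target_index).collect::<Vec<_>>(), vec![5, 6]);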