feature: configuration to push proofs to s3 buckets (#821)

* feature: configuration to push proofs to s3 buckets

* refactor: key s3 proofs by their index, not their leaf hash

* chore: improve processor bootup logging

* fix: add name and bucket to debug for Pusher

* bug: check that content-length is gt 0

* bug: expect 3 labels in next_message_nonce metric

* bug: properly handle non-existent key

* fix: log messages for proof pushing

* feature: upload message to s3 with proof
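
Taken together, these commits let the processor optionally spawn a Pusher task alongside its replica tasks. When an S3 bucket is configured, the pusher walks proofs in leaf-index order and uploads each message together with its proof under the key {home}_{leaf_index}; hypothetically, leaf 12 on a home named "ethereum" lands at the object key "ethereum_12" (see Pusher::key in push.rs below).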
Author: James Prestwich (committed via GitHub)
Parent: 0b4e4b9b0d
Commit: 985bd7850c
8 changed files:
.gitignore (6 lines changed)
rust/Cargo.lock (15 lines changed)
rust/agents/processor/Cargo.toml (4 lines changed)
rust/agents/processor/src/main.rs (1 line changed)
rust/agents/processor/src/processor.rs (125 lines changed)
rust/agents/processor/src/push.rs (139 lines changed)
rust/agents/processor/src/settings.rs (9 lines changed)
rust/optics-base/src/agent.rs (7 lines changed)

.gitignore (vendored, 6 lines changed)

@@ -11,3 +11,9 @@ rust/tmp_db
rust/tmp.env
tmp.env
.DS_STORE
+typescript/*/.env
+typescript/*/node_modules
+typescript/**/tsconfig.tsbuildinfo
+**/**/tsconfig.tsbuildinfo
+typescript/optics-provider/src/tmp.ts

rust/Cargo.lock (generated, 15 lines changed)

@@ -2546,6 +2546,8 @@ dependencies = [
"paste",
"prometheus",
"rocksdb",
"rusoto_core",
"rusoto_s3",
"serde 1.0.130",
"serde_json",
"thiserror",
@@ -2920,6 +2922,19 @@ dependencies = [
"serde_json",
]
+[[package]]
+name = "rusoto_s3"
+version = "0.47.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "048c2fe811a823ad5a9acc976e8bf4f1d910df719dcf44b15c3e96c5b7a51027"
+dependencies = [
+"async-trait",
+"bytes",
+"futures",
+"rusoto_core",
+"xml-rs",
+]
[[package]]
name = "rusoto_signature"
version = "0.47.0"

rust/agents/processor/Cargo.toml (4 lines changed)

@@ -25,6 +25,8 @@ optics-base = { path = "../../optics-base" }
paste = "1.0.5"
prometheus = "0.12"
+rusoto_s3 = "0.47.0"
+rusoto_core = "0.47.0"

[dev-dependencies]
optics-test = { path = "../../optics-test" }

rust/agents/processor/src/main.rs (1 line changed)

@@ -11,6 +11,7 @@
mod processor;
mod prover;
mod prover_sync;
+mod push;
mod settings;
use color_eyre::Result;

rust/agents/processor/src/processor.rs (125 lines changed)

@@ -11,7 +11,7 @@ use std::{
time::Duration,
};
use tokio::{sync::RwLock, task::JoinHandle, time::sleep};
-use tracing::{error, info, info_span, instrument, instrument::Instrumented, Instrument};
+use tracing::{debug, error, info, info_span, instrument, instrument::Instrumented, Instrument};
use optics_base::{
agent::{AgentCore, OpticsAgent},
@@ -25,10 +25,19 @@ use optics_core::{
traits::{CommittedMessage, Common, Home, MessageStatus},
};
-use crate::{prover_sync::ProverSync, settings::ProcessorSettings as Settings};
+use crate::{
+prover_sync::ProverSync,
+push::Pusher,
+settings::{ProcessorSettings as Settings, S3Config},
+};
const AGENT_NAME: &str = "processor";
+enum Flow {
+Advance,
+Repeat,
+}
/// The replica processor is responsible for polling messages and waiting until they validate
/// before proving/processing them.
#[derive(Debug)]
@@ -39,7 +48,7 @@ pub(crate) struct Replica {
home_db: HomeDB,
allowed: Option<Arc<HashSet<H256>>>,
denied: Option<Arc<HashSet<H256>>>,
-next_nonce: Arc<prometheus::IntGaugeVec>,
+next_message_nonce: Arc<prometheus::IntGaugeVec>,
}
impl std::fmt::Display for Replica {
@@ -69,24 +78,24 @@ impl Replica {
// - If not, wait and poll again
// 4. Check if the proof is valid under the replica
// 5. Submit the proof to the replica
-let mut next_nonce: u32 = self
+let mut next_message_nonce: u32 = self
.home_db
.retrieve_latest_nonce(domain)?
.map(|n: u32| n + 1)
.unwrap_or_default();
-self.next_nonce
+self.next_message_nonce
.with_label_values(&[self.home.name(), self.replica.name(), AGENT_NAME])
-.set(next_nonce as i64);
+.set(next_message_nonce as i64);
info!(
domain,
-nonce = next_nonce,
+nonce = next_message_nonce,
replica = self.replica.name(),
"Starting processor for {} {} at nonce {}",
domain,
self.replica.name(),
-next_nonce
+next_message_nonce
);
loop {
@@ -94,21 +103,22 @@
let seq_span = tracing::trace_span!(
"ReplicaProcessor",
name = self.replica.name(),
-nonce = next_nonce,
+nonce = next_message_nonce,
replica_domain = self.replica.local_domain(),
home_domain = self.home.local_domain(),
);
match self
-.try_msg_by_domain_and_nonce(domain, next_nonce)
+.try_msg_by_domain_and_nonce(domain, next_message_nonce)
.instrument(seq_span)
.await
{
-Ok(true) => {
-self.home_db.store_latest_nonce(domain, next_nonce)?;
-next_nonce += 1;
+Ok(Flow::Advance) => {
+self.home_db
+.store_latest_nonce(domain, next_message_nonce)?;
+next_message_nonce += 1;
}
-Ok(false) => {
+Ok(Flow::Repeat) => {
// there was some fault, let's wait and then try again later when state may have moved
sleep(Duration::from_secs(self.interval)).await
}
@@ -132,7 +142,7 @@ impl Replica {
///
/// In case of error: send help?
#[instrument(err, skip(self), fields(self = %self))]
-async fn try_msg_by_domain_and_nonce(&self, domain: u32, nonce: u32) -> Result<bool> {
+async fn try_msg_by_domain_and_nonce(&self, domain: u32, nonce: u32) -> Result<Flow> {
use optics_core::traits::Replica;
let message = match self.home.message_by_nonce(domain, nonce).await {
@@ -143,9 +153,9 @@
sequence = nonce,
"Message not yet found {}:{}",
domain,
-nonce
+nonce,
);
-return Ok(false);
+return Ok(Flow::Repeat);
}
Err(e) => bail!(e),
};
@@ -158,7 +168,7 @@
.as_ref()
.map(|set| set.contains(&message.message.sender))
{
-return Ok(true);
+return Ok(Flow::Advance);
}
// if we have a deny list, filter senders on it
@@ -167,14 +177,18 @@
.as_ref()
.map(|set| set.contains(&message.message.sender))
{
-return Ok(true);
+return Ok(Flow::Advance);
}
let proof = match self.home_db.proof_by_leaf_index(message.leaf_index) {
Ok(Some(p)) => p,
Ok(None) => {
-info!(leaf_index = message.leaf_index, "Proof not yet found");
-return Ok(false);
+info!(
+leaf_hash = ?message.to_leaf(),
+leaf_index = message.leaf_index,
+"Proof not yet found"
+);
+return Ok(Flow::Repeat);
}
Err(e) => bail!(e),
};
@@ -188,6 +202,8 @@
while !self.replica.acceptable_root(proof.root()).await? {
info!(
+leaf_hash = ?message.to_leaf(),
+leaf_index = message.leaf_index,
"Proof under {root} not yet valid here, waiting until Replica confirms",
root = proof.root(),
);
@@ -195,13 +211,16 @@
}
info!(
+leaf_hash = ?message.to_leaf(),
leaf_index = message.leaf_index,
+"Dispatching a message for processing {}:{}",
+domain,
-nonce, "Dispatching a message for processing {}:{}", domain, nonce
+nonce
);
self.process(message, proof).await?;
-Ok(true)
+Ok(Flow::Advance)
}
#[instrument(err, level = "trace", skip(self), fields(self = %self))]
@@ -243,8 +262,9 @@ decl_agent!(
replica_tasks: RwLock<HashMap<String, JoinHandle<Result<()>>>>,
allowed: Option<Arc<HashSet<H256>>>,
denied: Option<Arc<HashSet<H256>>>,
-next_nonce: Arc<prometheus::IntGaugeVec>,
index_only: bool,
+next_message_nonce: Arc<prometheus::IntGaugeVec>,
+config: Option<S3Config>,
}
);
@@ -256,15 +276,17 @@ impl Processor {
allowed: Option<HashSet<H256>>,
denied: Option<HashSet<H256>>,
index_only: bool,
+config: Option<S3Config>,
) -> Self {
-let next_nonce = core
-.metrics
-.new_int_gauge(
-"next_nonce",
-"Next nonce of a replica processor to inspect",
-&["home", "replica", "agent"],
-)
-.expect("processor metric already registered -- should have be a singleton");
+let next_message_nonce = Arc::new(
+core.metrics
+.new_int_gauge(
+"next_message_nonce",
+"Index of the next message to inspect",
+&["home", "replica", "agent"],
+)
+.expect("processor metric already registered -- should have been a singleton"),
+);
Self {
interval,
@@ -272,8 +294,9 @@
replica_tasks: Default::default(),
allowed: allowed.map(Arc::new),
denied: denied.map(Arc::new),
-next_nonce: Arc::new(next_nonce),
+next_message_nonce,
index_only,
+config,
}
}
}
@@ -295,33 +318,33 @@ impl OpticsAgent for Processor {
settings.allowed,
settings.denied,
settings.indexon.is_some(),
+settings.s3,
))
}
fn run(&self, name: &str) -> Instrumented<JoinHandle<Result<()>>> {
let home = self.home();
-let next_nonce = self.next_nonce.clone();
+let next_message_nonce = self.next_message_nonce.clone();
let interval = self.interval;
+let home_db = self.home_db();
let replica_opt = self.replica_by_name(name);
let name = name.to_owned();
-let db = self.db();
let allowed = self.allowed.clone();
let denied = self.denied.clone();
tokio::spawn(async move {
let replica = replica_opt.ok_or_else(|| eyre!("No replica named {}", name))?;
-let home_name = home.name().to_owned();
Replica {
interval,
replica,
home,
-home_db: HomeDB::new(db, home_name),
+home_db,
allowed,
denied,
-next_nonce,
+next_message_nonce,
}
.main()
.await?
@@ -335,13 +358,13 @@
{
tokio::spawn(async move {
info!("Starting Processor tasks");
// tree sync
-let sync = ProverSync::from_disk(HomeDB::new(
-self.core.db.clone(),
-self.home().name().to_owned(),
-));
+info!("Starting ProverSync");
+let sync = ProverSync::from_disk(self.home_db());
let sync_task = sync.spawn();
info!("Starting indexer");
// indexer setup
let block_height = self
.as_ref()
@@ -369,9 +392,23 @@
tasks.push(self.run_many(&names));
}
info!("selecting");
let (res, _, remaining) = select_all(tasks).await;
// if we have a bucket, add a task to push to it
if let Some(config) = &self.config {
info!(bucket = %config.bucket, "Starting S3 push tasks");
tasks.push(
Pusher::new(
self.core.home.name(),
&config.bucket,
config.region.parse().expect("invalid s3 region"),
self.home_db(),
)
.spawn(),
)
}
// find the first task to shut down. Then cancel all others
debug!(tasks = tasks.len(), "Selecting across Processor tasks");
let (res, _, remaining) = select_all(tasks).await;
for task in remaining.into_iter() {
cancel_task!(task);
}
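
The task-supervision code above uses a select-all-then-cancel pattern. A minimal sketch of that pattern, assuming cancel_task! ultimately aborts the JoinHandle (the macro body is not shown in this diff):

    use color_eyre::Result;
    use futures_util::future::select_all;
    use tokio::task::JoinHandle;

    async fn supervise(tasks: Vec<JoinHandle<Result<()>>>) -> Result<()> {
        // Resolves as soon as the first task exits, for any reason.
        // (select_all panics on an empty set, so `tasks` must be non-empty.)
        let (first, _index, remaining) = select_all(tasks).await;
        // Tear down every sibling so the agent stops or fails as a unit.
        for task in remaining {
            task.abort();
        }
        // Outer `?` surfaces panics/aborts (JoinError); inner `?` surfaces
        // the error the task itself returned.
        first??;
        Ok(())
    }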

rust/agents/processor/src/push.rs (new file, 139 lines)

@@ -0,0 +1,139 @@
use std::time::Duration;
use rusoto_core::{credential::EnvironmentProvider, HttpClient, Region, RusotoError};
use rusoto_s3::{GetObjectError, GetObjectRequest, PutObjectRequest, S3Client, S3};
use color_eyre::eyre::{bail, eyre, Result};
use optics_core::{accumulator::merkle::Proof, db::HomeDB, Encode};
use tokio::{task::JoinHandle, time::sleep};
use tracing::{debug, info, info_span, instrument::Instrumented, Instrument};
#[derive(serde::Serialize, serde::Deserialize)]
struct ProvenMessage {
message: Vec<u8>,
proof: Proof,
}
/// Pushes proofs to an S3 bucket
pub struct Pusher {
name: String,
bucket: String,
region: Region,
db: HomeDB,
client: S3Client,
}
impl std::fmt::Debug for Pusher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pusher")
.field("region", &self.region)
.field("bucket", &self.bucket)
.field("name", &self.name)
.finish()
}
}
impl Pusher {
/// Instantiate a new pusher with a region
pub fn new(name: &str, bucket: &str, region: Region, db: HomeDB) -> Self {
let client = S3Client::new_with(
HttpClient::new().unwrap(),
EnvironmentProvider::default(),
region.clone(),
);
Self {
name: name.to_owned(),
bucket: bucket.to_owned(),
region,
db,
client,
}
}
async fn upload_proof(&self, proven: &ProvenMessage) -> Result<()> {
let key = self.key(proven);
let proof_json = Vec::from(serde_json::to_string_pretty(proven)?);
info!(
leaf = ?proven.proof.leaf,
leaf_index = proven.proof.index,
key = %key,
"Storing proof in s3 bucket",
);
let req = PutObjectRequest {
key,
bucket: self.bucket.clone(),
body: Some(proof_json.into()),
content_type: Some("application/json".to_owned()),
..Default::default()
};
self.client.put_object(req).await?;
Ok(())
}
async fn already_uploaded(&self, proven: &ProvenMessage) -> Result<bool> {
let req = GetObjectRequest {
key: self.key(proven),
bucket: self.bucket.clone(),
..Default::default()
};
let resp = self.client.get_object(req).await;
match resp {
Ok(_) => {
debug!(
leaf = ?proven.proof.leaf,
leaf_index = proven.proof.index,
key = %self.key(proven),
"Proof already stored in bucket"
);
Ok(true)
}
Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Ok(false),
Err(e) => bail!(e),
}
}
fn key(&self, proven: &ProvenMessage) -> String {
format!("{}_{}", self.name, proven.proof.index)
}
/// Spawn the pusher task and return a joinhandle
///
/// The pusher task polls the DB for new proofs and attempts to push them
/// to an S3 bucket
pub fn spawn(self) -> Instrumented<JoinHandle<Result<()>>> {
let span = info_span!(
"ProofPusher",
bucket = %self.bucket,
region = self.region.name(),
home = %self.name,
);
tokio::spawn(async move {
let mut index = 0;
loop {
let proof = self.db.proof_by_leaf_index(index)?;
match proof {
Some(proof) => {
let message = self
.db
.message_by_leaf_index(index)?
.ok_or_else(|| eyre!("Missing message for known proof"))?;
let proven = ProvenMessage {
proof,
message: message.to_vec(),
};
// upload if not already present
if !self.already_uploaded(&proven).await? {
self.upload_proof(&proven).await?;
}
index += 1;
}
None => sleep(Duration::from_millis(500)).await,
}
}
})
.instrument(span)
}
}
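
Two notes on the file above. First, the S3Client is built with rusoto's EnvironmentProvider, so credentials must arrive via the standard AWS environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and optionally AWS_SESSION_TOKEN). Second, as a sketch of the consumer side, pushed objects can be fetched back with the same key scheme; the bucket, region, and home name below are placeholders, and the payload is the JSON-serialized ProvenMessage from the private struct above:

    use color_eyre::eyre::{eyre, Result};
    use futures_util::TryStreamExt;
    use rusoto_core::{credential::EnvironmentProvider, HttpClient, Region};
    use rusoto_s3::{GetObjectRequest, S3Client, S3};

    /// Fetch the JSON-serialized proven message pushed for `leaf_index`.
    async fn fetch_pushed_proof(leaf_index: u32) -> Result<Vec<u8>> {
        let client = S3Client::new_with(
            HttpClient::new()?,
            EnvironmentProvider::default(), // reads AWS_* env vars
            Region::UsWest2,                // placeholder region
        );
        let resp = client
            .get_object(GetObjectRequest {
                bucket: "my-proof-bucket".to_owned(), // placeholder bucket
                key: format!("ethereum_{}", leaf_index), // {home}_{leaf_index}
                ..Default::default()
            })
            .await?;
        // Concatenate the streaming body into one buffer of JSON bytes.
        let body = resp.body.ok_or_else(|| eyre!("empty object body"))?;
        let bytes: Vec<u8> = body.map_ok(|c| c.to_vec()).try_concat().await?;
        Ok(bytes)
    }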

rust/agents/processor/src/settings.rs (9 lines changed)

@@ -1,9 +1,16 @@
//! Configuration
use ethers::prelude::H256;
use serde::Deserialize;
use std::collections::HashSet;
use optics_base::decl_settings;
+#[derive(Debug, Deserialize, Clone)]
+pub struct S3Config {
+pub bucket: String,
+pub region: String,
+}
decl_settings!(Processor {
/// The polling interval (in seconds)
interval: String,
@@ -13,4 +20,6 @@ decl_settings!(Processor {
denied: Option<HashSet<H256>>,
/// Only index transactions if this key is set
indexon: Option<String>,
+/// An Amazon S3 bucket to push proofs to
+s3: Option<S3Config>,
});
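
Because the new field is Option<S3Config> and S3Config is a plain Deserialize struct, enabling the pusher should only require an extra s3 block in the processor's configuration; leaving it out keeps settings.s3 as None and the processor behaves exactly as before. A hypothetical snippet (bucket and region values are placeholders):

    "s3": {
      "bucket": "my-proof-bucket",
      "region": "us-west-2"
    }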

rust/optics-base/src/agent.rs (7 lines changed)

@@ -9,7 +9,7 @@ use async_trait::async_trait;
use color_eyre::{eyre::WrapErr, Result};
use futures_util::future::select_all;
use optics_core::{
-db::DB,
+db::{HomeDB, DB},
traits::{Common, Home},
};
use tracing::instrument::Instrumented;
@@ -61,6 +61,11 @@ pub trait OpticsAgent: Send + Sync + std::fmt::Debug + AsRef<AgentCore> {
self.as_ref().db.clone()
}
+/// Return a handle to the DB with the home schema
+fn home_db(&self) -> HomeDB {
+HomeDB::new(self.as_ref().db.clone(), self.home().name().to_owned())
+}
/// Return a reference to a home contract
fn home(&self) -> Arc<Homes> {
self.as_ref().home.clone()
