Google Cloud Storage CheckpointSyncer implementation (#3156)

### Description

GCS builder and implementer structs;
Documentation comments;
Examples;
Anonymous test on public bucket access;

### Drive-by changes

`cargo doc` build fixes:
* replaced square brackets around non-existent items with `'` ;
* raw URL surrounded by `<>`;

### Related issues

Fixes #2242 Part 1

### Backward compatibility

Yes

### Testing

Unit Tests
pull/3236/head
Ivan Temchenko 9 months ago committed by GitHub
parent 54aeb6420b
commit 65e2ea50f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1150
      rust/Cargo.lock
  2. 1
      rust/Cargo.toml
  3. 5
      rust/README.md
  4. 2
      rust/agents/relayer/src/msg/metadata/base.rs
  5. 2
      rust/agents/validator/src/validator.rs
  6. 1
      rust/hyperlane-base/Cargo.toml
  7. 67
      rust/hyperlane-base/src/settings/checkpoint_syncer.rs
  8. 204
      rust/hyperlane-base/src/types/gcs_storage.rs
  9. 2
      rust/hyperlane-base/src/types/mod.rs

1150
rust/Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -183,6 +183,7 @@ url = "2.3"
walkdir = "2"
warp = "0.3"
which = "4.3"
ya-gcp = { version = "0.11.1", features = ["storage"] }
## TODO: remove this
cosmwasm-schema = "1.2.7"

@ -51,7 +51,7 @@ cargo build --release --bin relayer
./target/release/relayer
```
### Running local binary against cloud resources (AWS KMS, S3, Postgresql etc)
### Running local binary against cloud resources (AWS KMS, S3, Postgresql, Google Cloud Storage, etc)
Building the docker image and upgrading the pod is a **slow** process. To speed up the development cycle, you can run a local binary against cloud resources.
This workflow is useful for testing local changes against cloud resources. It is also useful for debugging issues in production.
@ -74,6 +74,9 @@ Configure additional env variables appropriately:
HYP_DB=/tmp/fuji-validator-db
CONFIG_FILES=./config/testnet_config.json
HYP_TRACING_FMT=pretty
GCS_USER_SECRET=./path/to/file
# or if service account used
GCS_SERVICE_ACCOUNT_KEY=./path/to/file
DATABASE_URL=<READ_REPLICA_POSTGRES_URL> # for scraper
```

@ -381,7 +381,7 @@ impl BaseMetadataBuilder {
continue;
}
match config.build(None) {
match config.build(None).await {
Ok(checkpoint_syncer) => {
// found the syncer for this validator
checkpoint_syncers.insert(validator.into(), checkpoint_syncer.into());

@ -67,7 +67,7 @@ impl BaseAgent for Validator {
let (signer_instance, signer) = SingletonSigner::new(settings.validator.build().await?);
let core = settings.build_hyperlane_core(metrics.clone());
let checkpoint_syncer = settings.checkpoint_syncer.build(None)?.into();
let checkpoint_syncer = settings.checkpoint_syncer.build(None).await?.into();
let mailbox = settings
.build_mailbox(&settings.origin_chain, &metrics)

@ -40,6 +40,7 @@ tracing-subscriber = { workspace = true, features = ["json", "ansi"] }
tracing.workspace = true
url.workspace = true
warp.workspace = true
ya-gcp.workspace = true
backtrace = { workspace = true, optional = true }
backtrace-oneline = { path = "../utils/backtrace-oneline", optional = true }

@ -1,11 +1,13 @@
use crate::{
CheckpointSyncer, GcsStorageClientBuilder, LocalStorage, S3Storage, GCS_SERVICE_ACCOUNT_KEY,
GCS_USER_SECRET,
};
use core::str::FromStr;
use std::path::PathBuf;
use eyre::{eyre, Context, Report, Result};
use prometheus::IntGauge;
use rusoto_core::Region;
use crate::{CheckpointSyncer, LocalStorage, S3Storage};
use std::{env, path::PathBuf};
use ya_gcp::{AuthFlow, ServiceAccountAuth};
/// Checkpoint Syncer types
#[derive(Debug, Clone)]
@ -24,6 +26,18 @@ pub enum CheckpointSyncerConf {
/// S3 Region
region: Region,
},
/// A checkpoint syncer on Google Cloud Storage
Gcs {
/// Bucket name
bucket: String,
/// Folder name inside bucket - defaults to the root of the bucket
folder: Option<String>,
/// A path to the oauth service account key json file.
service_account_key: Option<String>,
/// Path to oauth user secrets, like those created by
/// `gcloud auth application-default login`
user_secrets: Option<String>,
},
}
impl FromStr for CheckpointSyncerConf {
@ -54,6 +68,28 @@ impl FromStr for CheckpointSyncerConf {
"file" => Ok(CheckpointSyncerConf::LocalStorage {
path: suffix.into(),
}),
// For Google Cloud, both forms (with or without a folder) parsed from a string
// grant anonymous access only, unless credentials are supplied via the GCS_* env variables.
"gs" => {
let service_account_key = env::var(GCS_SERVICE_ACCOUNT_KEY).ok();
let user_secrets = env::var(GCS_USER_SECRET).ok();
if let Some(ind) = suffix.find('/') {
let (bucket, folder) = suffix.split_at(ind);
Ok(Self::Gcs {
bucket: bucket.into(),
folder: Some(folder.into()),
service_account_key,
user_secrets,
})
} else {
Ok(Self::Gcs {
bucket: suffix.into(),
folder: None,
service_account_key,
user_secrets,
})
}
}
_ => Err(eyre!("Unknown storage location prefix `{prefix}`")),
}
}
@ -61,7 +97,7 @@ impl FromStr for CheckpointSyncerConf {
impl CheckpointSyncerConf {
/// Turn conf info a Checkpoint Syncer
pub fn build(
pub async fn build(
&self,
latest_index_gauge: Option<IntGauge>,
) -> Result<Box<dyn CheckpointSyncer>, Report> {
@ -79,6 +115,27 @@ impl CheckpointSyncerConf {
region.clone(),
latest_index_gauge,
)),
CheckpointSyncerConf::Gcs {
bucket,
folder,
service_account_key,
user_secrets,
} => {
let auth = if let Some(path) = service_account_key {
AuthFlow::ServiceAccount(ServiceAccountAuth::Path(path.into()))
} else if let Some(path) = user_secrets {
AuthFlow::UserAccount(path.into())
} else {
// Public data access only - no `insert`
AuthFlow::NoAuth
};
Box::new(
GcsStorageClientBuilder::new(auth)
.build(bucket, folder.to_owned())
.await?,
)
}
})
}
}

@ -0,0 +1,204 @@
use crate::CheckpointSyncer;
use async_trait::async_trait;
use derive_new::new;
use eyre::{bail, Result};
use hyperlane_core::{SignedAnnouncement, SignedCheckpointWithMessageId};
use std::fmt;
use ya_gcp::{storage::StorageClient, AuthFlow, ClientBuilder, ClientBuilderConfig};
// Object key under which the latest signed-checkpoint index is stored.
const LATEST_INDEX_KEY: &str = "gcsLatestIndexKey";
// Object key under which the signed validator announcement is stored.
const ANNOUNCEMENT_KEY: &str = "gcsAnnouncementKey";
/// Name of the env var whose value is the path to the GCS user secrets file
pub const GCS_USER_SECRET: &str = "GCS_USER_SECRET";
/// Name of the env var whose value is the path to the GCS service account key file
pub const GCS_SERVICE_ACCOUNT_KEY: &str = "GCS_SERVICE_ACCOUNT_KEY";
/// Google Cloud Storage client builder.
///
/// Pass `AuthFlow::NoAuth` for unauthenticated read access to a public bucket.
///
/// # Example 1 - anonymous client with access to public bucket
/// ```
/// use hyperlane_base::GcsStorageClientBuilder;
/// use ya_gcp::AuthFlow;
/// # #[tokio::main]
/// # async fn main() {
/// let client = GcsStorageClientBuilder::new(AuthFlow::NoAuth)
/// .build("HyperlaneBucket", None)
/// .await.expect("failed to instantiate anonymous client");
/// # }
///```
///
/// For authenticated write access to a bucket, the path to a valid
/// credentials file must be provided.
///
/// # Panics
/// Panics if the credentials file path is incorrect or the data in it is faulty.
///
/// # Example 2 - service account key
/// ```should_panic
/// use hyperlane_base::GcsStorageClientBuilder;
/// use ya_gcp::{AuthFlow, ServiceAccountAuth};
/// # #[tokio::main]
/// # async fn main() {
/// let auth =
/// AuthFlow::ServiceAccount(ServiceAccountAuth::Path("path/to/sac.json".into()));
///
/// let client = GcsStorageClientBuilder::new(auth)
/// .build("HyperlaneBucket", None)
/// .await.expect("failed to instantiate service-account client");
/// # }
///```
/// # Example 3 - user secret access
/// ```should_panic
/// use hyperlane_base::GcsStorageClientBuilder;
/// use ya_gcp::AuthFlow;
/// # #[tokio::main]
/// # async fn main() {
/// let auth =
/// AuthFlow::UserAccount("path/to/user_secret.json".into());
///
/// let client = GcsStorageClientBuilder::new(auth)
/// .build("HyperlaneBucket", None)
/// .await.expect("failed to instantiate user-secret client");
/// # }
///```
#[derive(Debug, new)]
pub struct GcsStorageClientBuilder {
auth: AuthFlow,
}
/// Google Cloud Storage client
///
/// Authenticates with either a service account key OR user secrets.
/// For anonymous access to public data, construct the builder with
/// `AuthFlow::NoAuth`.
pub struct GcsStorageClient {
// GCS storage client
// # Details: <https://docs.rs/ya-gcp/latest/ya_gcp/storage/struct.StorageClient.html>
inner: StorageClient,
// Bucket name used for all store/get operations. NOTE(review): the builder
// may append a folder as "{bucket}/{folder}" — confirm GCS accepts that as a
// bucket identifier, since bucket names themselves cannot contain '/'.
bucket: String,
}
impl GcsStorageClientBuilder {
    /// Instantiates a `ya_gcp::storage::StorageClient` with the configured auth
    /// flow and wraps it in a [`GcsStorageClient`].
    ///
    /// # Params
    /// * `bucket_name` - name of the target bucket; used by all subsequent store and get ops
    /// * `folder` - optional folder inside the bucket; appended to the bucket identifier
    ///
    /// # Errors
    /// Returns an error if the underlying GCS client cannot be constructed
    /// (e.g. the credentials for the selected auth flow are invalid).
    pub async fn build(
        self,
        bucket_name: impl Into<String>,
        folder: Option<String>,
    ) -> Result<GcsStorageClient> {
        let inner = ClientBuilder::new(ClientBuilderConfig::new().auth_flow(self.auth))
            .await?
            .build_storage_client();

        // Normalize the folder: a folder parsed from a `gs://bucket/folder` URL
        // arrives with a leading '/', which previously produced "bucket//folder".
        // Trim stray slashes and ignore folders that are empty after trimming.
        let bucket = match folder.as_deref().map(|f| f.trim_matches('/')) {
            Some(f) if !f.is_empty() => format!("{}/{}", bucket_name.into(), f),
            _ => bucket_name.into(),
        };
        Ok(GcsStorageClient { inner, bucket })
    }
}
impl GcsStorageClient {
    // Convenience formatter: the object key under which the signed checkpoint
    // for `index` is stored.
    fn get_checkpoint_key(index: u32) -> String {
        format!("checkpoint_{}_with_id.json", index)
    }

    // Test-only helper: fetch an arbitrary object to verify bucket access.
    #[cfg(test)]
    pub(crate) async fn get_by_path(&self, path: impl AsRef<str>) -> Result<()> {
        let _ = self.inner.get_object(&self.bucket, path).await?;
        Ok(())
    }
}
// Required by the `CheckpointSyncer` trait bound.
impl fmt::Debug for GcsStorageClient {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Bug fix: the struct name was copy-pasted from the S3 syncer as
        // "S3Storage"; report the actual type so logs and error chains
        // identify the GCS syncer correctly.
        f.debug_struct("GcsStorageClient")
            .field("bucket", &self.bucket)
            .finish()
    }
}
#[async_trait]
impl CheckpointSyncer for GcsStorageClient {
/// Read the highest index of this Syncer
async fn latest_index(&self) -> Result<Option<u32>> {
match self.inner.get_object(&self.bucket, LATEST_INDEX_KEY).await {
Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)),
Err(e) => match e {
// never written before to this bucket
// NOTE(review): only `InvalidName` is mapped to `None`; an object that is
// merely absent (e.g. an HTTP 404 from GCS) may arrive as a different
// error variant and surface as `Err` — confirm against ya-gcp semantics.
ya_gcp::storage::ObjectError::InvalidName(_) => Ok(None),
_ => bail!(e),
},
}
}
/// Writes the highest index of this Syncer
async fn write_latest_index(&self, index: u32) -> Result<()> {
// Serialize the index as JSON and overwrite the well-known object key.
let d = serde_json::to_vec(&index)?;
self.inner
.insert_object(&self.bucket, LATEST_INDEX_KEY, d)
.await?;
Ok(())
}
/// Update the latest index of this syncer if necessary
async fn update_latest_index(&self, index: u32) -> Result<()> {
// A missing stored index is treated as 0, and the stored value is only ever
// advanced, never moved backwards. NOTE(review): the read and the write are
// two separate requests (not atomic), so concurrent writers could interleave
// — confirm a single writer owns this object.
let curr = self.latest_index().await?.unwrap_or(0);
if index > curr {
self.write_latest_index(index).await?;
}
Ok(())
}
/// Attempt to fetch the signed (checkpoint, messageId) tuple at this index
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpointWithMessageId>> {
// NOTE(review): unlike `latest_index`, a missing object here propagates as
// `Err` via `?` instead of returning `Ok(None)` — verify this asymmetry is
// intended by callers of `fetch_checkpoint`.
let res = self
.inner
.get_object(&self.bucket, GcsStorageClient::get_checkpoint_key(index))
.await?;
Ok(Some(serde_json::from_slice(res.as_ref())?))
}
/// Write the signed (checkpoint, messageId) tuple to this syncer
async fn write_checkpoint(
&self,
signed_checkpoint: &SignedCheckpointWithMessageId,
) -> Result<()> {
// Stored as JSON under the per-index key "checkpoint_{index}_with_id.json".
self.inner
.insert_object(
&self.bucket,
GcsStorageClient::get_checkpoint_key(signed_checkpoint.value.index),
serde_json::to_vec(signed_checkpoint)?,
)
.await?;
Ok(())
}
/// Write the signed announcement to this syncer
async fn write_announcement(&self, signed_announcement: &SignedAnnouncement) -> Result<()> {
// Serialized with `to_string` (JSON text) under the fixed announcement key.
self.inner
.insert_object(
&self.bucket,
ANNOUNCEMENT_KEY,
serde_json::to_string(signed_announcement)?,
)
.await?;
Ok(())
}
/// Return the announcement storage location for this syncer
fn announcement_location(&self) -> String {
// `self.bucket` may already include a folder suffix appended by the builder.
format!("gs://{}/{}", &self.bucket, ANNOUNCEMENT_KEY)
}
}
/// Smoke test: anonymous (`AuthFlow::NoAuth`) read access to a public GCS
/// bucket works end-to-end.
///
/// NOTE: performs real network I/O against Google's public Landsat dataset,
/// so it requires internet connectivity and the bucket to stay public.
#[tokio::test]
async fn public_landset_no_auth_works_test() {
    const LANDSAT_BUCKET: &str = "gcp-public-data-landsat";
    const LANDSAT_KEY: &str = "LC08/01/001/003/LC08_L1GT_001003_20140812_20170420_01_T2/LC08_L1GT_001003_20140812_20170420_01_T2_B3.TIF";

    let storage = GcsStorageClientBuilder::new(AuthFlow::NoAuth)
        .build(LANDSAT_BUCKET, None)
        .await
        .unwrap();
    let fetched = storage.get_by_path(LANDSAT_KEY).await;
    assert!(fetched.is_ok());
}

@ -1,7 +1,9 @@
mod gcs_storage;
mod local_storage;
mod multisig;
mod s3_storage;
pub use gcs_storage::*;
pub use local_storage::*;
pub use multisig::*;
pub use s3_storage::*;

Loading…
Cancel
Save