|
|
|
@ -4,6 +4,7 @@ use derive_new::new; |
|
|
|
|
use eyre::{bail, Result}; |
|
|
|
|
use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId}; |
|
|
|
|
use std::fmt; |
|
|
|
|
use tracing::{error, info, instrument}; |
|
|
|
|
use ya_gcp::{ |
|
|
|
|
storage::{ |
|
|
|
|
api::{error::HttpStatusError, http::StatusCode, Error}, |
|
|
|
@ -16,6 +17,7 @@ const LATEST_INDEX_KEY: &str = "gcsLatestIndexKey"; |
|
|
|
|
const METADATA_KEY: &str = "gcsMetadataKey"; |
|
|
|
|
const ANNOUNCEMENT_KEY: &str = "gcsAnnouncementKey"; |
|
|
|
|
const REORG_FLAG_KEY: &str = "gcsReorgFlagKey"; |
|
|
|
|
|
|
|
|
|
/// Path to GCS users_secret file
|
|
|
|
|
pub const GCS_USER_SECRET: &str = "GCS_USER_SECRET"; |
|
|
|
|
/// Path to GCS Service account key
|
|
|
|
@ -80,12 +82,14 @@ pub struct GcsStorageClient { |
|
|
|
|
inner: StorageClient, |
|
|
|
|
// bucket name of this client's storage
|
|
|
|
|
bucket: String, |
|
|
|
|
// folder name of this client's storage
|
|
|
|
|
folder: Option<String>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl GcsStorageClientBuilder { |
|
|
|
|
/// Instantiates `ya_gcp:StorageClient` based on provided auth method
|
|
|
|
|
/// # Param
|
|
|
|
|
/// * `baucket_name` - String name of target bucket to work with, will be used by all store and get ops
|
|
|
|
|
/// * `bucket_name` - String name of target bucket to work with, will be used by all store and get ops
|
|
|
|
|
pub async fn build( |
|
|
|
|
self, |
|
|
|
|
bucket_name: impl Into<String>, |
|
|
|
@ -94,21 +98,71 @@ impl GcsStorageClientBuilder { |
|
|
|
|
let inner = ClientBuilder::new(ClientBuilderConfig::new().auth_flow(self.auth)) |
|
|
|
|
.await? |
|
|
|
|
.build_storage_client(); |
|
|
|
|
let bucket = if let Some(folder) = folder { |
|
|
|
|
format! {"{}/{}", bucket_name.into(), folder} |
|
|
|
|
} else { |
|
|
|
|
bucket_name.into() |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
Ok(GcsStorageClient { inner, bucket }) |
|
|
|
|
let bucket = bucket_name.into(); |
|
|
|
|
let mut processed_folder = folder; |
|
|
|
|
|
|
|
|
|
if let Some(ref mut folder_str) = processed_folder { |
|
|
|
|
if folder_str.ends_with('/') { |
|
|
|
|
folder_str.truncate(folder_str.trim_end_matches('/').len()); |
|
|
|
|
info!( |
|
|
|
|
"Trimmed trailing '/' from folder name. New folder: '{}'", |
|
|
|
|
folder_str |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
GcsStorageClient::validate_bucket_name(&bucket)?; |
|
|
|
|
Ok(GcsStorageClient { |
|
|
|
|
inner, |
|
|
|
|
bucket, |
|
|
|
|
folder: processed_folder, |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl GcsStorageClient { |
|
|
|
|
// convenience formatter
|
|
|
|
|
// Convenience formatter
|
|
|
|
|
fn get_checkpoint_key(index: u32) -> String { |
|
|
|
|
format!("checkpoint_{index}_with_id.json") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn object_path(&self, object_name: &str) -> String { |
|
|
|
|
if let Some(folder) = &self.folder { |
|
|
|
|
format!("{}/{}", folder, object_name) |
|
|
|
|
} else { |
|
|
|
|
object_name.to_string() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn validate_bucket_name(bucket: &str) -> Result<()> { |
|
|
|
|
if bucket.contains('/') { |
|
|
|
|
error!("Bucket name '{}' has an invalid symbol '/'", bucket); |
|
|
|
|
bail!("Bucket name '{}' has an invalid symbol '/'", bucket) |
|
|
|
|
} else { |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Uploads data to GCS and logs the result.
|
|
|
|
|
#[instrument(skip(self, data))] |
|
|
|
|
async fn upload_and_log(&self, object_name: &str, data: Vec<u8>) -> Result<()> { |
|
|
|
|
match self |
|
|
|
|
.inner |
|
|
|
|
.insert_object(&self.bucket, object_name, data) |
|
|
|
|
.await |
|
|
|
|
{ |
|
|
|
|
Ok(_) => { |
|
|
|
|
info!("Successfully uploaded to '{}'", object_name); |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
Err(e) => { |
|
|
|
|
error!("Failed to upload to '{}': {:?}", object_name, e); |
|
|
|
|
Err(e.into()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// #test only method[s]
|
|
|
|
|
#[cfg(test)] |
|
|
|
|
pub(crate) async fn get_by_path(&self, path: impl AsRef<str>) -> Result<()> { |
|
|
|
@ -117,11 +171,12 @@ impl GcsStorageClient { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// required by `CheckpointSyncer`
|
|
|
|
|
// Required by `CheckpointSyncer`
|
|
|
|
|
impl fmt::Debug for GcsStorageClient { |
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
|
|
|
|
f.debug_struct("S3Storage") |
|
|
|
|
f.debug_struct("GcsStorageClient") |
|
|
|
|
.field("bucket", &self.bucket) |
|
|
|
|
.field("folder", &self.folder) |
|
|
|
|
.finish() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -129,6 +184,7 @@ impl fmt::Debug for GcsStorageClient { |
|
|
|
|
#[async_trait] |
|
|
|
|
impl CheckpointSyncer for GcsStorageClient { |
|
|
|
|
/// Read the highest index of this Syncer
|
|
|
|
|
#[instrument(skip(self))] |
|
|
|
|
async fn latest_index(&self) -> Result<Option<u32>> { |
|
|
|
|
match self.inner.get_object(&self.bucket, LATEST_INDEX_KEY).await { |
|
|
|
|
Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)), |
|
|
|
@ -144,15 +200,14 @@ impl CheckpointSyncer for GcsStorageClient { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Writes the highest index of this Syncer
|
|
|
|
|
#[instrument(skip(self, index))] |
|
|
|
|
async fn write_latest_index(&self, index: u32) -> Result<()> { |
|
|
|
|
let d = serde_json::to_vec(&index)?; |
|
|
|
|
self.inner |
|
|
|
|
.insert_object(&self.bucket, LATEST_INDEX_KEY, d) |
|
|
|
|
.await?; |
|
|
|
|
Ok(()) |
|
|
|
|
let data = serde_json::to_vec(&index)?; |
|
|
|
|
self.upload_and_log(LATEST_INDEX_KEY, data).await |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Update the latest index of this syncer if necessary
|
|
|
|
|
#[instrument(skip(self, index))] |
|
|
|
|
async fn update_latest_index(&self, index: u32) -> Result<()> { |
|
|
|
|
let curr = self.latest_index().await?.unwrap_or(0); |
|
|
|
|
if index > curr { |
|
|
|
@ -162,6 +217,7 @@ impl CheckpointSyncer for GcsStorageClient { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Attempt to fetch the signed (checkpoint, messageId) tuple at this index
|
|
|
|
|
#[instrument(skip(self, index))] |
|
|
|
|
async fn fetch_checkpoint(&self, index: u32) -> Result<Option<SignedCheckpointWithMessageId>> { |
|
|
|
|
match self |
|
|
|
|
.inner |
|
|
|
@ -179,56 +235,64 @@ impl CheckpointSyncer for GcsStorageClient { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Write the signed (checkpoint, messageId) tuple to this syncer
|
|
|
|
|
#[instrument(skip(self, signed_checkpoint))] |
|
|
|
|
async fn write_checkpoint( |
|
|
|
|
&self, |
|
|
|
|
signed_checkpoint: &SignedCheckpointWithMessageId, |
|
|
|
|
) -> Result<()> { |
|
|
|
|
self.inner |
|
|
|
|
.insert_object( |
|
|
|
|
&self.bucket, |
|
|
|
|
GcsStorageClient::get_checkpoint_key(signed_checkpoint.value.index), |
|
|
|
|
serde_json::to_vec(signed_checkpoint)?, |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
Ok(()) |
|
|
|
|
let object_name = Self::get_checkpoint_key(signed_checkpoint.value.index); |
|
|
|
|
let data = serde_json::to_vec(signed_checkpoint)?; |
|
|
|
|
self.upload_and_log(&object_name, data).await |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Write the agent metadata to this syncer
|
|
|
|
|
#[instrument(skip(self, metadata))] |
|
|
|
|
async fn write_metadata(&self, metadata: &AgentMetadata) -> Result<()> { |
|
|
|
|
let serialized_metadata = serde_json::to_string_pretty(metadata)?; |
|
|
|
|
self.inner |
|
|
|
|
.insert_object(&self.bucket, METADATA_KEY, serialized_metadata) |
|
|
|
|
.await?; |
|
|
|
|
Ok(()) |
|
|
|
|
let object_name = self.object_path(METADATA_KEY); |
|
|
|
|
let data = serde_json::to_string_pretty(metadata)?.into_bytes(); |
|
|
|
|
self.upload_and_log(&object_name, data).await |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Write the signed announcement to this syncer
|
|
|
|
|
async fn write_announcement(&self, signed_announcement: &SignedAnnouncement) -> Result<()> { |
|
|
|
|
self.inner |
|
|
|
|
.insert_object( |
|
|
|
|
&self.bucket, |
|
|
|
|
ANNOUNCEMENT_KEY, |
|
|
|
|
serde_json::to_string(signed_announcement)?, |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
Ok(()) |
|
|
|
|
#[instrument(skip(self, announcement))] |
|
|
|
|
async fn write_announcement(&self, announcement: &SignedAnnouncement) -> Result<()> { |
|
|
|
|
let object_name = self.object_path(ANNOUNCEMENT_KEY); |
|
|
|
|
let data = serde_json::to_string(announcement)?.into_bytes(); |
|
|
|
|
self.upload_and_log(&object_name, data).await |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Return the announcement storage location for this syncer
|
|
|
|
|
#[instrument(skip(self))] |
|
|
|
|
fn announcement_location(&self) -> String { |
|
|
|
|
format!("gs://{}/{}", &self.bucket, ANNOUNCEMENT_KEY) |
|
|
|
|
let location = format!( |
|
|
|
|
"gs://{}/{}", |
|
|
|
|
&self.bucket, |
|
|
|
|
self.object_path(ANNOUNCEMENT_KEY) |
|
|
|
|
); |
|
|
|
|
info!("Announcement storage location: '{}'", location); |
|
|
|
|
location |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fn write_reorg_status(&self, reorged_event: &ReorgEvent) -> Result<()> { |
|
|
|
|
let serialized_metadata = serde_json::to_string_pretty(reorged_event)?; |
|
|
|
|
self.inner |
|
|
|
|
.insert_object(&self.bucket, REORG_FLAG_KEY, serialized_metadata) |
|
|
|
|
.await?; |
|
|
|
|
Ok(()) |
|
|
|
|
/// Write the reorg status to this syncer
|
|
|
|
|
#[instrument(skip(self, reorg_event))] |
|
|
|
|
async fn write_reorg_status(&self, reorg_event: &ReorgEvent) -> Result<()> { |
|
|
|
|
let object_name = REORG_FLAG_KEY; |
|
|
|
|
let data = serde_json::to_string_pretty(reorg_event)?.into_bytes(); |
|
|
|
|
self.upload_and_log(object_name, data).await |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Read the reorg status from this syncer
|
|
|
|
|
#[instrument(skip(self))] |
|
|
|
|
async fn reorg_status(&self) -> Result<Option<ReorgEvent>> { |
|
|
|
|
Ok(None) |
|
|
|
|
match self.inner.get_object(&self.bucket, REORG_FLAG_KEY).await { |
|
|
|
|
Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)), |
|
|
|
|
Err(e) => match e { |
|
|
|
|
ObjectError::Failure(Error::HttpStatus(HttpStatusError(StatusCode::NOT_FOUND))) => { |
|
|
|
|
Ok(None) |
|
|
|
|
} |
|
|
|
|
_ => bail!(e), |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|