From 21a0cf964fe34000443a427c2db92caceb969d32 Mon Sep 17 00:00:00 2001 From: IvanPsurtcev <87025698+IvanPsurtcev@users.noreply.github.com> Date: Thu, 31 Oct 2024 15:26:39 +0300 Subject: [PATCH 1/3] fix: GCS folder support (#4652) ### Description This PR fixes the problem described in this issue: https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4449 The change was only in the gcs_storage.rs file. Slightly tweaked methods so that files can be written to a folder specified on the command line. When trying to run a validator specifying checkpointSyncer settings for GCS: `--checkpointSyncer.type gcs --checkpointSyncer.bucket an-s3-signatures-repository --checkpointSyncer.folder atletaolympia` ``` Building provider without signer thread 'main' panicked at 'Failed to report agent metadata: error validating bucket name an-s3-signatures-repository/atletaolympia Caused by: Character '/' @ 27 is not allowed Location: hyperlane-base/src/types/gcs_storage.rs:181:9', agents/validator/src/validator.rs:178:14 ``` To solve this problem I added the `folder: Option<String>` field to the GcsStorageClient structure. I also added bucket and folder processing to the `CheckpointSyncer` impl and the `GcsStorageClientBuilder` methods. --- .../hyperlane-base/src/types/gcs_storage.rs | 154 +++++++++++++----- 1 file changed, 109 insertions(+), 45 deletions(-) diff --git a/rust/main/hyperlane-base/src/types/gcs_storage.rs b/rust/main/hyperlane-base/src/types/gcs_storage.rs index ba40ec2ca..a413edb32 100644 --- a/rust/main/hyperlane-base/src/types/gcs_storage.rs +++ b/rust/main/hyperlane-base/src/types/gcs_storage.rs @@ -4,6 +4,7 @@ use derive_new::new; use eyre::{bail, Result}; use hyperlane_core::{ReorgEvent, SignedAnnouncement, SignedCheckpointWithMessageId}; use std::fmt; +use tracing::{error, info, instrument}; use ya_gcp::{ storage::{ api::{error::HttpStatusError, http::StatusCode, Error}, @@ -16,6 +17,7 @@ const LATEST_INDEX_KEY: &str = "gcsLatestIndexKey"; const METADATA_KEY: &str = "gcsMetadataKey";
const ANNOUNCEMENT_KEY: &str = "gcsAnnouncementKey"; const REORG_FLAG_KEY: &str = "gcsReorgFlagKey"; + /// Path to GCS users_secret file pub const GCS_USER_SECRET: &str = "GCS_USER_SECRET"; /// Path to GCS Service account key @@ -80,12 +82,14 @@ pub struct GcsStorageClient { inner: StorageClient, // bucket name of this client's storage bucket: String, + // folder name of this client's storage + folder: Option, } impl GcsStorageClientBuilder { /// Instantiates `ya_gcp:StorageClient` based on provided auth method /// # Param - /// * `baucket_name` - String name of target bucket to work with, will be used by all store and get ops + /// * `bucket_name` - String name of target bucket to work with, will be used by all store and get ops pub async fn build( self, bucket_name: impl Into, @@ -94,21 +98,71 @@ impl GcsStorageClientBuilder { let inner = ClientBuilder::new(ClientBuilderConfig::new().auth_flow(self.auth)) .await? .build_storage_client(); - let bucket = if let Some(folder) = folder { - format! {"{}/{}", bucket_name.into(), folder} - } else { - bucket_name.into() - }; - Ok(GcsStorageClient { inner, bucket }) + let bucket = bucket_name.into(); + let mut processed_folder = folder; + + if let Some(ref mut folder_str) = processed_folder { + if folder_str.ends_with('/') { + folder_str.truncate(folder_str.trim_end_matches('/').len()); + info!( + "Trimmed trailing '/' from folder name. 
New folder: '{}'", + folder_str + ); + } + } + + GcsStorageClient::validate_bucket_name(&bucket)?; + Ok(GcsStorageClient { + inner, + bucket, + folder: processed_folder, + }) } } impl GcsStorageClient { - // convenience formatter + // Convenience formatter fn get_checkpoint_key(index: u32) -> String { format!("checkpoint_{index}_with_id.json") } + + fn object_path(&self, object_name: &str) -> String { + if let Some(folder) = &self.folder { + format!("{}/{}", folder, object_name) + } else { + object_name.to_string() + } + } + + fn validate_bucket_name(bucket: &str) -> Result<()> { + if bucket.contains('/') { + error!("Bucket name '{}' has an invalid symbol '/'", bucket); + bail!("Bucket name '{}' has an invalid symbol '/'", bucket) + } else { + Ok(()) + } + } + + /// Uploads data to GCS and logs the result. + #[instrument(skip(self, data))] + async fn upload_and_log(&self, object_name: &str, data: Vec) -> Result<()> { + match self + .inner + .insert_object(&self.bucket, object_name, data) + .await + { + Ok(_) => { + info!("Successfully uploaded to '{}'", object_name); + Ok(()) + } + Err(e) => { + error!("Failed to upload to '{}': {:?}", object_name, e); + Err(e.into()) + } + } + } + // #test only method[s] #[cfg(test)] pub(crate) async fn get_by_path(&self, path: impl AsRef) -> Result<()> { @@ -117,11 +171,12 @@ impl GcsStorageClient { } } -// required by `CheckpointSyncer` +// Required by `CheckpointSyncer` impl fmt::Debug for GcsStorageClient { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("S3Storage") + f.debug_struct("GcsStorageClient") .field("bucket", &self.bucket) + .field("folder", &self.folder) .finish() } } @@ -129,6 +184,7 @@ impl fmt::Debug for GcsStorageClient { #[async_trait] impl CheckpointSyncer for GcsStorageClient { /// Read the highest index of this Syncer + #[instrument(skip(self))] async fn latest_index(&self) -> Result> { match self.inner.get_object(&self.bucket, LATEST_INDEX_KEY).await { Ok(data) => 
Ok(Some(serde_json::from_slice(data.as_ref())?)), @@ -144,15 +200,14 @@ impl CheckpointSyncer for GcsStorageClient { } /// Writes the highest index of this Syncer + #[instrument(skip(self, index))] async fn write_latest_index(&self, index: u32) -> Result<()> { - let d = serde_json::to_vec(&index)?; - self.inner - .insert_object(&self.bucket, LATEST_INDEX_KEY, d) - .await?; - Ok(()) + let data = serde_json::to_vec(&index)?; + self.upload_and_log(LATEST_INDEX_KEY, data).await } /// Update the latest index of this syncer if necessary + #[instrument(skip(self, index))] async fn update_latest_index(&self, index: u32) -> Result<()> { let curr = self.latest_index().await?.unwrap_or(0); if index > curr { @@ -162,6 +217,7 @@ impl CheckpointSyncer for GcsStorageClient { } /// Attempt to fetch the signed (checkpoint, messageId) tuple at this index + #[instrument(skip(self, index))] async fn fetch_checkpoint(&self, index: u32) -> Result> { match self .inner @@ -179,56 +235,64 @@ impl CheckpointSyncer for GcsStorageClient { } /// Write the signed (checkpoint, messageId) tuple to this syncer + #[instrument(skip(self, signed_checkpoint))] async fn write_checkpoint( &self, signed_checkpoint: &SignedCheckpointWithMessageId, ) -> Result<()> { - self.inner - .insert_object( - &self.bucket, - GcsStorageClient::get_checkpoint_key(signed_checkpoint.value.index), - serde_json::to_vec(signed_checkpoint)?, - ) - .await?; - Ok(()) + let object_name = Self::get_checkpoint_key(signed_checkpoint.value.index); + let data = serde_json::to_vec(signed_checkpoint)?; + self.upload_and_log(&object_name, data).await } /// Write the agent metadata to this syncer + #[instrument(skip(self, metadata))] async fn write_metadata(&self, metadata: &AgentMetadata) -> Result<()> { - let serialized_metadata = serde_json::to_string_pretty(metadata)?; - self.inner - .insert_object(&self.bucket, METADATA_KEY, serialized_metadata) - .await?; - Ok(()) + let object_name = self.object_path(METADATA_KEY); + let data = 
serde_json::to_string_pretty(metadata)?.into_bytes(); + self.upload_and_log(&object_name, data).await } /// Write the signed announcement to this syncer - async fn write_announcement(&self, signed_announcement: &SignedAnnouncement) -> Result<()> { - self.inner - .insert_object( - &self.bucket, - ANNOUNCEMENT_KEY, - serde_json::to_string(signed_announcement)?, - ) - .await?; - Ok(()) + #[instrument(skip(self, announcement))] + async fn write_announcement(&self, announcement: &SignedAnnouncement) -> Result<()> { + let object_name = self.object_path(ANNOUNCEMENT_KEY); + let data = serde_json::to_string(announcement)?.into_bytes(); + self.upload_and_log(&object_name, data).await } /// Return the announcement storage location for this syncer + #[instrument(skip(self))] fn announcement_location(&self) -> String { - format!("gs://{}/{}", &self.bucket, ANNOUNCEMENT_KEY) + let location = format!( + "gs://{}/{}", + &self.bucket, + self.object_path(ANNOUNCEMENT_KEY) + ); + info!("Announcement storage location: '{}'", location); + location } - async fn write_reorg_status(&self, reorged_event: &ReorgEvent) -> Result<()> { - let serialized_metadata = serde_json::to_string_pretty(reorged_event)?; - self.inner - .insert_object(&self.bucket, REORG_FLAG_KEY, serialized_metadata) - .await?; - Ok(()) + /// Write the reorg status to this syncer + #[instrument(skip(self, reorg_event))] + async fn write_reorg_status(&self, reorg_event: &ReorgEvent) -> Result<()> { + let object_name = REORG_FLAG_KEY; + let data = serde_json::to_string_pretty(reorg_event)?.into_bytes(); + self.upload_and_log(object_name, data).await } + /// Read the reorg status from this syncer + #[instrument(skip(self))] async fn reorg_status(&self) -> Result> { - Ok(None) + match self.inner.get_object(&self.bucket, REORG_FLAG_KEY).await { + Ok(data) => Ok(Some(serde_json::from_slice(data.as_ref())?)), + Err(e) => match e { + ObjectError::Failure(Error::HttpStatus(HttpStatusError(StatusCode::NOT_FOUND))) => { + Ok(None) 
+ } + _ => bail!(e), + }, + } } } From 38bd1ae49f884358ee022b3488a25663cba80870 Mon Sep 17 00:00:00 2001 From: Danil Nemirovsky Date: Thu, 31 Oct 2024 12:41:30 +0000 Subject: [PATCH 2/3] feat: Scraper stores input data from Ethereum transactions (#4794) ### Description Scraper stores input data from Ethereum transactions ### Related issues - Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4778 ### Backward compatibility Yes ### Testing Manual run of Scraper against 1. local database 2. database restored from backup of production database Co-authored-by: Danil Nemirovsky <4614623+ameten@users.noreply.github.com> --- .../agents/scraper/migration/bin/generate_entities.rs | 4 ++-- .../src/m20230309_000003_create_table_transaction.rs | 9 +++++++++ rust/main/agents/scraper/src/db/generated/transaction.rs | 3 +++ rust/main/agents/scraper/src/db/txn.rs | 1 + .../hyperlane-cosmos/src/providers/cosmos/provider.rs | 1 + .../hyperlane-ethereum/src/rpc_clients/provider.rs | 7 +++++-- rust/main/chains/hyperlane-fuel/src/provider.rs | 1 + rust/main/chains/hyperlane-sealevel/src/provider.rs | 1 + rust/main/hyperlane-core/src/types/chain_data.rs | 2 ++ 9 files changed, 25 insertions(+), 4 deletions(-) diff --git a/rust/main/agents/scraper/migration/bin/generate_entities.rs b/rust/main/agents/scraper/migration/bin/generate_entities.rs index 5ec3e6c66..9481ece17 100644 --- a/rust/main/agents/scraper/migration/bin/generate_entities.rs +++ b/rust/main/agents/scraper/migration/bin/generate_entities.rs @@ -57,8 +57,8 @@ impl Drop for PostgresDockerContainer { async fn main() -> Result<(), DbErr> { assert_eq!( std::env::current_dir().unwrap().file_name().unwrap(), - "rust", - "Must run from the rust dir" + "main", + "Must run from the rust/main dir" ); let postgres = PostgresDockerContainer::start(); diff --git a/rust/main/agents/scraper/migration/src/m20230309_000003_create_table_transaction.rs 
b/rust/main/agents/scraper/migration/src/m20230309_000003_create_table_transaction.rs index daacfb6b7..283968c6d 100644 --- a/rust/main/agents/scraper/migration/src/m20230309_000003_create_table_transaction.rs +++ b/rust/main/agents/scraper/migration/src/m20230309_000003_create_table_transaction.rs @@ -52,6 +52,13 @@ impl MigrationTrait for Migration { .col(ColumnDef::new_with_type(Transaction::Recipient, Address).borrow_mut()) .col(ColumnDef::new_with_type(Transaction::GasUsed, Wei).not_null()) .col(ColumnDef::new_with_type(Transaction::CumulativeGasUsed, Wei).not_null()) + .col( + ColumnDef::new_with_type( + Transaction::RawInputData, + ColumnType::Binary(BlobSize::Blob(None)), + ) + .borrow_mut(), + ) .foreign_key( ForeignKey::create() .from_col(Transaction::BlockId) @@ -128,4 +135,6 @@ pub enum Transaction { GasUsed, /// Cumulative gas used within the block after this was executed CumulativeGasUsed, + /// Raw input data from Ethereum transaction + RawInputData, } diff --git a/rust/main/agents/scraper/src/db/generated/transaction.rs b/rust/main/agents/scraper/src/db/generated/transaction.rs index 216545b37..4f1139c8c 100644 --- a/rust/main/agents/scraper/src/db/generated/transaction.rs +++ b/rust/main/agents/scraper/src/db/generated/transaction.rs @@ -27,6 +27,7 @@ pub struct Model { pub recipient: Option>, pub gas_used: BigDecimal, pub cumulative_gas_used: BigDecimal, + pub raw_input_data: Option>, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -45,6 +46,7 @@ pub enum Column { Recipient, GasUsed, CumulativeGasUsed, + RawInputData, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -85,6 +87,7 @@ impl ColumnTrait for Column { Self::Recipient => ColumnType::Binary(BlobSize::Blob(None)).def().null(), Self::GasUsed => ColumnType::Decimal(Some((78u32, 0u32))).def(), Self::CumulativeGasUsed => ColumnType::Decimal(Some((78u32, 0u32))).def(), + Self::RawInputData => ColumnType::Binary(BlobSize::Blob(None)).def().null(), } } } diff --git 
a/rust/main/agents/scraper/src/db/txn.rs b/rust/main/agents/scraper/src/db/txn.rs index d5cec03dc..c482569cb 100644 --- a/rust/main/agents/scraper/src/db/txn.rs +++ b/rust/main/agents/scraper/src/db/txn.rs @@ -94,6 +94,7 @@ impl ScraperDb { recipient: Set(txn.recipient.as_ref().map(address_to_bytes)), max_fee_per_gas: Set(txn.max_fee_per_gas.map(u256_to_decimal)), cumulative_gas_used: Set(u256_to_decimal(receipt.cumulative_gas_used)), + raw_input_data: Set(txn.raw_input_data.clone()), }) }) .collect::>>()?; diff --git a/rust/main/chains/hyperlane-cosmos/src/providers/cosmos/provider.rs b/rust/main/chains/hyperlane-cosmos/src/providers/cosmos/provider.rs index 2ab0388be..62076974b 100644 --- a/rust/main/chains/hyperlane-cosmos/src/providers/cosmos/provider.rs +++ b/rust/main/chains/hyperlane-cosmos/src/providers/cosmos/provider.rs @@ -441,6 +441,7 @@ impl HyperlaneProvider for CosmosProvider { cumulative_gas_used: U256::from(response.tx_result.gas_used), effective_gas_price: Some(gas_price), }), + raw_input_data: None, }; Ok(tx_info) diff --git a/rust/main/chains/hyperlane-ethereum/src/rpc_clients/provider.rs b/rust/main/chains/hyperlane-ethereum/src/rpc_clients/provider.rs index d89cf9f43..44a9f99e7 100644 --- a/rust/main/chains/hyperlane-ethereum/src/rpc_clients/provider.rs +++ b/rust/main/chains/hyperlane-ethereum/src/rpc_clients/provider.rs @@ -106,7 +106,7 @@ where }) .transpose()?; - Ok(TxnInfo { + let txn_info = TxnInfo { hash: *hash, max_fee_per_gas: txn.max_fee_per_gas.map(Into::into), max_priority_fee_per_gas: txn.max_priority_fee_per_gas.map(Into::into), @@ -116,7 +116,10 @@ where sender: txn.from.into(), recipient: txn.to.map(Into::into), receipt, - }) + raw_input_data: Some(txn.input.to_vec()), + }; + + Ok(txn_info) } #[instrument(err, skip(self))] diff --git a/rust/main/chains/hyperlane-fuel/src/provider.rs b/rust/main/chains/hyperlane-fuel/src/provider.rs index cdd32650e..f4c6ee134 100644 --- a/rust/main/chains/hyperlane-fuel/src/provider.rs +++ 
b/rust/main/chains/hyperlane-fuel/src/provider.rs @@ -381,6 +381,7 @@ impl HyperlaneProvider for FuelProvider { gas_price: Some(gas_price.into()), recipient, receipt: None, + raw_input_data: None, }) } None => Err(ChainCommunicationError::CustomError(format!( diff --git a/rust/main/chains/hyperlane-sealevel/src/provider.rs b/rust/main/chains/hyperlane-sealevel/src/provider.rs index a0c5a41ea..1b03872ed 100644 --- a/rust/main/chains/hyperlane-sealevel/src/provider.rs +++ b/rust/main/chains/hyperlane-sealevel/src/provider.rs @@ -116,6 +116,7 @@ impl HyperlaneProvider for SealevelProvider { sender: Default::default(), recipient: None, receipt: Some(receipt), + raw_input_data: None, }) } diff --git a/rust/main/hyperlane-core/src/types/chain_data.rs b/rust/main/hyperlane-core/src/types/chain_data.rs index 219f6989d..b4c80c846 100644 --- a/rust/main/hyperlane-core/src/types/chain_data.rs +++ b/rust/main/hyperlane-core/src/types/chain_data.rs @@ -49,6 +49,8 @@ pub struct TxnInfo { /// If the txn has been processed, we can also report some additional /// information. pub receipt: Option, + /// Raw input data of a transaction + pub raw_input_data: Option>, } /// Information about the execution of a transaction. 
From fa066909142099d61d526956e95814fbc891a4e2 Mon Sep 17 00:00:00 2001 From: Danil Nemirovsky Date: Thu, 31 Oct 2024 16:09:35 +0000 Subject: [PATCH 3/3] feat: Upgrade Scraper so that it stores input data for Ethereum transactions (#4796) ### Description Upgrade Scraper so that it stores input data for Ethereum transactions ### Related issues - Fixes https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4778 ### Backward compatibility Yes ### Testing Manual run of Scraper with local database Manual run of Scraper with db restored from production backup Co-authored-by: Danil Nemirovsky <4614623+ameten@users.noreply.github.com> --- typescript/infra/config/environments/mainnet3/agent.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/typescript/infra/config/environments/mainnet3/agent.ts b/typescript/infra/config/environments/mainnet3/agent.ts index aa79f5e43..456e6605c 100644 --- a/typescript/infra/config/environments/mainnet3/agent.ts +++ b/typescript/infra/config/environments/mainnet3/agent.ts @@ -484,7 +484,7 @@ const hyperlane: RootAgentConfig = { rpcConsensusType: RpcConsensusType.Fallback, docker: { repo, - tag: '45399a3-20241025-210128', + tag: '38bd1ae-20241031-125333', }, resources: scraperResources, },