@ -4,8 +4,12 @@ use abacus_core::SignedCheckpoint;
use async_trait ::async_trait ;
use eyre ::{ bail , Result } ;
use futures_util ::TryStreamExt ;
use once_cell ::sync ::OnceCell ;
use prometheus ::IntGauge ;
use rusoto_core ::{ credential ::EnvironmentProvider , HttpClient , Region , RusotoError } ;
use rusoto_core ::{
credential ::{ Anonymous , AwsCredentials , EnvironmentProvider , StaticProvider } ,
HttpClient , Region , RusotoError ,
} ;
use rusoto_s3 ::{ GetObjectError , GetObjectRequest , PutObjectRequest , S3Client , S3 } ;
use crate ::CheckpointSyncer ;
@ -13,12 +17,15 @@ use crate::CheckpointSyncer;
#[ derive(Clone) ]
/// Storage handle for reading from and writing to a single S3 bucket.
pub struct S3Storage {
/// The name of the bucket.
bucket : String ,
/// The region of the bucket.
region : Region ,
/// An S3 client held directly on the struct.
/// NOTE(review): this sits alongside the two `OnceCell` clients below;
/// confirm whether it is still needed.
client : S3Client ,
/// A client with AWS credentials, held in a `OnceCell`.
authenticated_client : OnceCell < S3Client > ,
/// A client without credentials for anonymous requests, held in a `OnceCell`.
anonymous_client : OnceCell < S3Client > ,
/// The latest seen signed checkpoint index (optional gauge).
latest_index : Option < IntGauge > ,
}
@ -34,16 +41,11 @@ impl fmt::Debug for S3Storage {
impl S3Storage {
/// Creates a storage handle for `bucket` in `region`.
///
/// `bucket` is copied into an owned `String`; `latest_index` is stored
/// unchanged. Both `OnceCell` client slots start out empty.
///
/// # Panics
///
/// Panics if `HttpClient::new()` returns an error, because the result is
/// unwrapped.
pub fn new ( bucket : & str , region : Region , latest_index : Option < IntGauge > ) -> Self {
// Build a client whose credentials come from `EnvironmentProvider`.
let client = S3Client ::new_with (
HttpClient ::new ( ) . unwrap ( ) ,
EnvironmentProvider ::default ( ) ,
region . clone ( ) ,
) ;
Self {
bucket : bucket . to_owned ( ) ,
region ,
client ,
// Left empty here; nothing in this constructor fills these cells.
authenticated_client : OnceCell ::new ( ) ,
anonymous_client : OnceCell ::new ( ) ,
latest_index ,
}
}
@ -56,17 +58,18 @@ impl S3Storage {
content_type : Some ( "application/json" . to_owned ( ) ) ,
.. Default ::default ( )
} ;
self . client . put_object ( req ) . await ? ;
self . authenticated_ client( ) . put_object ( req ) . await ? ;
Ok ( ( ) )
}
async fn read_from_bucket ( & self , key : String ) -> Result < Option < Vec < u8 > > > {
/// Uses an anonymous client. This should only be used for publicly accessible buckets.
async fn anonymously_read_from_bucket ( & self , key : String ) -> Result < Option < Vec < u8 > > > {
let req = GetObjectRequest {
key ,
bucket : self . bucket . clone ( ) ,
.. Default ::default ( )
} ;
match self . client . get_object ( req ) . await {
match self . anonymous_ client( ) . get_object ( req ) . await {
Ok ( res ) = > match res . body {
Some ( body ) = > Ok ( Some ( body . map_ok ( | b | b . to_vec ( ) ) . try_concat ( ) . await ? ) ) ,
None = > Ok ( None ) ,
@ -76,6 +79,37 @@ impl S3Storage {
}
}
/// Returns the credentialed `S3Client`, building it on first use.
///
/// The client is cached in `self.authenticated_client`; later calls return
/// the cached instance. Credentials are supplied by `EnvironmentProvider`.
///
/// # Panics
///
/// Panics if `HttpClient::new()` fails while the client is being built.
fn authenticated_client(&self) -> &S3Client {
    self.authenticated_client.get_or_init(|| {
        let http = HttpClient::new().unwrap();
        let provider = EnvironmentProvider::default();
        let region = self.region.clone();
        S3Client::new_with(http, provider, region)
    })
}
/// Returns the credential-less `S3Client`, building it on first use.
///
/// The client is cached in `self.anonymous_client`; later calls return the
/// cached instance. An anonymous client holds no AWS credentials and does
/// not sign S3 requests.
///
/// Rationale: we've experienced an inability to make GetObjectRequests to
/// public S3 buckets when signing with credentials from an AWS account that
/// is not the S3 bucket's AWS account.
///
/// # Panics
///
/// Panics if the default `AwsCredentials` are not anonymous, or if
/// `HttpClient::new()` fails while the client is being built.
fn anonymous_client(&self) -> &S3Client {
    self.anonymous_client.get_or_init(|| {
        // Default credentials are anonymous, see
        // https://docs.rs/rusoto_credential/latest/rusoto_credential/struct.AwsCredentials.html#anonymous-example
        let anonymous = AwsCredentials::default();
        assert!(anonymous.is_anonymous(), "AWS credentials not anonymous");

        let http = HttpClient::new().unwrap();
        let provider = StaticProvider::from(anonymous);
        let region = self.region.clone();
        S3Client::new_with(http, provider, region)
    })
}
/// Builds the object key for the checkpoint at `index`,
/// in the form `checkpoint_<index>.json`.
fn checkpoint_key(index: u32) -> String {
    let index_text = index.to_string();
    ["checkpoint_", index_text.as_str(), ".json"].concat()
}
@ -88,7 +122,7 @@ impl S3Storage {
impl CheckpointSyncer for S3Storage {
async fn latest_index ( & self ) -> Result < Option < u32 > > {
let ret = self
. read_from_bucket ( S3Storage ::index_key ( ) )
. anonymously_ read_from_bucket( S3Storage ::index_key ( ) )
. await ?
. map ( | data | serde_json ::from_slice ( & data ) )
. transpose ( )
@ -102,13 +136,15 @@ impl CheckpointSyncer for S3Storage {
ret
}
/// Fetches the signed checkpoint stored for `index`.
///
/// Reads the object whose key is `S3Storage::checkpoint_key(index)` and
/// deserializes its bytes as JSON. Returns `Ok(None)` when the read yields
/// no data, and an error when the read or the deserialization fails.
async fn fetch_checkpoint ( & self , index : u32 ) -> Result < Option < SignedCheckpoint > > {
self . read_from_bucket ( S3Storage ::checkpoint_key ( index ) )
self . anonymously_ read_from_bucket( S3Storage ::checkpoint_key ( index ) )
// Propagate read errors; what remains is an optional byte buffer.
. await ?
// Parse the bytes as JSON, giving `Option<Result<_, _>>`.
. map ( | data | serde_json ::from_slice ( & data ) )
// Flip to `Result<Option<_>, _>` so a missing object stays `Ok(None)`.
. transpose ( )
// Convert the deserialization error into this function's error type.
. map_err ( Into ::into )
}
async fn write_checkpoint ( & self , signed_checkpoint : SignedCheckpoint ) -> Result < ( ) > {
let serialized_checkpoint = serde_json ::to_string_pretty ( & signed_checkpoint ) ? ;
self . write_to_bucket (