|
|
|
@ -26,43 +26,54 @@ pub enum CheckpointSyncerConf { |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl CheckpointSyncerConf { |
|
|
|
|
/// Create a CheckpointSyncerConf from a storage location string
|
|
|
|
|
pub fn from_storage_location(storage_location: &str) -> Option<Self> { |
|
|
|
|
let s3_prefix = "s3://"; |
|
|
|
|
let local_prefix = "file://"; |
|
|
|
|
if let Some(location) = storage_location.strip_prefix(s3_prefix) { |
|
|
|
|
let pieces: Vec<&str> = location.split('/').collect(); |
|
|
|
|
if pieces.len() == 2 { |
|
|
|
|
Some(CheckpointSyncerConf::S3 { |
|
|
|
|
bucket: pieces[0].into(), |
|
|
|
|
region: pieces[1].into(), |
|
|
|
|
/// Error for parsing announced storage locations
|
|
|
|
|
#[derive(Debug, PartialEq, Eq)] |
|
|
|
|
pub struct ParseStorageLocationError; |
|
|
|
|
|
|
|
|
|
impl FromStr for CheckpointSyncerConf { |
|
|
|
|
type Err = ParseStorageLocationError; |
|
|
|
|
|
|
|
|
|
fn from_str(s: &str) -> Result<Self, Self::Err> { |
|
|
|
|
let [prefix, suffix]: [&str; 2] = s |
|
|
|
|
.split("://") |
|
|
|
|
.collect::<Vec<_>>() |
|
|
|
|
.try_into() |
|
|
|
|
.map_err(|_| ParseStorageLocationError)?; |
|
|
|
|
|
|
|
|
|
match prefix { |
|
|
|
|
"s3" => { |
|
|
|
|
let [bucket, region]: [&str; 2] = suffix |
|
|
|
|
.split('/') |
|
|
|
|
.collect::<Vec<_>>() |
|
|
|
|
.try_into() |
|
|
|
|
.map_err(|_| ParseStorageLocationError)?; |
|
|
|
|
Ok(CheckpointSyncerConf::S3 { |
|
|
|
|
bucket: bucket.into(), |
|
|
|
|
region: region.into(), |
|
|
|
|
}) |
|
|
|
|
} else { |
|
|
|
|
None |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
storage_location |
|
|
|
|
.strip_prefix(local_prefix) |
|
|
|
|
.map(|path| CheckpointSyncerConf::LocalStorage { path: path.into() }) |
|
|
|
|
"file" => Ok(CheckpointSyncerConf::LocalStorage { |
|
|
|
|
path: suffix.into(), |
|
|
|
|
}), |
|
|
|
|
_ => Err(ParseStorageLocationError), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl CheckpointSyncerConf { |
|
|
|
|
/// Turn conf info a Checkpoint Syncer
|
|
|
|
|
pub fn build( |
|
|
|
|
&self, |
|
|
|
|
latest_index_gauge: Option<IntGauge>, |
|
|
|
|
) -> Result<Box<dyn CheckpointSyncer>, Report> { |
|
|
|
|
match self { |
|
|
|
|
Ok(match self { |
|
|
|
|
CheckpointSyncerConf::LocalStorage { path } => { |
|
|
|
|
Ok(Box::new(LocalStorage::new(path, latest_index_gauge))) |
|
|
|
|
Box::new(LocalStorage::new(path, latest_index_gauge)) |
|
|
|
|
} |
|
|
|
|
CheckpointSyncerConf::S3 { bucket, region } => Ok(Box::new(S3Storage::new( |
|
|
|
|
bucket, |
|
|
|
|
region.parse().expect("invalid s3 region"), |
|
|
|
|
latest_index_gauge, |
|
|
|
|
))), |
|
|
|
|
} |
|
|
|
|
CheckpointSyncerConf::S3 { bucket, region } => { |
|
|
|
|
Box::new(S3Storage::new(bucket, region.parse()?, latest_index_gauge)) |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -85,7 +96,11 @@ impl MultisigCheckpointSyncerConf { |
|
|
|
|
for (key, value) in self.checkpointsyncers.iter() { |
|
|
|
|
let gauge = |
|
|
|
|
validator_checkpoint_index.with_label_values(&[origin, &key.to_lowercase()]); |
|
|
|
|
checkpoint_syncers.insert(Address::from_str(key)?, value.build(Some(gauge))?.into()); |
|
|
|
|
if let Ok(conf) = value.build(Some(gauge)) { |
|
|
|
|
checkpoint_syncers.insert(Address::from_str(key)?, conf.into()); |
|
|
|
|
} else { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Ok(MultisigCheckpointSyncer::new(checkpoint_syncers)) |
|
|
|
|
} |
|
|
|
|