From 0b7d0aed9c034a9e8d501cd7816ced3a7e07b587 Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Thu, 26 Sep 2024 18:20:21 +0100 Subject: [PATCH] Initial implementation of a shared cache on S3 Express (#1032) * Make cache block size user configurable (default 1024 KiB) Signed-off-by: Alessandro Passaro * Require Clone on ObjectClient Signed-off-by: Alessandro Passaro * Implement initial draft of shared cache in Express Signed-off-by: Alessandro Passaro * Encode cache version and block size into keys Signed-off-by: Alessandro Passaro * Decouple DataCacheError from io::Error Signed-off-by: Alessandro Passaro * Improve error handling Signed-off-by: Alessandro Passaro * Add unit test Signed-off-by: Alessandro Passaro * Allow sharing the cache when mounting with different prefixes Signed-off-by: Alessandro Passaro * Fix flow-control window Signed-off-by: Alessandro Passaro --------- Signed-off-by: Alessandro Passaro --- mountpoint-s3/Cargo.toml | 2 + mountpoint-s3/src/bin/mock-mount-s3.rs | 6 +- mountpoint-s3/src/cli.rs | 98 +++++-- mountpoint-s3/src/data_cache.rs | 4 +- .../src/data_cache/disk_data_cache.rs | 21 +- .../src/data_cache/express_data_cache.rs | 244 ++++++++++++++++++ mountpoint-s3/src/fs.rs | 18 +- mountpoint-s3/src/fuse.rs | 6 +- mountpoint-s3/src/prefetch.rs | 30 +-- mountpoint-s3/src/upload.rs | 6 +- mountpoint-s3/tests/common/fuse.rs | 2 +- mountpoint-s3/tests/common/mod.rs | 2 +- 12 files changed, 373 insertions(+), 66 deletions(-) create mode 100644 mountpoint-s3/src/data_cache/express_data_cache.rs diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index ca4b82f2c..9e28aaca5 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -80,6 +80,8 @@ built = { version = "0.7.1", features = ["git2"] } [features] # Unreleased feature flags +express_cache = ["block_size"] +block_size = [] event_log = [] # Features for choosing tests fips_tests = [] diff --git a/mountpoint-s3/src/bin/mock-mount-s3.rs 
b/mountpoint-s3/src/bin/mock-mount-s3.rs index fb541a3e1..f7bd62d04 100644 --- a/mountpoint-s3/src/bin/mock-mount-s3.rs +++ b/mountpoint-s3/src/bin/mock-mount-s3.rs @@ -10,6 +10,8 @@ //! //! This binary is intended only for use in testing and development of Mountpoint. +use std::sync::Arc; + use anyhow::anyhow; use futures::executor::ThreadPool; @@ -23,7 +25,7 @@ fn main() -> anyhow::Result<()> { mountpoint_s3::cli::main(create_mock_client) } -fn create_mock_client(args: &CliArgs) -> anyhow::Result<(ThroughputMockClient, ThreadPool, S3Personality)> { +fn create_mock_client(args: &CliArgs) -> anyhow::Result<(Arc, ThreadPool, S3Personality)> { // An extra little safety thing to make sure we can distinguish the real mount-s3 binary and // this one. Buckets starting with "sthree-" are always invalid against real S3: // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html @@ -81,5 +83,5 @@ fn create_mock_client(args: &CliArgs) -> anyhow::Result<(ThroughputMockClient, T MockObject::from_bytes(b"hello world", ETag::for_tests()), ); - Ok((client, runtime, s3_personality)) + Ok((Arc::new(client), runtime, s3_personality)) } diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs index 1d4489d80..79132536e 100644 --- a/mountpoint-s3/src/cli.rs +++ b/mountpoint-s3/src/cli.rs @@ -26,7 +26,7 @@ use nix::unistd::ForkResult; use regex::Regex; use crate::build_info; -use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ManagedCacheDir}; +use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir}; use crate::fs::{CacheConfig, S3FilesystemConfig, ServerSideEncryption, TimeToLive}; use crate::fuse::session::FuseSession; use crate::fuse::S3FuseFilesystem; @@ -261,6 +261,7 @@ pub struct CliArgs { help = "Enable caching of object content to the given directory and set metadata TTL to 60 seconds", help_heading = CACHING_OPTIONS_HEADER, value_name = "DIRECTORY", + group = "cache_group", )] 
pub cache: Option, @@ -282,6 +283,27 @@ pub struct CliArgs { )] pub max_cache_size: Option, + #[cfg(feature = "block_size")] + #[clap( + long, + help = "Size of a cache block in KiB [Default: 1024 (1 MiB) for disk cache, 512 (512 KiB) for S3 Express cache]", + help_heading = CACHING_OPTIONS_HEADER, + value_name = "KiB", + requires = "cache_group" + )] + pub cache_block_size: Option, + + #[cfg(feature = "express_cache")] + #[clap( + long, + help = "Enable caching of object content to the specified bucket on S3 Express One Zone (same region only)", + help_heading = CACHING_OPTIONS_HEADER, + value_name = "BUCKET", + value_parser = parse_bucket_name, + group = "cache_group", + )] + pub cache_express: Option, + #[clap( long, help = "Configure a string to be prepended to the 'User-Agent' HTTP request header for all S3 requests", @@ -384,6 +406,25 @@ impl CliArgs { self.prefix.as_ref().cloned().unwrap_or_default() } + fn cache_block_size_in_bytes(&self) -> u64 { + #[cfg(feature = "block_size")] + if let Some(kib) = self.cache_block_size { + return kib * 1024; + } + if self.cache_express_bucket_name().is_some() { + return 512 * 1024; // 512 KiB block size - default for express cache + } + 1024 * 1024 // 1 MiB block size - default for disk cache + } + + fn cache_express_bucket_name(&self) -> Option<&str> { + #[cfg(feature = "express_cache")] + if let Some(bucket_name) = &self.cache_express { + return Some(bucket_name); + } + None + } + fn logging_config(&self) -> LoggingConfig { let default_filter = if self.no_log { String::from("off") @@ -450,7 +491,7 @@ impl CliArgs { pub fn main(client_builder: ClientBuilder) -> anyhow::Result<()> where ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>, - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Runtime: Spawn + Clone + Send + Sync + 'static, { let args = CliArgs::parse(); @@ -699,7 +740,7 @@ pub fn create_s3_client(args: &CliArgs) -> 
anyhow::Result<(S3CrtClient, EventLoo fn mount(args: CliArgs, client_builder: ClientBuilder) -> anyhow::Result where ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>, - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Runtime: Spawn + Clone + Send + Sync + 'static, { tracing::info!("mount-s3 {}", build_info::FULL_VERSION); @@ -728,11 +769,11 @@ where if let Some(file_mode) = args.file_mode { filesystem_config.file_mode = file_mode; } - filesystem_config.storage_class = args.storage_class; + filesystem_config.storage_class = args.storage_class.clone(); filesystem_config.allow_delete = args.allow_delete; filesystem_config.allow_overwrite = args.allow_overwrite; filesystem_config.s3_personality = s3_personality; - filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse, args.sse_kms_key_id); + filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse.clone(), args.sse_kms_key_id.clone()); // Written in this awkward way to force us to update it if we add new checksum types filesystem_config.use_upload_checksums = match args.upload_checksums { @@ -747,7 +788,7 @@ where let prefetcher_config = Default::default(); let mut metadata_cache_ttl = args.metadata_ttl.unwrap_or_else(|| { - if args.cache.is_some() { + if args.cache.is_some() || args.cache_express_bucket_name().is_some() { // When the data cache is enabled, use 1min as metadata-ttl. TimeToLive::Duration(Duration::from_secs(60)) } else { @@ -771,20 +812,20 @@ where tracing::trace!("using metadata TTL setting {metadata_cache_ttl:?}"); filesystem_config.cache_config = CacheConfig::new(metadata_cache_ttl); - if let Some(path) = args.cache { - let cache_config = match args.max_cache_size { + if let Some(path) = &args.cache { + let cache_limit = match args.max_cache_size { // Fallback to no data cache. 
Some(0) => None, - Some(max_size_in_mib) => Some(DiskDataCacheConfig { - limit: CacheLimit::TotalSize { - max_size: (max_size_in_mib * 1024 * 1024) as usize, - }, - ..Default::default() + Some(max_size_in_mib) => Some(CacheLimit::TotalSize { + max_size: (max_size_in_mib * 1024 * 1024) as usize, }), - None => Some(DiskDataCacheConfig::default()), + None => Some(CacheLimit::default()), }; - - if let Some(cache_config) = cache_config { + if let Some(cache_limit) = cache_limit { + let cache_config = DiskDataCacheConfig { + block_size: args.cache_block_size_in_bytes(), + limit: cache_limit, + }; let cache_key = env_unstable_cache_key(); let managed_cache_dir = ManagedCacheDir::new_from_parent_with_cache_key(path, cache_key) .context("failed to create cache directory")?; @@ -809,6 +850,29 @@ where } } + if let Some(express_bucket_name) = args.cache_express_bucket_name() { + // The cache can be shared across instances mounting the same bucket (including with different prefixes) + let source_description = &args.bucket_name; + let cache = ExpressDataCache::new( + express_bucket_name, + client.clone(), + source_description, + args.cache_block_size_in_bytes(), + ); + let prefetcher = caching_prefetch(cache, runtime, prefetcher_config); + let fuse_session = create_filesystem( + client, + prefetcher, + &args.bucket_name, + &args.prefix.unwrap_or_default(), + filesystem_config, + fuse_config, + &bucket_description, + )?; + + return Ok(fuse_session); + }; + let prefetcher = default_prefetch(runtime, prefetcher_config); create_filesystem( client, @@ -831,7 +895,7 @@ fn create_filesystem( bucket_description: &str, ) -> anyhow::Result where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Prefetcher: Prefetch + Send + Sync + 'static, { tracing::trace!(?filesystem_config, "creating file system"); diff --git a/mountpoint-s3/src/data_cache.rs b/mountpoint-s3/src/data_cache.rs index e3685e0ba..1654a7cf9 100644 --- 
a/mountpoint-s3/src/data_cache.rs +++ b/mountpoint-s3/src/data_cache.rs @@ -6,6 +6,7 @@ mod cache_directory; mod disk_data_cache; +mod express_data_cache; mod in_memory_data_cache; use async_trait::async_trait; @@ -14,6 +15,7 @@ use thiserror::Error; pub use crate::checksums::ChecksummedBytes; pub use crate::data_cache::cache_directory::ManagedCacheDir; pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig}; +pub use crate::data_cache::express_data_cache::ExpressDataCache; pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache; use crate::object::ObjectId; @@ -25,7 +27,7 @@ pub type BlockIndex = u64; #[derive(Debug, Error)] pub enum DataCacheError { #[error("IO error when reading or writing from cache: {0}")] - IoFailure(#[from] std::io::Error), + IoFailure(#[source] anyhow::Error), #[error("Block content was not valid/readable")] InvalidBlockContent, #[error("Block offset does not match block index")] diff --git a/mountpoint-s3/src/data_cache/disk_data_cache.rs b/mountpoint-s3/src/data_cache/disk_data_cache.rs index cca4406a4..c136c7aff 100644 --- a/mountpoint-s3/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3/src/data_cache/disk_data_cache.rs @@ -45,15 +45,6 @@ pub struct DiskDataCacheConfig { pub limit: CacheLimit, } -impl Default for DiskDataCacheConfig { - fn default() -> Self { - Self { - block_size: 1024 * 1024, // 1 MiB block size - limit: CacheLimit::AvailableSpace { min_ratio: 0.05 }, // Preserve 5% available space - } - } -} - /// Limit the cache size. #[derive(Debug)] pub enum CacheLimit { @@ -62,6 +53,12 @@ pub enum CacheLimit { AvailableSpace { min_ratio: f64 }, } +impl Default for CacheLimit { + fn default() -> Self { + CacheLimit::AvailableSpace { min_ratio: 0.05 } // Preserve 5% available space + } +} + /// Describes additional information about the data stored in the block. 
/// /// It should be written alongside the block's data @@ -203,6 +200,12 @@ impl DiskBlock { } } +impl From for DataCacheError { + fn from(e: std::io::Error) -> Self { + DataCacheError::IoFailure(e.into()) + } +} + impl DiskDataCache { /// Create a new instance of an [DiskDataCache] with the specified configuration. pub fn new(cache_directory: PathBuf, config: DiskDataCacheConfig) -> Self { diff --git a/mountpoint-s3/src/data_cache/express_data_cache.rs b/mountpoint-s3/src/data_cache/express_data_cache.rs new file mode 100644 index 000000000..4ee5f13eb --- /dev/null +++ b/mountpoint-s3/src/data_cache/express_data_cache.rs @@ -0,0 +1,244 @@ +use crate::object::ObjectId; + +use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheError, DataCacheResult}; + +use async_trait::async_trait; +use bytes::BytesMut; +use futures::{pin_mut, StreamExt}; +use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; +use mountpoint_s3_client::types::{GetObjectRequest, PutObjectParams}; +use mountpoint_s3_client::{ObjectClient, PutObjectRequest}; +use sha2::{Digest, Sha256}; +use tracing::Instrument; + +const CACHE_VERSION: &str = "V1"; + +/// A data cache on S3 Express One Zone that can be shared across Mountpoint instances. +pub struct ExpressDataCache { + client: Client, + bucket_name: String, + prefix: String, + block_size: u64, +} + +impl From> for DataCacheError +where + S: std::error::Error + Send + Sync + 'static, + C: std::error::Error + Send + Sync + 'static, +{ + fn from(e: ObjectClientError) -> Self { + DataCacheError::IoFailure(e.into()) + } +} + +impl ExpressDataCache +where + Client: ObjectClient + Send + Sync + 'static, +{ + /// Create a new instance. + /// + /// TODO: consider adding some validation of the bucket. 
+ pub fn new(bucket_name: &str, client: Client, source_description: &str, block_size: u64) -> Self { + let prefix = hex::encode( + Sha256::new() + .chain_update(CACHE_VERSION.as_bytes()) + .chain_update(block_size.to_be_bytes()) + .chain_update(source_description.as_bytes()) + .finalize(), + ); + Self { + client, + bucket_name: bucket_name.to_owned(), + prefix, + block_size, + } + } +} + +#[async_trait] +impl DataCache for ExpressDataCache +where + Client: ObjectClient + Send + Sync + 'static, +{ + async fn get_block( + &self, + cache_key: &ObjectId, + block_idx: BlockIndex, + block_offset: u64, + ) -> DataCacheResult> { + if block_offset != block_idx * self.block_size { + return Err(DataCacheError::InvalidBlockOffset); + } + + let object_key = block_key(&self.prefix, cache_key, block_idx); + let result = match self.client.get_object(&self.bucket_name, &object_key, None, None).await { + Ok(result) => result, + Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None), + Err(e) => return Err(e.into()), + }; + + pin_mut!(result); + + // TODO: optimize for the common case of a single chunk. + let mut buffer = BytesMut::default(); + while let Some(chunk) = result.next().await { + let (offset, body) = chunk?; + if offset != buffer.len() as u64 { + return Err(DataCacheError::InvalidBlockOffset); + } + buffer.extend_from_slice(&body); + + // Ensure the flow-control window is large enough. + // TODO: review if/when we add a header to the block. 
+ result.as_mut().increment_read_window(self.block_size as usize); + } + let buffer = buffer.freeze(); + DataCacheResult::Ok(Some(buffer.into())) + } + + async fn put_block( + &self, + cache_key: ObjectId, + block_idx: BlockIndex, + block_offset: u64, + bytes: ChecksummedBytes, + ) -> DataCacheResult<()> { + if block_offset != block_idx * self.block_size { + return Err(DataCacheError::InvalidBlockOffset); + } + + let object_key = block_key(&self.prefix, &cache_key, block_idx); + + // TODO: ideally we should use a simple Put rather than MPU. + let params = PutObjectParams::new(); + let mut req = self + .client + .put_object(&self.bucket_name, &object_key, &params) + .in_current_span() + .await?; + let (data, _crc) = bytes.into_inner().map_err(|_| DataCacheError::InvalidBlockContent)?; + req.write(&data).await?; + req.complete().await?; + + DataCacheResult::Ok(()) + } + + fn block_size(&self) -> u64 { + self.block_size + } +} + +fn block_key(prefix: &str, cache_key: &ObjectId, block_idx: BlockIndex) -> String { + let hashed_cache_key = hex::encode( + Sha256::new() + .chain_update(cache_key.key()) + .chain_update(cache_key.etag().as_str()) + .finalize(), + ); + format!("{}/{}/{:010}", prefix, hashed_cache_key, block_idx) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::checksums::ChecksummedBytes; + use crate::sync::Arc; + + use test_case::test_case; + + use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig}; + use mountpoint_s3_client::types::ETag; + + #[test_case(1024, 512 * 1024; "block_size smaller than part_size")] + #[test_case(8 * 1024 * 1024, 512 * 1024; "block_size larger than part_size")] + #[tokio::test] + async fn test_put_get(part_size: usize, block_size: u64) { + let bucket = "test-bucket"; + let config = MockClientConfig { + bucket: bucket.to_string(), + part_size, + enable_backpressure: true, + initial_read_window_size: part_size, + ..Default::default() + }; + let client = Arc::new(MockClient::new(config)); + + let cache = 
ExpressDataCache::new(bucket, client, "unique source description", block_size); + + let data_1 = ChecksummedBytes::new("Foo".into()); + let data_2 = ChecksummedBytes::new("Bar".into()); + let data_3 = ChecksummedBytes::new("a".repeat(block_size as usize).into()); + + let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests()); + let cache_key_2 = ObjectId::new( + "longkey_".repeat(128), // 1024 bytes, max length for S3 keys + ETag::for_tests(), + ); + + let block = cache + .get_block(&cache_key_1, 0, 0) + .await + .expect("cache should be accessible"); + assert!( + block.is_none(), + "no entry should be available to return but got {:?}", + block, + ); + + // PUT and GET, OK? + cache + .put_block(cache_key_1.clone(), 0, 0, data_1.clone()) + .await + .expect("cache should be accessible"); + let entry = cache + .get_block(&cache_key_1, 0, 0) + .await + .expect("cache should be accessible") + .expect("cache entry should be returned"); + assert_eq!( + data_1, entry, + "cache entry returned should match original bytes after put" + ); + + // PUT AND GET block for a second key, OK? + cache + .put_block(cache_key_2.clone(), 0, 0, data_2.clone()) + .await + .expect("cache should be accessible"); + let entry = cache + .get_block(&cache_key_2, 0, 0) + .await + .expect("cache should be accessible") + .expect("cache entry should be returned"); + assert_eq!( + data_2, entry, + "cache entry returned should match original bytes after put" + ); + + // PUT AND GET a second block in a cache entry, OK? 
+ cache + .put_block(cache_key_1.clone(), 1, block_size, data_3.clone()) + .await + .expect("cache should be accessible"); + let entry = cache + .get_block(&cache_key_1, 1, block_size) + .await + .expect("cache should be accessible") + .expect("cache entry should be returned"); + assert_eq!( + data_3, entry, + "cache entry returned should match original bytes after put" + ); + + // Entry 1's first block still intact + let entry = cache + .get_block(&cache_key_1, 0, 0) + .await + .expect("cache should be accessible") + .expect("cache entry should be returned"); + assert_eq!( + data_1, entry, + "cache entry returned should match original bytes after put" + ); + } +} diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index b03563cd2..b65762a3c 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -67,7 +67,7 @@ impl DirHandle { #[derive(Debug)] struct FileHandle where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Prefetcher: Prefetch, { inode: Inode, @@ -77,7 +77,7 @@ where enum FileHandleState where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Prefetcher: Prefetch, { /// The file handle has been assigned as a read handle @@ -91,7 +91,7 @@ where impl std::fmt::Debug for FileHandleState where - Client: ObjectClient + Send + Sync + 'static + std::fmt::Debug, + Client: ObjectClient + Clone + Send + Sync + 'static + std::fmt::Debug, Prefetcher: Prefetch, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -104,7 +104,7 @@ where impl FileHandleState where - Client: ObjectClient + Send + Sync, + Client: ObjectClient + Clone + Send + Sync, Prefetcher: Prefetch, { async fn new_write_handle( @@ -521,11 +521,11 @@ pub enum SseCorruptedError { #[derive(Debug)] pub struct S3Filesystem where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Prefetcher: Prefetch, { 
config: S3FilesystemConfig, - client: Arc, + client: Client, superblock: Superblock, prefetcher: Prefetcher, uploader: Uploader, @@ -539,7 +539,7 @@ where impl S3Filesystem where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Prefetcher: Prefetch, { pub fn new( @@ -557,8 +557,6 @@ where }; let superblock = Superblock::new(bucket, prefix, superblock_config); - let client = Arc::new(client); - let uploader = Uploader::new( client.clone(), config.storage_class.to_owned(), @@ -627,7 +625,7 @@ pub struct DirectoryEntry { impl S3Filesystem where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Prefetcher: Prefetch, { pub async fn init(&self, config: &mut KernelConfig) -> Result<(), libc::c_int> { diff --git a/mountpoint-s3/src/fuse.rs b/mountpoint-s3/src/fuse.rs index 9eadce513..0fb0b13b2 100644 --- a/mountpoint-s3/src/fuse.rs +++ b/mountpoint-s3/src/fuse.rs @@ -65,7 +65,7 @@ macro_rules! fuse_unsupported { /// so that we can test our actual filesystem implementation without having actual FUSE in the loop. 
pub struct S3FuseFilesystem where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Prefetcher: Prefetch, { fs: S3Filesystem, @@ -73,7 +73,7 @@ where impl S3FuseFilesystem where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Prefetcher: Prefetch, { pub fn new( @@ -91,7 +91,7 @@ where impl Filesystem for S3FuseFilesystem where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Prefetcher: Prefetch, { #[instrument(level="warn", skip_all, fields(req=_req.unique()))] diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index 3238e6677..1dea5df9b 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -64,19 +64,19 @@ use crate::sync::Arc; /// Generic interface to handle reading data from an object. pub trait Prefetch { - type PrefetchResult: PrefetchResult; + type PrefetchResult: PrefetchResult; /// Start a new prefetch request to the specified object. fn prefetch( &self, - client: Arc, + client: Client, bucket: &str, key: &str, size: u64, etag: ETag, ) -> Self::PrefetchResult where - Client: ObjectClient + Send + Sync + 'static; + Client: ObjectClient + Clone + Send + Sync + 'static; } /// Result of a prefetch request. Allows callers to read object data. 
@@ -234,28 +234,20 @@ impl Prefetch for Prefetcher where Stream: ObjectPartStream + Send + Sync + 'static, { - type PrefetchResult = PrefetchGetObject; + type PrefetchResult = PrefetchGetObject; fn prefetch( &self, - client: Arc, + client: Client, bucket: &str, key: &str, size: u64, etag: ETag, ) -> Self::PrefetchResult where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, { - PrefetchGetObject::new( - client.clone(), - self.part_stream.clone(), - self.config, - bucket, - key, - size, - etag, - ) + PrefetchGetObject::new(client, self.part_stream.clone(), self.config, bucket, key, size, etag) } } @@ -263,7 +255,7 @@ where /// in a way that maximizes throughput from S3. #[derive(Debug)] pub struct PrefetchGetObject { - client: Arc, + client: Client, part_stream: Arc, config: PrefetcherConfig, backpressure_task: Option>, @@ -285,7 +277,7 @@ pub struct PrefetchGetObject { impl PrefetchResult for PrefetchGetObject where Stream: ObjectPartStream + Send + Sync + 'static, - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, { /// Read some bytes from the object. 
This function will always return exactly `size` bytes, /// except at the end of the object where it will return however many bytes are left (including @@ -312,11 +304,11 @@ where impl PrefetchGetObject where Stream: ObjectPartStream, - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, { /// Create and spawn a new prefetching request for an object fn new( - client: Arc, + client: Client, part_stream: Arc, config: PrefetcherConfig, bucket: &str, diff --git a/mountpoint-s3/src/upload.rs b/mountpoint-s3/src/upload.rs index da4039846..65661b53d 100644 --- a/mountpoint-s3/src/upload.rs +++ b/mountpoint-s3/src/upload.rs @@ -24,7 +24,7 @@ pub struct Uploader { #[derive(Debug)] struct UploaderInner { - client: Arc, + client: Client, storage_class: Option, server_side_encryption: ServerSideEncryption, use_additional_checksums: bool, @@ -41,7 +41,7 @@ pub enum UploadPutError { impl Uploader { /// Create a new [Uploader] that will make requests to the given client. 
pub fn new( - client: Arc, + client: Client, storage_class: Option, server_side_encryption: ServerSideEncryption, use_additional_checksums: bool, @@ -419,7 +419,7 @@ mod tests { ServerSideEncryption::new(Some("aws:kms".to_string()), Some("some_key_alias".to_string())), true, ); - std::sync::Arc::>::get_mut(&mut uploader.inner) + std::sync::Arc::>>::get_mut(&mut uploader.inner) .unwrap() .server_side_encryption .corrupt_data(sse_type_corrupted.map(String::from), key_id_corrupted.map(String::from)); diff --git a/mountpoint-s3/tests/common/fuse.rs b/mountpoint-s3/tests/common/fuse.rs index 87399531f..57f60d08e 100644 --- a/mountpoint-s3/tests/common/fuse.rs +++ b/mountpoint-s3/tests/common/fuse.rs @@ -84,7 +84,7 @@ fn create_fuse_session( filesystem_config: S3FilesystemConfig, ) -> BackgroundSession where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, Prefetcher: Prefetch + Send + Sync + 'static, { let options = vec![ diff --git a/mountpoint-s3/tests/common/mod.rs b/mountpoint-s3/tests/common/mod.rs index 53ee65a7c..f60d3ec10 100644 --- a/mountpoint-s3/tests/common/mod.rs +++ b/mountpoint-s3/tests/common/mod.rs @@ -54,7 +54,7 @@ pub fn make_test_filesystem_with_client( config: S3FilesystemConfig, ) -> TestS3Filesystem where - Client: ObjectClient + Send + Sync + 'static, + Client: ObjectClient + Clone + Send + Sync + 'static, { let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); let prefetcher = default_prefetch(runtime, Default::default());