From ebc3e251b5f465aba850ace889c86a11c56c9e0e Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Thu, 5 Oct 2023 07:47:26 +0000 Subject: [PATCH] Introduce ObjectStore trait to replace ObjectClient in mountpoint-s3 Signed-off-by: Alessandro Passaro --- mountpoint-s3/examples/fs_benchmark.rs | 5 +- mountpoint-s3/examples/prefetch_benchmark.rs | 5 +- mountpoint-s3/src/fs.rs | 112 +++-- mountpoint-s3/src/fuse.rs | 21 +- mountpoint-s3/src/inode.rs | 253 ++++++----- mountpoint-s3/src/inode/readdir.rs | 22 +- mountpoint-s3/src/lib.rs | 3 +- mountpoint-s3/src/main.rs | 113 +++-- mountpoint-s3/src/prefetch.rs | 394 +++++++----------- mountpoint-s3/src/prefetch/feed.rs | 302 ++++++++++---- mountpoint-s3/src/prefetch/part_queue.rs | 26 +- mountpoint-s3/src/prefetch/task.rs | 78 ++++ mountpoint-s3/src/store.rs | 242 +++++++++++ mountpoint-s3/src/upload.rs | 58 +-- mountpoint-s3/tests/common/mod.rs | 14 +- mountpoint-s3/tests/fs.rs | 21 +- mountpoint-s3/tests/fuse_tests/mod.rs | 106 ++--- .../tests/fuse_tests/prefetch_test.rs | 8 +- mountpoint-s3/tests/reftests/harness.rs | 6 +- 19 files changed, 1120 insertions(+), 669 deletions(-) create mode 100644 mountpoint-s3/src/prefetch/task.rs create mode 100644 mountpoint-s3/src/store.rs diff --git a/mountpoint-s3/examples/fs_benchmark.rs b/mountpoint-s3/examples/fs_benchmark.rs index 45fa23d51..4487905dc 100644 --- a/mountpoint-s3/examples/fs_benchmark.rs +++ b/mountpoint-s3/examples/fs_benchmark.rs @@ -1,10 +1,12 @@ use clap::{Arg, ArgAction, Command}; use fuser::{BackgroundSession, MountOption, Session}; use mountpoint_s3::fuse::S3FuseFilesystem; +use mountpoint_s3::store::default_store; use mountpoint_s3::S3FilesystemConfig; use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; use mountpoint_s3_client::S3CrtClient; use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter; +use std::sync::Arc; use std::{ fs::File, fs::OpenOptions, @@ -164,8 +166,9 @@ fn mount_file_system(bucket_name: &str, region: &str, throughput_target_gbps: Op bucket_name, mountpoint.to_str().unwrap() ); + let store = default_store(Arc::new(client), runtime, Default::default()); let session = Session::new( - S3FuseFilesystem::new(client, runtime, bucket_name, &Default::default(), filesystem_config), + S3FuseFilesystem::new(store, bucket_name, &Default::default(), filesystem_config), mountpoint, &options, ) diff --git a/mountpoint-s3/examples/prefetch_benchmark.rs b/mountpoint-s3/examples/prefetch_benchmark.rs index 38f5b5a51..59914cce1 100644 --- a/mountpoint-s3/examples/prefetch_benchmark.rs +++ b/mountpoint-s3/examples/prefetch_benchmark.rs @@ -4,7 +4,9 @@ use std::time::Instant; use clap::{Arg, Command}; use futures::executor::{block_on, ThreadPool}; +use mountpoint_s3::prefetch::feed::ClientPartFeed; use mountpoint_s3::prefetch::Prefetcher; +use mountpoint_s3::store::PrefetchGetObject; use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; use mountpoint_s3_client::types::ETag; use mountpoint_s3_client::S3CrtClient; @@ -80,7 +82,8 @@ fn main() { for i in 0..iterations.unwrap_or(1) { let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); - let manager = Prefetcher::new(client.clone(), runtime, Default::default()); + let part_feed = ClientPartFeed::new(client.clone(), runtime); + let manager = Prefetcher::new(part_feed, Default::default()); let received_size = Arc::new(AtomicU64::new(0)); let start = Instant::now(); diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index 0cfbea909..4db6387cb 100644 --- 
a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -1,6 +1,5 @@ //! FUSE file system types and operations, not tied to the _fuser_ library bindings. -use futures::task::Spawn; use nix::unistd::{getgid, getuid}; use std::collections::HashMap; use std::ffi::{OsStr, OsString}; @@ -12,11 +11,10 @@ use tracing::{debug, error, trace}; use fuser::{FileAttr, KernelConfig}; use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; use mountpoint_s3_client::types::ETag; -use mountpoint_s3_client::ObjectClient; use crate::inode::{Inode, InodeError, InodeKind, LookedUp, ReaddirHandle, Superblock, WriteHandle}; -use crate::prefetch::{PrefetchGetObject, PrefetchReadError, Prefetcher, PrefetcherConfig}; use crate::prefix::Prefix; +use crate::store::{ObjectStore, PrefetchGetObject, PrefetchReadError}; use crate::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use crate::sync::{Arc, AsyncMutex, AsyncRwLock}; use crate::upload::{UploadRequest, Uploader}; @@ -49,37 +47,45 @@ impl DirHandle { } #[derive(Debug)] -struct FileHandle { +struct FileHandle { inode: Inode, full_key: String, object_size: u64, - typ: FileHandleType, + typ: FileHandleType, } -#[derive(Debug)] -enum FileHandleType { +enum FileHandleType { Read { - request: AsyncMutex>>, + request: AsyncMutex>, etag: ETag, }, - Write(AsyncMutex>), + Write(AsyncMutex>), +} + +impl std::fmt::Debug for FileHandleType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Read { request: _, etag } => f.debug_struct("Read").field("etag", etag).finish(), + Self::Write(state) => f.debug_tuple("Write").field(state).finish(), + } + } } -impl FileHandleType { +impl FileHandleType { async fn new_write_handle( lookup: &LookedUp, ino: InodeNo, flags: i32, pid: u32, - fs: &S3Filesystem, - ) -> Result, Error> { + fs: &S3Filesystem, + ) -> Result, Error> { // We can't support O_SYNC writes because they require the data to go to stable storage // at `write` time, but we only commit a PUT at `close` time. if flags & (libc::O_SYNC | libc::O_DSYNC) != 0 { return Err(err!(libc::EINVAL, "O_SYNC and O_DSYNC are not supported")); } - let handle = match fs.superblock.write(&fs.client, ino, lookup.inode.parent(), pid).await { + let handle = match fs.superblock.write(&fs.store, ino, lookup.inode.parent(), pid).await { Ok(handle) => handle, Err(e) => { return Err(e.into()); @@ -96,7 +102,7 @@ impl FileHandleType { Ok(handle) } - async fn new_read_handle(lookup: &LookedUp) -> Result, Error> { + async fn new_read_handle(lookup: &LookedUp) -> Result, Error> { if !lookup.stat.is_readable { return Err(err!( libc::EACCES, @@ -117,9 +123,9 @@ impl FileHandleType { } #[derive(Debug)] -enum UploadState { +enum UploadState { InProgress { - request: UploadRequest, + request: UploadRequest, handle: WriteHandle, }, Completed, @@ -127,7 +133,7 @@ enum UploadState { Failed(libc::c_int), } -impl UploadState { +impl UploadState { async fn write(&mut self, offset: i64, data: &[u8], key: &str) -> Result { let upload = match self { Self::InProgress { request, .. 
} => request, @@ -196,7 +202,7 @@ impl UploadState { } } - async fn complete_upload(upload: UploadRequest, key: &str, handle: WriteHandle) -> Result<(), Error> { + async fn complete_upload(upload: UploadRequest, key: &str, handle: WriteHandle) -> Result<(), Error> { let size = upload.size(); let put_result = match upload.complete().await { Ok(_) => { @@ -303,8 +309,6 @@ pub struct S3FilesystemConfig { pub dir_mode: u16, /// File permissions pub file_mode: u16, - /// Prefetcher configuration - pub prefetcher_config: PrefetcherConfig, /// Allow delete pub allow_delete: bool, /// Storage class to be used for new object uploads @@ -323,7 +327,6 @@ impl Default for S3FilesystemConfig { gid, dir_mode: 0o755, file_mode: 0o644, - prefetcher_config: PrefetcherConfig::default(), allow_delete: false, storage_class: None, } @@ -331,38 +334,31 @@ impl Default for S3FilesystemConfig { } #[derive(Debug)] -pub struct S3Filesystem { +pub struct S3Filesystem { config: S3FilesystemConfig, - client: Arc, + store: Store, superblock: Superblock, - prefetcher: Prefetcher, - uploader: Uploader, + uploader: Uploader, bucket: String, #[allow(unused)] prefix: Prefix, next_handle: AtomicU64, dir_handles: AsyncRwLock>>, - file_handles: AsyncRwLock>>>, + file_handles: AsyncRwLock>>>, } -impl S3Filesystem +impl S3Filesystem where - Client: ObjectClient + Send + Sync + 'static, - Runtime: Spawn + Send + Sync, + Store: ObjectStore + Send + Sync + 'static, { - pub fn new(client: Client, runtime: Runtime, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig) -> Self { + pub fn new(store: Store, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig) -> Self { let superblock = Superblock::new(bucket, prefix, config.cache_config.clone()); - - let client = Arc::new(client); - - let prefetcher = Prefetcher::new(client.clone(), runtime, config.prefetcher_config); - let uploader = Uploader::new(client.clone(), config.storage_class.to_owned()); + let uploader = Uploader::new(store.clone(), config.storage_class.to_owned()); Self { config, - client, + store, superblock, - prefetcher, uploader, bucket: bucket.to_string(), prefix: prefix.clone(), @@ -430,10 +426,9 @@ pub trait ReadReplier { fn error(self, error: Error) -> Self::Replied; } -impl S3Filesystem +impl S3Filesystem where - Client: ObjectClient + Send + Sync + 'static, - Runtime: Spawn + Send + Sync, + Store: ObjectStore + Send + Sync + 'static, { pub async fn init(&self, config: &mut KernelConfig) -> Result<(), libc::c_int> { let _ = config.add_capabilities(fuser::consts::FUSE_DO_READDIRPLUS); @@ -484,7 +479,7 @@ where pub async fn lookup(&self, parent: InodeNo, name: &OsStr) -> Result { trace!("fs:lookup with parent {:?} name {:?}", parent, name); - let lookup = self.superblock.lookup(&self.client, parent, name).await?; + let lookup = self.superblock.lookup(&self.store, parent, name).await?; let attr = self.make_attr(&lookup); Ok(Entry { ttl: lookup.validity(), @@ -496,7 +491,7 @@ where pub async fn getattr(&self, ino: InodeNo) -> Result { trace!("fs:getattr with ino {:?}", ino); - let lookup = self.superblock.getattr(&self.client, ino, false).await?; + let lookup = self.superblock.getattr(&self.store, ino, false).await?; let attr = self.make_attr(&lookup); Ok(Attr { @@ -519,7 +514,7 @@ where atime, mtime ); - let lookup = self.superblock.setattr(&self.client, ino, atime, mtime).await?; + let lookup = self.superblock.setattr(&self.store, ino, atime, mtime).await?; let attr = self.make_attr(&lookup); Ok(Attr { @@ -537,7 +532,7 @@ where trace!("fs:open with ino {:?} 
flags {:?} pid {:?}", ino, flags, pid); let force_revalidate = !self.config.cache_config.serve_lookup_from_cache; - let lookup = self.superblock.getattr(&self.client, ino, force_revalidate).await?; + let lookup = self.superblock.getattr(&self.store, ino, force_revalidate).await?; match lookup.inode.kind() { InodeKind::Directory => return Err(InodeError::IsDirectory(lookup.inode.err()).into()), @@ -610,8 +605,8 @@ where if request.is_none() { *request = Some( - self.prefetcher - .get(&self.bucket, &handle.full_key, handle.object_size, file_etag), + self.store + .prefetch(&self.bucket, &handle.full_key, handle.object_size, file_etag), ); } @@ -620,14 +615,13 @@ where Ok(bytes) => reply.data(&bytes), Err(e) => reply.error(err!(libc::EIO, source:e, "integrity error")), }, - Err(PrefetchReadError::GetRequestFailed(ObjectClientError::ServiceError( + Err(ObjectClientError::ServiceError(PrefetchReadError::GetRequestFailed( GetObjectError::PreconditionFailed, ))) => reply.error(err!(libc::ESTALE, "object was mutated remotely")), - Err(PrefetchReadError::Integrity(e)) => reply.error(err!(libc::EIO, source:e, "integrity error")), - Err(e @ PrefetchReadError::GetRequestFailed(_)) - | Err(e @ PrefetchReadError::GetRequestTerminatedUnexpectedly) => { - reply.error(err!(libc::EIO, source:e, "get request failed")) + Err(ObjectClientError::ServiceError(PrefetchReadError::Integrity(e))) => { + reply.error(err!(libc::EIO, source:e, "integrity error")) } + Err(e) => reply.error(err!(libc::EIO, source:e, "get request failed")), } } @@ -649,7 +643,7 @@ where let lookup = self .superblock - .create(&self.client, parent, name, InodeKind::File) + .create(&self.store, parent, name, InodeKind::File) .await?; let attr = self.make_attr(&lookup); Ok(Entry { @@ -662,7 +656,7 @@ where pub async fn mkdir(&self, parent: InodeNo, name: &OsStr, _mode: libc::mode_t, _umask: u32) -> Result { let lookup = self .superblock - .create(&self.client, parent, name, InodeKind::Directory) + .create(&self.store, parent, name, InodeKind::Directory) .await?; let attr = self.make_attr(&lookup); Ok(Entry { @@ -714,7 +708,7 @@ where pub async fn opendir(&self, parent: InodeNo, _flags: i32) -> Result { trace!("fs:opendir with parent {:?} flags {:?}", parent, _flags); - let inode_handle = self.superblock.readdir(&self.client, parent, 1000).await?; + let inode_handle = self.superblock.readdir(&self.store, parent, 1000).await?; let fh = self.next_handle(); let handle = DirHandle { @@ -827,7 +821,7 @@ where let mut reply = Reply { reply, entries: vec![] }; if dir_handle.offset() < 1 { - let lookup = self.superblock.getattr(&self.client, parent, false).await?; + let lookup = self.superblock.getattr(&self.store, parent, false).await?; let attr = self.make_attr(&lookup); let entry = DirectoryEntry { ino: parent, @@ -846,7 +840,7 @@ where if dir_handle.offset() < 2 { let lookup = self .superblock - .getattr(&self.client, dir_handle.handle.parent(), false) + .getattr(&self.store, dir_handle.handle.parent(), false) .await?; let attr = self.make_attr(&lookup); let entry = DirectoryEntry { @@ -865,7 +859,7 @@ where } loop { - let next = match dir_handle.handle.next(&self.client).await? { + let next = match dir_handle.handle.next(&self.store).await? 
{ None => return Ok(reply.finish(offset, &dir_handle).await), Some(next) => next, }; @@ -981,7 +975,7 @@ where } pub async fn rmdir(&self, parent_ino: InodeNo, name: &OsStr) -> Result<(), Error> { - self.superblock.rmdir(&self.client, parent_ino, name).await?; + self.superblock.rmdir(&self.store, parent_ino, name).await?; Ok(()) } @@ -997,6 +991,6 @@ where if !self.config.allow_delete { return Err(err!(libc::EPERM, "deletes are disabled")); } - Ok(self.superblock.unlink(&self.client, parent_ino, name).await?) + Ok(self.superblock.unlink(&self.store, parent_ino, name).await?) } } diff --git a/mountpoint-s3/src/fuse.rs b/mountpoint-s3/src/fuse.rs index f65fa6ab5..df4fdf61f 100644 --- a/mountpoint-s3/src/fuse.rs +++ b/mountpoint-s3/src/fuse.rs @@ -1,7 +1,6 @@ //! Links _fuser_ method calls into Mountpoint's filesystem code in [crate::fs]. use futures::executor::block_on; -use futures::task::Spawn; use std::ffi::OsStr; use std::path::Path; use std::time::SystemTime; @@ -12,13 +11,13 @@ use crate::fs::{ self, DirectoryEntry, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno, }; use crate::prefix::Prefix; +use crate::store::ObjectStore; #[cfg(target_os = "macos")] use fuser::ReplyXTimes; use fuser::{ Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry, ReplyIoctl, ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow, }; -use mountpoint_s3_client::ObjectClient; pub mod session; @@ -48,26 +47,24 @@ macro_rules! fuse_unsupported { /// This is just a thin wrapper around [S3Filesystem] that implements the actual `fuser` protocol, /// so that we can test our actual filesystem implementation without having actual FUSE in the loop. -pub struct S3FuseFilesystem { - fs: S3Filesystem, +pub struct S3FuseFilesystem { + fs: S3Filesystem, } -impl S3FuseFilesystem +impl S3FuseFilesystem where - Client: ObjectClient + Send + Sync + 'static, - Runtime: Spawn + Send + Sync, + Store: ObjectStore + Send + Sync + 'static, { - pub fn new(client: Client, runtime: Runtime, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig) -> Self { - let fs = S3Filesystem::new(client, runtime, bucket, prefix, config); + pub fn new(store: Store, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig) -> Self { + let fs = S3Filesystem::new(store, bucket, prefix, config); Self { fs } } } -impl Filesystem for S3FuseFilesystem +impl Filesystem for S3FuseFilesystem where - Client: ObjectClient + Send + Sync + 'static, - Runtime: Spawn + Send + Sync, + Store: ObjectStore + Send + Sync + 'static, { #[instrument(level="warn", skip_all, fields(req=_req.unique()))] fn init(&self, _req: &Request<'_>, config: &mut KernelConfig) -> Result<(), libc::c_int> { diff --git a/mountpoint-s3/src/inode.rs b/mountpoint-s3/src/inode.rs index ae8d7e745..464cae05f 100644 --- a/mountpoint-s3/src/inode.rs +++ b/mountpoint-s3/src/inode.rs @@ -33,7 +33,6 @@ use fuser::FileType; use futures::{select_biased, FutureExt}; use mountpoint_s3_client::error::{HeadObjectError, ObjectClientError}; use mountpoint_s3_client::types::{HeadObjectResult, RestoreStatus}; -use mountpoint_s3_client::ObjectClient; use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c}; use thiserror::Error; use time::OffsetDateTime; @@ -41,6 +40,7 @@ use tracing::{debug, error, trace, warn}; use crate::fs::CacheConfig; use crate::prefix::Prefix; +use crate::store::ObjectStore; use crate::sync::atomic::{AtomicU64, Ordering}; use crate::sync::RwLockReadGuard; use crate::sync::RwLockWriteGuard; 
@@ -174,30 +174,25 @@ impl Superblock { /// Lookup an inode in the parent directory with the given name and /// increments its lookup count. - pub async fn lookup( + pub async fn lookup( &self, - client: &OC, + store: &OS, parent_ino: InodeNo, name: &OsStr, ) -> Result { trace!(parent=?parent_ino, ?name, "lookup"); let lookup = self .inner - .lookup_by_name( - client, - parent_ino, - name, - self.inner.cache_config.serve_lookup_from_cache, - ) + .lookup_by_name(store, parent_ino, name, self.inner.cache_config.serve_lookup_from_cache) .await?; self.inner.remember(&lookup.inode); Ok(lookup) } /// Retrieve the attributes for an inode - pub async fn getattr( + pub async fn getattr( &self, - client: &OC, + store: &OS, ino: InodeNo, force_revalidate: bool, ) -> Result { @@ -214,7 +209,7 @@ impl Superblock { let lookup = self .inner - .lookup_by_name(client, inode.parent(), inode.name().as_ref(), false) + .lookup_by_name(store, inode.parent(), inode.name().as_ref(), false) .await?; if lookup.inode.ino() != ino { Err(InodeError::StaleInode { @@ -228,9 +223,9 @@ impl Superblock { } /// Set the attributes for an inode - pub async fn setattr( + pub async fn setattr( &self, - _client: &OC, + _store: &OS, ino: InodeNo, atime: Option, mtime: Option, @@ -261,9 +256,9 @@ impl Superblock { } /// Create a new write handle to be used for state transition - pub async fn write( + pub async fn write( &self, - _client: &OC, + _store: &OS, ino: InodeNo, parent_ino: InodeNo, pid: u32, @@ -276,9 +271,9 @@ impl Superblock { /// Start a readdir stream for the given directory inode /// /// Doesn't currently do any IO, so doesn't need to be async, but reserving it for future use. - pub async fn readdir( + pub async fn readdir( &self, - _client: &OC, + _store: &OS, dir_ino: InodeNo, page_size: usize, ) -> Result { @@ -297,9 +292,9 @@ impl Superblock { } /// Create a new regular file or directory inode ready to be opened in write-only mode - pub async fn create( + pub async fn create( &self, - client: &OC, + store: &OS, dir: InodeNo, name: &OsStr, kind: InodeKind, @@ -308,7 +303,7 @@ impl Superblock { let existing = self .inner - .lookup_by_name(client, dir, name, self.inner.cache_config.serve_lookup_from_cache) + .lookup_by_name(store, dir, name, self.inner.cache_config.serve_lookup_from_cache) .await; match existing { Ok(lookup) => return Err(InodeError::FileAlreadyExists(lookup.inode.err())), @@ -357,20 +352,15 @@ impl Superblock { /// Remove local-only empty directory, i.e., the ones created by mkdir. /// It does not affect empty directories represented remotely with directory markers. - pub async fn rmdir( + pub async fn rmdir( &self, - client: &OC, + store: &OS, parent_ino: InodeNo, name: &OsStr, ) -> Result<(), InodeError> { let LookedUp { inode, .. } = self .inner - .lookup_by_name( - client, - parent_ino, - name, - self.inner.cache_config.serve_lookup_from_cache, - ) + .lookup_by_name(store, parent_ino, name, self.inner.cache_config.serve_lookup_from_cache) .await?; if inode.kind() == InodeKind::File { @@ -430,21 +420,16 @@ impl Superblock { /// We know that the Linux Kernel's VFS will lock both the parent and child, /// so we can safely ignore concurrent operations within the same Mountpoint process to the file and its parent. 
/// See: https://www.kernel.org/doc/html/next/filesystems/directory-locking.html - pub async fn unlink( + pub async fn unlink( &self, - client: &OC, + store: &OS, parent_ino: InodeNo, name: &OsStr, ) -> Result<(), InodeError> { let parent = self.inner.get(parent_ino)?; let LookedUp { inode, .. } = self .inner - .lookup_by_name( - client, - parent_ino, - name, - self.inner.cache_config.serve_lookup_from_cache, - ) + .lookup_by_name(store, parent_ino, name, self.inner.cache_config.serve_lookup_from_cache) .await?; if inode.kind() == InodeKind::Directory { @@ -469,7 +454,7 @@ impl Superblock { WriteStatus::Remote => { let (bucket, s3_key) = (self.inner.bucket.as_str(), inode.full_key()); debug!(parent=?parent_ino, ?name, "unlink on remote file will delete key {}", s3_key); - let delete_obj_result = client.delete_object(bucket, s3_key).await; + let delete_obj_result = store.delete_object(bucket, s3_key).await; match delete_obj_result { Ok(_res) => (), @@ -544,9 +529,9 @@ impl SuperblockInner { /// Updates the parent inode to be in sync with the client, but does /// not add new inodes to the superblock. The caller is responsible /// for calling [`remember()`] if that is required. - pub async fn lookup_by_name( + pub async fn lookup_by_name( &self, - client: &OC, + store: &OS, parent_ino: InodeNo, name: &OsStr, allow_cache: bool, @@ -570,7 +555,7 @@ impl SuperblockInner { let lookup = match lookup { Some(lookup) => lookup, None => { - let remote = self.remote_lookup(client, parent_ino, name).await?; + let remote = self.remote_lookup(store, parent_ino, name).await?; self.update_from_remote(parent_ino, name, remote)? } }; @@ -617,9 +602,9 @@ impl SuperblockInner { /// Lookup an inode in the parent directory with the given name /// on the remote client. - async fn remote_lookup( + async fn remote_lookup( &self, - client: &OC, + store: &OS, parent_ino: InodeNo, name: &str, ) -> Result, InodeError> { @@ -658,8 +643,8 @@ impl SuperblockInner { // "/" to the prefix in the request, the first common prefix we'll get back will be // "dir-1/", because that precedes "dir/" in lexicographic order. Doing the // ListObjects with "/" appended makes sure we always observe the correct prefix. 
- let mut file_lookup = client.head_object(&self.bucket, &full_path).fuse(); - let mut dir_lookup = client + let mut file_lookup = store.head_object(&self.bucket, &full_path).fuse(); + let mut dir_lookup = store .list_objects(&self.bucket, None, "/", 1, &full_path_suffixed) .fuse(); @@ -1495,7 +1480,10 @@ mod tests { use test_case::test_case; use time::{Duration, OffsetDateTime}; - use crate::fs::{ToErrno, FUSE_ROOT_INODE}; + use crate::{ + fs::{ToErrno, FUSE_ROOT_INODE}, + store::test_store, + }; use super::*; @@ -1520,6 +1508,7 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); let keys = &[ format!("{prefix}dir0/file0.txt"), @@ -1551,42 +1540,42 @@ mod tests { // Try it twice to test the inode reuse path too for _ in 0..2 { let dir0 = superblock - .lookup(&client, FUSE_ROOT_INODE, &OsString::from("dir0")) + .lookup(&store, FUSE_ROOT_INODE, &OsString::from("dir0")) .await .expect("should exist"); assert_inode_stat!(dir0, InodeKind::Directory, ts, 0); assert_eq!(dir0.inode.full_key(), OsString::from(format!("{prefix}dir0/"))); let dir1 = superblock - .lookup(&client, FUSE_ROOT_INODE, &OsString::from("dir1")) + .lookup(&store, FUSE_ROOT_INODE, &OsString::from("dir1")) .await .expect("should exist"); assert_inode_stat!(dir1, InodeKind::Directory, ts, 0); assert_eq!(dir1.inode.full_key(), OsString::from(format!("{prefix}dir1/"))); let sdir0 = superblock - .lookup(&client, dir0.inode.ino(), &OsString::from("sdir0")) + .lookup(&store, dir0.inode.ino(), &OsString::from("sdir0")) .await .expect("should exist"); assert_inode_stat!(sdir0, InodeKind::Directory, ts, 0); assert_eq!(sdir0.inode.full_key(), OsString::from(format!("{prefix}dir0/sdir0/"))); let sdir1 = superblock - .lookup(&client, dir0.inode.ino(), &OsString::from("sdir1")) + .lookup(&store, dir0.inode.ino(), &OsString::from("sdir1")) .await .expect("should exist"); assert_inode_stat!(sdir1, InodeKind::Directory, ts, 0); assert_eq!(sdir1.inode.full_key(), OsString::from(format!("{prefix}dir0/sdir1/"))); let sdir2 = superblock - .lookup(&client, dir1.inode.ino(), &OsString::from("sdir2")) + .lookup(&store, dir1.inode.ino(), &OsString::from("sdir2")) .await .expect("should exist"); assert_inode_stat!(sdir2, InodeKind::Directory, ts, 0); assert_eq!(sdir2.inode.full_key(), OsString::from(format!("{prefix}dir1/sdir2/"))); let sdir3 = superblock - .lookup(&client, dir1.inode.ino(), &OsString::from("sdir3")) + .lookup(&store, dir1.inode.ino(), &OsString::from("sdir3")) .await .expect("should exist"); assert_inode_stat!(sdir3, InodeKind::Directory, ts, 0); @@ -1600,11 +1589,11 @@ mod tests { ] { for i in 0..*n { let file = superblock - .lookup(&client, *ino, &OsString::from(format!("file{i}.txt"))) + .lookup(&store, *ino, &OsString::from(format!("file{i}.txt"))) .await .expect("inode should exist"); // Grab last modified time according to mock S3 - let modified_time = client + let modified_time = store .head_object(bucket, file.inode.full_key()) .await .expect("object should exist") @@ -1631,6 +1620,7 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); let keys = &[ format!("{prefix}dir0/file0.txt"), @@ -1648,7 +1638,6 @@ mod tests { } let prefix = Prefix::new(prefix).expect("valid prefix"); - let ts = OffsetDateTime::now_utc(); let ttl = if cached { std::time::Duration::from_secs(60 * 60 * 24 * 7) // 7 days should be enough } else { @@ -1665,18 +1654,18 @@ mod tests { ); 
let dir0 = superblock - .lookup(&client, FUSE_ROOT_INODE, &OsString::from("dir0")) + .lookup(&store, FUSE_ROOT_INODE, &OsString::from("dir0")) .await .expect("should exist"); let file0 = superblock - .lookup(&client, dir0.inode.ino(), &OsString::from("file0.txt")) + .lookup(&store, dir0.inode.ino(), &OsString::from("file0.txt")) .await .expect("should exist"); client.remove_object(file0.inode.full_key()); let file0 = superblock - .lookup(&client, dir0.inode.ino(), &OsString::from("file0.txt")) + .lookup(&store, dir0.inode.ino(), &OsString::from("file0.txt")) .await; if cached { file0.expect("file0 inode should still be served from cache"); @@ -1738,13 +1727,14 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); let name = "foo"; client.add_object(name, b"foo".into()); let superblock = Superblock::new("test_bucket", &Default::default(), Default::default()); - let lookup = superblock.lookup(&client, ROOT_INODE_NO, name.as_ref()).await.unwrap(); + let lookup = superblock.lookup(&store, ROOT_INODE_NO, name.as_ref()).await.unwrap(); let lookup_count = lookup.inode.inner.sync.read().unwrap().lookup_count; assert_eq!(lookup_count, 1); let ino = lookup.inode.ino(); @@ -1759,12 +1749,12 @@ mod tests { drop(lookup); let err = superblock - .getattr(&client, ino, false) + .getattr(&store, ino, false) .await .expect_err("Inode should not be valid"); assert!(matches!(err, InodeError::InodeDoesNotExist(_))); - let lookup = superblock.lookup(&client, ROOT_INODE_NO, name.as_ref()).await.unwrap(); + let lookup = superblock.lookup(&store, ROOT_INODE_NO, name.as_ref()).await.unwrap(); let lookup_count = lookup.inode.inner.sync.read().unwrap().lookup_count; assert_eq!(lookup_count, 1); } @@ -1776,13 +1766,14 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); let name = "foo"; client.add_object(name, b"foo".into()); let superblock = Superblock::new("test_bucket", &Default::default(), Default::default()); - let lookup = superblock.lookup(&client, ROOT_INODE_NO, name.as_ref()).await.unwrap(); + let lookup = superblock.lookup(&store, ROOT_INODE_NO, name.as_ref()).await.unwrap(); let lookup_count = lookup.inode.inner.sync.read().unwrap().lookup_count; assert_eq!(lookup_count, 1); let ino = lookup.inode.ino(); @@ -1791,13 +1782,13 @@ mod tests { client.add_object(&format!("{name}/bar"), b"bar".into()); // Should be a directory now, so a different inode - let new_lookup = superblock.lookup(&client, ROOT_INODE_NO, name.as_ref()).await.unwrap(); + let new_lookup = superblock.lookup(&store, ROOT_INODE_NO, name.as_ref()).await.unwrap(); assert_ne!(ino, new_lookup.inode.ino()); superblock.forget(ino, 1); // Lookup still works after forgetting the old inode - let new_lookup2 = superblock.lookup(&client, ROOT_INODE_NO, name.as_ref()).await.unwrap(); + let new_lookup2 = superblock.lookup(&store, ROOT_INODE_NO, name.as_ref()).await.unwrap(); assert_eq!(new_lookup.inode.ino(), new_lookup2.inode.ino()); } @@ -1810,6 +1801,7 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); let keys = &[ format!("{prefix}dir0/file0.txt"), @@ -1838,8 +1830,8 @@ mod tests { // Try it all twice to test inode reuse for _ in 0..2 { - let dir_handle = superblock.readdir(&client, FUSE_ROOT_INODE, 2).await.unwrap(); - let entries = dir_handle.collect(&client).await.unwrap(); + let dir_handle = 
superblock.readdir(&store, FUSE_ROOT_INODE, 2).await.unwrap(); + let entries = dir_handle.collect(&store).await.unwrap(); assert_eq!( entries.iter().map(|entry| entry.inode.name()).collect::>(), &["dir0", "dir1"] @@ -1849,8 +1841,8 @@ mod tests { dir_handle.remember(&entries[0]); let dir0_inode = entries[0].inode.ino(); - let dir_handle = superblock.readdir(&client, dir0_inode, 2).await.unwrap(); - let entries = dir_handle.collect(&client).await.unwrap(); + let dir_handle = superblock.readdir(&store, dir0_inode, 2).await.unwrap(); + let entries = dir_handle.collect(&store).await.unwrap(); assert_eq!( entries.iter().map(|entry| entry.inode.name()).collect::>(), &["file0.txt", "sdir0", "sdir1"] @@ -1862,8 +1854,8 @@ mod tests { dir_handle.remember(&entries[1]); let sdir0_inode = entries[1].inode.ino(); - let dir_handle = superblock.readdir(&client, sdir0_inode, 2).await.unwrap(); - let entries = dir_handle.collect(&client).await.unwrap(); + let dir_handle = superblock.readdir(&store, sdir0_inode, 2).await.unwrap(); + let entries = dir_handle.collect(&store).await.unwrap(); assert_eq!( entries.iter().map(|entry| entry.inode.name()).collect::>(), &["file0.txt", "file1.txt", "file2.txt"] @@ -1882,7 +1874,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024 * 1024, }; - let client = Arc::new(MockClient::new(client_config)); + let store = test_store(Arc::new(MockClient::new(client_config))); let prefix = Prefix::new(prefix).expect("valid prefix"); let superblock = Superblock::new("test_bucket", &prefix, Default::default()); @@ -1894,7 +1886,7 @@ mod tests { let filename = format!("file{i}.txt"); let new_inode = superblock .create( - &client, + &store, FUSE_ROOT_INODE, OsStr::from_bytes(filename.as_bytes()), InodeKind::File, @@ -1902,7 +1894,7 @@ mod tests { .await .unwrap(); superblock - .write(&client, new_inode.inode.ino(), FUSE_ROOT_INODE, 0) + .write(&store, new_inode.inode.ino(), FUSE_ROOT_INODE, 0) .await .unwrap(); expected_list.push(filename); @@ -1910,8 +1902,8 @@ mod tests { // Try it all twice to test inode reuse for _ in 0..2 { - let dir_handle = superblock.readdir(&client, FUSE_ROOT_INODE, 2).await.unwrap(); - let entries = dir_handle.collect(&client).await.unwrap(); + let dir_handle = superblock.readdir(&store, FUSE_ROOT_INODE, 2).await.unwrap(); + let entries = dir_handle.collect(&store).await.unwrap(); assert_eq!( entries.iter().map(|entry| entry.inode.name()).collect::>(), expected_list @@ -1928,6 +1920,7 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); let prefix = Prefix::new(prefix).expect("valid prefix"); let superblock = Superblock::new("test_bucket", &prefix, Default::default()); @@ -1950,7 +1943,7 @@ mod tests { let filename = format!("newfile{i}.txt"); let new_inode = superblock .create( - &client, + &store, FUSE_ROOT_INODE, OsStr::from_bytes(filename.as_bytes()), InodeKind::File, @@ -1958,7 +1951,7 @@ mod tests { .await .unwrap(); superblock - .write(&client, new_inode.inode.ino(), FUSE_ROOT_INODE, 0) + .write(&store, new_inode.inode.ino(), FUSE_ROOT_INODE, 0) .await .unwrap(); expected_list.push(filename); @@ -1966,8 +1959,8 @@ mod tests { // Try it all twice to test inode reuse for _ in 0..2 { - let dir_handle = superblock.readdir(&client, FUSE_ROOT_INODE, 2).await.unwrap(); - let entries = dir_handle.collect(&client).await.unwrap(); + let dir_handle = superblock.readdir(&store, FUSE_ROOT_INODE, 2).await.unwrap(); + let entries = 
dir_handle.collect(&store).await.unwrap(); assert_eq!( entries.iter().map(|entry| entry.inode.name()).collect::>(), expected_list @@ -1984,18 +1977,19 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); let prefix = Prefix::new(prefix).expect("valid prefix"); let superblock = Superblock::new("test_bucket", &prefix, Default::default()); // Create local directory let dirname = "local_dir"; superblock - .create(&client, FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory) + .create(&store, FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory) .await .unwrap(); let lookedup = superblock - .lookup(&client, FUSE_ROOT_INODE, dirname.as_ref()) + .lookup(&store, FUSE_ROOT_INODE, dirname.as_ref()) .await .expect("lookup should succeed on local dirs"); assert_eq!( @@ -2007,8 +2001,8 @@ mod tests { WriteStatus::LocalUnopened ); - let dir_handle = superblock.readdir(&client, FUSE_ROOT_INODE, 2).await.unwrap(); - let entries = dir_handle.collect(&client).await.unwrap(); + let dir_handle = superblock.readdir(&store, FUSE_ROOT_INODE, 2).await.unwrap(); + let entries = dir_handle.collect(&store).await.unwrap(); assert_eq!( entries.iter().map(|entry| entry.inode.name()).collect::>(), vec![dirname] @@ -2027,34 +2021,34 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024 * 1024, }; - let client = Arc::new(MockClient::new(client_config)); + let store = test_store(Arc::new(MockClient::new(client_config))); let prefix = Prefix::new(prefix).expect("valid prefix"); let superblock = Superblock::new("test_bucket", &prefix, Default::default()); // Create local directory let dirname = "local_dir"; let LookedUp { inode, .. } = superblock - .create(&client, FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory) + .create(&store, FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory) .await .expect("Should be able to create directory"); superblock - .rmdir(&client, FUSE_ROOT_INODE, dirname.as_ref()) + .rmdir(&store, FUSE_ROOT_INODE, dirname.as_ref()) .await .expect("rmdir on empty local directory should succeed"); superblock - .lookup(&client, FUSE_ROOT_INODE, dirname.as_ref()) + .lookup(&store, FUSE_ROOT_INODE, dirname.as_ref()) .await .expect_err("should not do lookup on removed directory"); superblock - .readdir(&client, inode.ino(), 2) + .readdir(&store, inode.ino(), 2) .await .expect_err("should not do readdir on removed directory"); superblock - .getattr(&client, inode.ino(), false) + .getattr(&store, inode.ino(), false) .await .expect_err("should not do getattr on removed directory"); } @@ -2067,19 +2061,19 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024 * 1024, }; - let client = Arc::new(MockClient::new(client_config)); + let store = test_store(Arc::new(MockClient::new(client_config))); let prefix = Prefix::new(prefix).expect("valid prefix"); let superblock = Superblock::new("test_bucket", &prefix, Default::default()); // Create local directory let dirname = "local_dir"; let LookedUp { inode, .. 
} = superblock - .create(&client, FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory) + .create(&store, FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory) .await .expect("Should be able to create directory"); superblock - .rmdir(&client, FUSE_ROOT_INODE, dirname.as_ref()) + .rmdir(&store, FUSE_ROOT_INODE, dirname.as_ref()) .await .expect("rmdir on empty local directory should succeed"); @@ -2112,31 +2106,31 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024 * 1024, }; - let client = Arc::new(MockClient::new(client_config)); + let store = test_store(Arc::new(MockClient::new(client_config))); let prefix = Prefix::new(prefix).expect("valid prefix"); let superblock = Superblock::new("test_bucket", &prefix, Default::default()); // Create local directory let dirname = "local_dir"; superblock - .create(&client, FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory) + .create(&store, FUSE_ROOT_INODE, dirname.as_ref(), InodeKind::Directory) .await .expect("Should be able to create directory"); let dirname_to_stay = "staying_local_dir"; superblock - .create(&client, FUSE_ROOT_INODE, dirname_to_stay.as_ref(), InodeKind::Directory) + .create(&store, FUSE_ROOT_INODE, dirname_to_stay.as_ref(), InodeKind::Directory) .await .expect("Should be able to create directory"); superblock - .rmdir(&client, FUSE_ROOT_INODE, dirname.as_ref()) + .rmdir(&store, FUSE_ROOT_INODE, dirname.as_ref()) .await .expect("rmdir on empty local directory should succeed"); // removed directory should not appear in readdir of parent - let dir_handle = superblock.readdir(&client, FUSE_ROOT_INODE, 2).await.unwrap(); - let entries = dir_handle.collect(&client).await.unwrap(); + let dir_handle = superblock.readdir(&store, FUSE_ROOT_INODE, 2).await.unwrap(); + let entries = dir_handle.collect(&store).await.unwrap(); assert_eq!( entries.iter().map(|entry| entry.inode.name()).collect::>(), &[dirname_to_stay] @@ -2152,6 +2146,7 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); let prefix = Prefix::new(prefix).expect("valid prefix"); let superblock = Superblock::new("test_bucket", &prefix, Default::default()); @@ -2161,17 +2156,17 @@ mod tests { let parent_ino = FUSE_ROOT_INODE; superblock - .lookup(&client, parent_ino, file_name.as_ref()) + .lookup(&store, parent_ino, file_name.as_ref()) .await .expect("file should exist"); superblock - .unlink(&client, parent_ino, file_name.as_ref()) + .unlink(&store, parent_ino, file_name.as_ref()) .await .expect("file delete should succeed as it exists"); let err: i32 = superblock - .lookup(&client, parent_ino, file_name.as_ref()) + .lookup(&store, parent_ino, file_name.as_ref()) .await .expect_err("lookup should no longer find deleted file") .to_errno(); @@ -2185,6 +2180,7 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); let file_name = "corrupted"; client.add_object(file_name.as_ref(), MockObject::constant(0xaa, 30, ETag::for_tests())); @@ -2231,7 +2227,7 @@ mod tests { } let err = superblock - .unlink(&client, parent_ino, file_name.as_ref()) + .unlink(&store, parent_ino, file_name.as_ref()) .await .expect_err("unlink of a corrupted inode should fail"); assert!(matches!(err, InodeError::CorruptedMetadata(_))); @@ -2243,7 +2239,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024 * 1024, }; - let client = Arc::new(MockClient::new(client_config)); + let store = 
test_store(Arc::new(MockClient::new(client_config))); let superblock = Superblock::new("test_bucket", &Default::default(), Default::default()); let nested_dirs = (0..5).map(|i| format!("level{i}")).collect::>(); @@ -2251,7 +2247,7 @@ mod tests { let mut parent_dir_ino = FUSE_ROOT_INODE; for dirname in &nested_dirs { let dir_lookedup = superblock - .create(&client, parent_dir_ino, dirname.as_ref(), InodeKind::Directory) + .create(&store, parent_dir_ino, dirname.as_ref(), InodeKind::Directory) .await .unwrap(); @@ -2273,7 +2269,7 @@ mod tests { let filename = "newfile.txt"; let new_inode = superblock .create( - &client, + &store, leaf_dir_ino, OsStr::from_bytes(filename.as_bytes()), InodeKind::File, @@ -2282,7 +2278,7 @@ mod tests { .unwrap(); let writehandle = superblock - .write(&client, new_inode.inode.ino(), leaf_dir_ino, 0) + .write(&store, new_inode.inode.ino(), leaf_dir_ino, 0) .await .unwrap(); @@ -2292,7 +2288,7 @@ mod tests { // All nested dirs disappear let dirname = nested_dirs.first().unwrap(); - let lookedup = superblock.lookup(&client, FUSE_ROOT_INODE, dirname.as_ref()).await; + let lookedup = superblock.lookup(&store, FUSE_ROOT_INODE, dirname.as_ref()).await; assert!(matches!(lookedup, Err(InodeError::FileDoesNotExist))); } @@ -2303,27 +2299,28 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); client.add_object("dir1/file1.txt", MockObject::constant(0xaa, 30, ETag::for_tests())); let superblock = Superblock::new("test_bucket", &Default::default(), Default::default()); for _ in 0..2 { let dir1_1 = superblock - .lookup(&client, FUSE_ROOT_INODE, OsStr::from_bytes("dir1".as_bytes())) + .lookup(&store, FUSE_ROOT_INODE, OsStr::from_bytes("dir1".as_bytes())) .await .unwrap(); let dir1_2 = superblock - .lookup(&client, FUSE_ROOT_INODE, OsStr::from_bytes("dir1".as_bytes())) + .lookup(&store, FUSE_ROOT_INODE, OsStr::from_bytes("dir1".as_bytes())) .await .unwrap(); assert_eq!(dir1_1.inode.ino(), dir1_2.inode.ino()); let file1_1 = superblock - .lookup(&client, dir1_1.inode.ino(), OsStr::from_bytes("file1.txt".as_bytes())) + .lookup(&store, dir1_1.inode.ino(), OsStr::from_bytes("file1.txt".as_bytes())) .await .unwrap(); let file1_2 = superblock - .lookup(&client, dir1_1.inode.ino(), OsStr::from_bytes("file1.txt".as_bytes())) + .lookup(&store, dir1_1.inode.ino(), OsStr::from_bytes("file1.txt".as_bytes())) .await .unwrap(); assert_eq!(file1_1.inode.ino(), file1_2.inode.ino()); @@ -2339,6 +2336,7 @@ mod tests { part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); + let store = test_store(client.clone()); // In this test the `/` delimiter comes back to bite us. `dir-1/` comes before `dir/` in // lexicographical order (- is ASCII 0x2d, / is ASCII 0x2f), so `dir-1` will be the first // common prefix when we do ListObjects with prefix = 'dir'. 
But `dir` comes before `dir-1` @@ -2355,15 +2353,15 @@ mod tests { let superblock = Superblock::new("test_bucket", &Default::default(), Default::default()); - let dir_handle = superblock.readdir(&client, FUSE_ROOT_INODE, 2).await.unwrap(); - let entries = dir_handle.collect(&client).await.unwrap(); + let dir_handle = superblock.readdir(&store, FUSE_ROOT_INODE, 2).await.unwrap(); + let entries = dir_handle.collect(&store).await.unwrap(); assert_eq!( entries.iter().map(|entry| entry.inode.name()).collect::>(), &["dir", "dir-1"] ); let dir = superblock - .lookup(&client, FUSE_ROOT_INODE, OsStr::from_bytes("dir".as_bytes())) + .lookup(&store, FUSE_ROOT_INODE, OsStr::from_bytes("dir".as_bytes())) .await .unwrap(); assert_eq!(dir.inode.full_key(), OsString::from("dir/")); @@ -2400,9 +2398,10 @@ mod tests { MockObject::constant(0xaa, 30, ETag::from_str("test_etag_5").unwrap()), ); + let store = test_store(client.clone()); let superblock = Superblock::new("test_bucket", &Default::default(), Default::default()); - let dir_handle = superblock.readdir(&client, FUSE_ROOT_INODE, 2).await.unwrap(); - let entries = dir_handle.collect(&client).await.unwrap(); + let dir_handle = superblock.readdir(&store, FUSE_ROOT_INODE, 2).await.unwrap(); + let entries = dir_handle.collect(&store).await.unwrap(); assert_eq!( entries.iter().map(|entry| entry.inode.name()).collect::>(), &["dir1"] @@ -2410,8 +2409,8 @@ mod tests { dir_handle.remember(&entries[0]); let dir1_ino = entries[0].inode.ino(); - let dir_handle = superblock.readdir(&client, dir1_ino, 2).await.unwrap(); - let entries = dir_handle.collect(&client).await.unwrap(); + let dir_handle = superblock.readdir(&store, dir1_ino, 2).await.unwrap(); + let entries = dir_handle.collect(&store).await.unwrap(); assert_eq!( entries.iter().map(|entry| entry.inode.name()).collect::>(), &["a"] @@ -2420,7 +2419,7 @@ mod tests { // Neither of these keys should exist in the directory for key in ["/", "."] { let lookup = superblock - .lookup(&client, dir1_ino, OsStr::from_bytes(key.as_bytes())) + .lookup(&store, dir1_ino, OsStr::from_bytes(key.as_bytes())) .await; assert!(matches!(lookup, Err(InodeError::InvalidFileName(_)))); } @@ -2434,7 +2433,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024 * 1024, }; - let client = Arc::new(MockClient::new(client_config)); + let store = test_store(Arc::new(MockClient::new(client_config))); let prefix = Prefix::new(prefix).expect("valid prefix"); let superblock = Superblock::new("test_bucket", &prefix, Default::default()); @@ -2442,7 +2441,7 @@ mod tests { let filename = "newfile.txt"; let new_inode = superblock .create( - &client, + &store, FUSE_ROOT_INODE, OsStr::from_bytes(filename.as_bytes()), InodeKind::File, @@ -2451,7 +2450,7 @@ mod tests { .unwrap(); let writehandle = superblock - .write(&client, new_inode.inode.ino(), FUSE_ROOT_INODE, 0) + .write(&store, new_inode.inode.ino(), FUSE_ROOT_INODE, 0) .await .unwrap(); @@ -2460,7 +2459,7 @@ mod tests { // Call setattr and verify the stat let lookup = superblock - .setattr(&client, new_inode.inode.ino(), Some(atime), Some(mtime)) + .setattr(&store, new_inode.inode.ino(), Some(atime), Some(mtime)) .await .expect("setattr should be successful"); let stat = lookup.stat; @@ -2468,7 +2467,7 @@ mod tests { assert_eq!(stat.mtime, mtime); let lookup = superblock - .getattr(&client, new_inode.inode.ino(), false) + .getattr(&store, new_inode.inode.ino(), false) .await .expect("getattr should be successful"); let stat = lookup.stat; @@ -2480,7 +2479,7 @@ mod tests { // Should 
get an error back when calling setattr let result = superblock - .setattr(&client, new_inode.inode.ino(), Some(atime), Some(mtime)) + .setattr(&store, new_inode.inode.ino(), Some(atime), Some(mtime)) .await; assert!(matches!(result, Err(InodeError::SetAttrNotPermittedOnRemoteInode(_)))); } @@ -2491,7 +2490,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024 * 1024, }; - let client = Arc::new(MockClient::new(client_config)); + let store = test_store(Arc::new(MockClient::new(client_config))); let superblock = Superblock::new("test_bucket", &Default::default(), Default::default()); let ino: u64 = 42; @@ -2526,7 +2525,7 @@ mod tests { // Should get an error back when calling setattr let atime = OffsetDateTime::UNIX_EPOCH + Duration::days(90); let mtime = OffsetDateTime::UNIX_EPOCH + Duration::days(60); - let result = superblock.setattr(&client, ino, Some(atime), Some(mtime)).await; + let result = superblock.setattr(&store, ino, Some(atime), Some(mtime)).await; assert!(matches!(result, Err(InodeError::SetAttrOnExpiredStat(_)))); } diff --git a/mountpoint-s3/src/inode/readdir.rs b/mountpoint-s3/src/inode/readdir.rs index 74fd31493..18e0cf7d8 100644 --- a/mountpoint-s3/src/inode/readdir.rs +++ b/mountpoint-s3/src/inode/readdir.rs @@ -39,10 +39,12 @@ use std::cmp::Ordering; use std::collections::VecDeque; use mountpoint_s3_client::types::ObjectInfo; -use mountpoint_s3_client::ObjectClient; use tracing::{error, trace, warn}; -use crate::sync::{Arc, AsyncMutex, Mutex}; +use crate::{ + store::ObjectStore, + sync::{Arc, AsyncMutex, Mutex}, +}; use super::{ valid_inode_name, InodeError, InodeKind, InodeKindData, InodeNo, InodeStat, LookedUp, RemoteLookup, SuperblockInner, @@ -106,7 +108,7 @@ impl ReaddirHandle { /// Return the next inode for the directory stream. If the stream is finished, returns /// `Ok(None)`. Does not increment the lookup count of the returned inodes: the caller /// is responsible for calling [`remember()`] if required. - pub async fn next(&self, client: &OC) -> Result, InodeError> { + pub async fn next(&self, store: &OS) -> Result, InodeError> { if let Some(readded) = self.readded.lock().unwrap().take() { return Ok(Some(readded)); } @@ -116,7 +118,7 @@ impl ReaddirHandle { loop { let next = { let mut iter = self.iter.lock().await; - iter.next(client).await? + iter.next(store).await? }; if let Some(next) = next { @@ -183,9 +185,9 @@ impl ReaddirHandle { } #[cfg(test)] - pub(super) async fn collect(&self, client: &OC) -> Result, InodeError> { + pub(super) async fn collect(&self, store: &OS) -> Result, InodeError> { let mut result = vec![]; - while let Some(entry) = self.next(client).await? { + while let Some(entry) = self.next(store).await? { result.push(entry); } Ok(result) @@ -295,13 +297,13 @@ impl ReaddirIter { /// Return the next [ReaddirEntry] for the directory stream. If the stream is finished, returns /// `Ok(None)`. - async fn next(&mut self, client: &impl ObjectClient) -> Result, InodeError> { + async fn next(&mut self, store: &impl ObjectStore) -> Result, InodeError> { // The only reason to go around this loop more than once is if the next entry to return is // a duplicate, in which case it's skipped. 
loop { // First refill the peeks at the next entries on each iterator if self.next_remote.is_none() { - self.next_remote = self.remote.next(client).await?; + self.next_remote = self.remote.next(store).await?; } if self.next_local.is_none() { self.next_local = self.local.next(); @@ -372,7 +374,7 @@ impl RemoteIter { } } - async fn next(&mut self, client: &impl ObjectClient) -> Result, InodeError> { + async fn next(&mut self, store: &impl ObjectStore) -> Result, InodeError> { if self.entries.is_empty() { let continuation_token = match &mut self.state { RemoteIterState::Finished => { @@ -384,7 +386,7 @@ impl RemoteIter { trace!(self=?self as *const _, prefix=?self.full_path, ?continuation_token, "continuing remote iter"); - let result = client + let result = store .list_objects( &self.bucket, continuation_token.as_deref(), diff --git a/mountpoint-s3/src/lib.rs b/mountpoint-s3/src/lib.rs index a17cb14b3..ea5cc9f6d 100644 --- a/mountpoint-s3/src/lib.rs +++ b/mountpoint-s3/src/lib.rs @@ -1,5 +1,5 @@ mod checksums; -mod data_cache; +pub mod data_cache; pub mod fs; pub mod fuse; mod inode; @@ -8,6 +8,7 @@ pub mod logging; pub mod metrics; pub mod prefetch; pub mod prefix; +pub mod store; mod sync; mod upload; diff --git a/mountpoint-s3/src/main.rs b/mountpoint-s3/src/main.rs index 1eff02b6e..016e91a51 100644 --- a/mountpoint-s3/src/main.rs +++ b/mountpoint-s3/src/main.rs @@ -4,18 +4,20 @@ use std::io::{Read, Write}; use std::os::fd::AsRawFd; use std::os::unix::prelude::FromRawFd; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context as _}; use clap::{value_parser, Parser}; use fuser::{MountOption, Session}; -use mountpoint_s3::fs::{CacheConfig, S3FilesystemConfig}; +use mountpoint_s3::fs::S3FilesystemConfig; use mountpoint_s3::fuse::session::FuseSession; use mountpoint_s3::fuse::S3FuseFilesystem; use mountpoint_s3::instance::InstanceInfo; use mountpoint_s3::logging::{init_logging, LoggingConfig}; use mountpoint_s3::metrics; use mountpoint_s3::prefix::Prefix; +use mountpoint_s3::store::{default_store, ObjectStore}; use mountpoint_s3_client::config::{AddressingStyle, EndpointConfig, S3ClientAuthConfig, S3ClientConfig}; use mountpoint_s3_client::error::ObjectClientError; use mountpoint_s3_client::{ObjectClient, S3CrtClient, S3RequestError}; @@ -238,6 +240,25 @@ struct CliArgs { )] pub metadata_cache_ttl: Option, + #[cfg(feature = "caching")] + #[clap( + long, + help = "Enable caching of object data in memory", + help_heading = CACHING_OPTIONS_HEADER, + )] + pub enable_data_caching: bool, + + #[cfg(feature = "caching")] + #[clap( + long, + help = "Block size for the data cache", + default_value = "1048576", + value_parser = value_parser!(u64).range(1..), + help_heading = CACHING_OPTIONS_HEADER, + requires = "enable_data_caching", + )] + pub data_cache_block_size: u64, + #[clap( long, help = "Configure a string to be prepended to the 'User-Agent' HTTP request header for all S3 requests", @@ -288,6 +309,35 @@ impl CliArgs { format!("bucket {}", self.bucket_name) } } + + fn fuse_session_config(&self) -> FuseSessionConfig { + let fs_name = String::from("mountpoint-s3"); + let mut options = vec![ + MountOption::DefaultPermissions, + MountOption::FSName(fs_name), + MountOption::NoAtime, + ]; + if self.read_only { + options.push(MountOption::RO); + } + if self.auto_unmount { + options.push(MountOption::AutoUnmount); + } + if self.allow_root { + options.push(MountOption::AllowRoot); + } + if self.allow_other { + options.push(MountOption::AllowOther); + } + 
+ let mount_point = self.mount_point.to_owned(); + let max_threads = self.max_threads as usize; + FuseSessionConfig { + mount_point, + options, + max_threads, + } + } } fn main() -> anyhow::Result<()> { @@ -425,6 +475,7 @@ fn mount(args: CliArgs) -> anyhow::Result { validate_mount_point(&args.mount_point)?; let bucket_description = args.bucket_description(); + let fuse_config = args.fuse_session_config(); // Placeholder region will be filled in by [create_client_for_bucket] let endpoint_config = EndpointConfig::new("PLACEHOLDER") @@ -502,8 +553,12 @@ fn mount(args: CliArgs) -> anyhow::Result { filesystem_config.storage_class = args.storage_class; filesystem_config.allow_delete = args.allow_delete; + let prefetcher_config = Default::default(); + #[cfg(feature = "caching")] { + use mountpoint_s3::fs::CacheConfig; + if args.enable_metadata_caching { // TODO: Review default for TTL let metadata_cache_ttl = args.metadata_cache_ttl.unwrap_or(Duration::from_secs(3600)); @@ -515,41 +570,47 @@ fn mount(args: CliArgs) -> anyhow::Result { } } - let fs = S3FuseFilesystem::new(client, runtime, &args.bucket_name, &prefix, filesystem_config); - - let fs_name = String::from("mountpoint-s3"); - let mut options = vec![ - MountOption::DefaultPermissions, - MountOption::FSName(fs_name), - MountOption::NoAtime, - ]; - if args.read_only { - options.push(MountOption::RO); - } - if args.auto_unmount { - options.push(MountOption::AutoUnmount); - } - if args.allow_root { - options.push(MountOption::AllowRoot); - } - if args.allow_other { - options.push(MountOption::AllowOther); - } - - let session = Session::new(fs, &args.mount_point, &options).context("Failed to create FUSE session")?; + let store = default_store(Arc::new(client), runtime, prefetcher_config); + create_filesystem( + store, + &args.bucket_name, + &prefix, + filesystem_config, + fuse_config, + &bucket_description, + ) +} - let max_threads = args.max_threads as usize; - let session = FuseSession::new(session, max_threads).context("Failed to start FUSE session")?; +fn create_filesystem( + store: Store, + bucket_name: &str, + prefix: &Prefix, + filesystem_config: S3FilesystemConfig, + fuse_session_config: FuseSessionConfig, + bucket_description: &str, +) -> Result { + let fs = S3FuseFilesystem::new(store, bucket_name, prefix, filesystem_config); + let session = Session::new(fs, &fuse_session_config.mount_point, &fuse_session_config.options) + .context("Failed to create FUSE session")?; + let session = FuseSession::new(session, fuse_session_config.max_threads).context("Failed to start FUSE session")?; tracing::info!( "successfully mounted {} at {}", bucket_description, - args.mount_point.display() + fuse_session_config.mount_point.display() ); Ok(session) } +/// Configuration for a FUSE background session. +#[derive(Debug)] +struct FuseSessionConfig { + pub mount_point: PathBuf, + pub options: Vec, + pub max_threads: usize, +} + /// Create a client for a bucket in the given region and send a ListObjectsV2 request to validate /// that it's accessible. If no region is provided, attempt to infer it by first sending a /// ListObjectsV2 to the default region. diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs index 550aa229b..2f10d4b1d 100644 --- a/mountpoint-s3/src/prefetch.rs +++ b/mountpoint-s3/src/prefetch.rs @@ -7,33 +7,29 @@ //! we increase the size of the GetObject requests up to some maximum. If the reader ever makes a //! non-sequential read, we abandon the prefetching and start again with the minimum request size. 
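The module doc comment above summarizes the request-sizing policy that the rest of this file implements (see `get_next_request_size` further down in this hunk). As a rough, self-contained model of just that policy — using hypothetical names that do not correspond to the types in this patch:

    // Illustrative model of the sizing policy only; the real Prefetcher also
    // tracks in-flight request tasks, a backward seek window, and part queues.
    struct RequestSizer {
        first_request_size: usize,
        max_request_size: usize,
        sequential_prefetch_multiplier: usize,
        next_request_size: usize,
    }

    impl RequestSizer {
        fn new(first: usize, max: usize, multiplier: usize) -> Self {
            Self {
                first_request_size: first,
                max_request_size: max,
                sequential_prefetch_multiplier: multiplier,
                next_request_size: first,
            }
        }

        /// Size to use for the next GetObject in a sequential run; each call
        /// grows the following request geometrically, capped at the maximum.
        fn next_sequential(&mut self) -> usize {
            let size = self.next_request_size;
            self.next_request_size =
                (size * self.sequential_prefetch_multiplier).min(self.max_request_size);
            size
        }

        /// A non-sequential read abandons the run and starts over small.
        fn reset(&mut self) {
            self.next_request_size = self.first_request_size;
        }
    }

In the actual code the grown size comes from `PrefetcherConfig` (`first_request_size`, `sequential_prefetch_multiplier`, `max_request_size`), and small backward seeks can be absorbed by the seek window (bounded by `max_backward_seek_distance`) rather than always forcing a reset.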
-mod feed; +pub mod feed; mod part; mod part_queue; mod seek_window; +mod task; use std::collections::VecDeque; use std::fmt::Debug; use std::time::Duration; -use futures::future::RemoteHandle; -use futures::task::{Spawn, SpawnExt}; +use async_trait::async_trait; use metrics::{counter, histogram}; -use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; -use mountpoint_s3_client::types::ETag; -use mountpoint_s3_client::ObjectClient; -use thiserror::Error; -use tracing::{debug_span, error, trace, Instrument}; +use mountpoint_s3_client::error::ObjectClientError; +use mountpoint_s3_client::types::{ETag, ObjectClientResult}; +use tracing::trace; use crate::checksums::{ChecksummedBytes, IntegrityError}; -use crate::prefetch::feed::{ClientPartFeed, ObjectPartFeed}; -use crate::prefetch::part::Part; -use crate::prefetch::part_queue::{unbounded_part_queue, PartQueue}; +use crate::prefetch::feed::{ObjectPartFeed, RequestRange}; use crate::prefetch::seek_window::SeekWindow; +use crate::prefetch::task::RequestTask; +use crate::store::PrefetchReadError; use crate::sync::Arc; -type TaskError = ObjectClientError::ClientError>; - #[derive(Debug, Clone, Copy)] pub struct PrefetcherConfig { /// Size of the first request in a prefetch run @@ -80,55 +76,36 @@ impl Default for PrefetcherConfig { /// A [Prefetcher] creates and manages prefetching GetObject requests to objects. #[derive(Debug)] -pub struct Prefetcher { - inner: Arc>, -} - -struct PrefetcherInner { - part_feed: Arc + Send + Sync>, +pub struct Prefetcher { + part_feed: Feed, config: PrefetcherConfig, - runtime: Runtime, -} - -impl Debug for PrefetcherInner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PrefetcherInner").field("config", &self.config).finish() - } } -impl Prefetcher +impl Prefetcher where - Client: ObjectClient + Send + Sync + 'static, - Runtime: Spawn, + Feed: ObjectPartFeed, { /// Create a new [Prefetcher] that will make requests to the given client. - pub fn new(client: Arc, runtime: Runtime, config: PrefetcherConfig) -> Self { - let part_feed = Arc::new(ClientPartFeed::new(client)); - let inner = PrefetcherInner { - part_feed, - config, - runtime, - }; - - Self { inner: Arc::new(inner) } + pub fn new(part_feed: Feed, config: PrefetcherConfig) -> Arc { + Arc::new(Self { part_feed, config }) } /// Start a new get request to the specified object. - pub fn get(&self, bucket: &str, key: &str, size: u64, etag: ETag) -> PrefetchGetObject { - PrefetchGetObject::new(self.inner.clone(), bucket, key, size, etag) + pub fn get(self: &Arc, bucket: &str, key: &str, size: u64, etag: ETag) -> PrefetchGetObject { + PrefetchGetObject::new(self.clone(), bucket, key, size, etag) } } /// A GetObject request that divides the desired range of the object into chunks that it prefetches /// in a way that maximizes throughput from S3. #[derive(Debug)] -pub struct PrefetchGetObject { - inner: Arc>, +pub struct PrefetchGetObject { + inner: Arc>, // Invariant: the offset of the first byte in this task's part queue is always // self.next_sequential_read_offset. - current_task: Option>>, + current_task: Option>, // Currently we only every spawn at most one future task (see [spawn_next_request]) - future_tasks: VecDeque>>, + future_tasks: VecDeque>, // Invariant: the offset of the last byte in this window is always // self.next_sequential_read_offset - 1. 
backward_seek_window: SeekWindow, @@ -143,37 +120,21 @@ pub struct PrefetchGetObject { etag: ETag, } -impl PrefetchGetObject +#[async_trait] +impl crate::store::PrefetchGetObject for PrefetchGetObject where - Client: ObjectClient + Send + Sync + 'static, - Runtime: Spawn, + Feed: ObjectPartFeed + Send + Sync + 'static, { - /// Create and spawn a new prefetching request for an object - fn new(inner: Arc>, bucket: &str, key: &str, size: u64, etag: ETag) -> Self { - PrefetchGetObject { - inner: inner.clone(), - current_task: None, - future_tasks: Default::default(), - backward_seek_window: SeekWindow::new(inner.config.max_backward_seek_distance as usize), - preferred_part_size: 128 * 1024, - next_request_size: inner.config.first_request_size, - next_sequential_read_offset: 0, - next_request_offset: 0, - bucket: bucket.to_owned(), - key: key.to_owned(), - size, - etag, - } - } + type ClientError = Feed::ClientError; /// Read some bytes from the object. This function will always return exactly `size` bytes, /// except at the end of the object where it will return however many bytes are left (including /// possibly 0 bytes). - pub async fn read( + async fn read( &mut self, offset: u64, length: usize, - ) -> Result>> { + ) -> ObjectClientResult { trace!( offset, length, @@ -223,7 +184,7 @@ where trace!(offset, length, "read beyond object size"); break; }; - debug_assert!(current_task.remaining > 0); + debug_assert!(current_task.remaining() > 0); let part = match current_task.read(to_read as usize).await { Err(e) => { @@ -253,7 +214,7 @@ where // cancel inflight tasks self.current_task = None; self.future_tasks.drain(..); - return Err(e.into()); + return Err(ObjectClientError::ServiceError(e.into())); } } to_read -= part_len; @@ -261,11 +222,34 @@ where Ok(response) } +} + +impl PrefetchGetObject +where + Feed: ObjectPartFeed, +{ + /// Create and spawn a new prefetching request for an object + fn new(inner: Arc>, bucket: &str, key: &str, size: u64, etag: ETag) -> Self { + PrefetchGetObject { + inner: inner.clone(), + current_task: None, + future_tasks: Default::default(), + backward_seek_window: SeekWindow::new(inner.config.max_backward_seek_distance as usize), + preferred_part_size: 128 * 1024, + next_sequential_read_offset: 0, + next_request_size: inner.config.first_request_size, + next_request_offset: 0, + bucket: bucket.to_owned(), + key: key.to_owned(), + size, + etag, + } + } /// Runs on every read to prepare and spawn any requests our prefetching logic requires fn prepare_requests(&mut self) { let current_task = self.current_task.as_ref(); - if current_task.map(|task| task.remaining == 0).unwrap_or(true) { + if current_task.map(|task| task.remaining() == 0).unwrap_or(true) { // There's no current task, or the current task is finished. Prepare the next request. 
if let Some(next_task) = self.future_tasks.pop_front() { self.current_task = Some(next_task); @@ -275,7 +259,7 @@ where } else if current_task .map(|task| { // Don't trigger prefetch if we're in a fake task created by backward streaming - task.is_streaming() && task.remaining <= task.total_size / 2 + task.is_streaming() && task.remaining() <= task.total_size() / 2 }) .unwrap_or(false) && self.future_tasks.is_empty() @@ -289,64 +273,37 @@ where } /// Spawn the next required request - fn spawn_next_request(&mut self) -> Option>> { + fn spawn_next_request(&mut self) -> Option> { let start = self.next_request_offset; - let end = (start + self.next_request_size as u64).min(self.size); - if start >= self.size { return None; } - let size = end - start; - let range = start..end; - - let (part_queue, part_queue_producer) = unbounded_part_queue(); - - trace!(?range, size, "spawning request"); - - let request_task = { - let feed = self.inner.part_feed.clone(); - let preferred_part_size = self.preferred_part_size; - let bucket = self.bucket.to_owned(); - let key = self.key.to_owned(); - let etag = self.etag.clone(); - let span = debug_span!("prefetch", range=?range); - - async move { - feed.get_object_parts(&bucket, &key, range, etag, preferred_part_size, part_queue_producer) - .await - } - .instrument(span) - }; + let range = RequestRange::new(self.size as usize, start, self.next_request_size); + let task = self.inner.part_feed.spawn_get_object_request( + &self.bucket, + &self.key, + self.etag.clone(), + range, + self.preferred_part_size, + ); // [read] will reset these if the reader stops making sequential requests - self.next_request_offset += size; - self.next_request_size = self.get_next_request_size(); - - let task_handle = self.inner.runtime.spawn_with_handle(request_task).unwrap(); + self.next_request_offset += task.total_size() as u64; + self.next_request_size = self.get_next_request_size(task.total_size()); - Some(RequestTask { - task_handle: Some(task_handle), - total_size: size as usize, - remaining: size as usize, - start_offset: start, - part_queue, - }) + Some(task) } /// Suggest next request size. /// The next request size is the current request size multiplied by sequential prefetch multiplier. - fn get_next_request_size(&self) -> usize { + fn get_next_request_size(&self, request_size: usize) -> usize { // TODO: this logic doesn't work well right now in the case where part_size < // first_request_size and sequential_prefetch_multiplier = 1. It ends up just repeatedly // shrinking the request size until it reaches 1. But this isn't a configuration we // currently expect to ever run in (part_size will always be >= 5MB for MPU reasons, and a // prefetcher with multiplier 1 is not very good). - let next_request_size = (self.next_request_size * self.inner.config.sequential_prefetch_multiplier) - .min(self.inner.config.max_request_size); - self.inner - .part_feed - .get_aligned_request_size(self.next_request_offset, next_request_size) + (request_size * self.inner.config.sequential_prefetch_multiplier).min(self.inner.config.max_request_size) } /// Reset this prefetch request to a new offset, clearing any existing tasks queued. 
@@ -354,15 +311,15 @@ where self.current_task = None; self.future_tasks.drain(..); self.backward_seek_window.clear(); - self.next_request_size = self.inner.config.first_request_size; self.next_sequential_read_offset = offset; + self.next_request_size = self.inner.config.first_request_size; self.next_request_offset = offset; } /// Try to seek within the current inflight requests without restarting them. Returns true if /// the seek succeeded, in which case self.next_sequential_read_offset will be updated to the /// new offset. If this returns false, the prefetcher is in an unknown state and must be reset. - async fn try_seek(&mut self, offset: u64) -> Result>> { + async fn try_seek(&mut self, offset: u64) -> Result> { assert_ne!(offset, self.next_sequential_read_offset); trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek"); if offset > self.next_sequential_read_offset { @@ -372,27 +329,30 @@ where } } - async fn try_seek_forward(&mut self, offset: u64) -> Result>> { + async fn try_seek_forward( + &mut self, + offset: u64, + ) -> Result> { assert!(offset > self.next_sequential_read_offset); let total_seek_distance = offset - self.next_sequential_read_offset; let Some(current_task) = self.current_task.as_mut() else { // Can't seek if there's no requests in flight at all return Ok(false); }; - let future_remaining = self.future_tasks.iter().map(|task| task.remaining).sum::() as u64; + let future_remaining = self.future_tasks.iter().map(|task| task.remaining()).sum::() as u64; if total_seek_distance - >= (current_task.remaining as u64 + future_remaining).min(self.inner.config.max_forward_seek_distance) + >= (current_task.remaining() as u64 + future_remaining).min(self.inner.config.max_forward_seek_distance) { // TODO maybe adjust the next_request_size somehow if we were still within // max_forward_seek_distance, so that strides > first_request_size can still get // prefetched. - trace!(?current_task.remaining, ?future_remaining, "seek failed: not enough inflight data"); + trace!(current_task_remaining=?current_task.remaining(), ?future_remaining, "seek failed: not enough inflight data"); return Ok(false); } // Jump ahead to the right request - if total_seek_distance >= current_task.remaining as u64 { - self.next_sequential_read_offset += current_task.remaining as u64; + if total_seek_distance >= current_task.remaining() as u64 { + self.next_sequential_read_offset += current_task.remaining() as u64; self.current_task = None; while let Some(next_request) = self.future_tasks.pop_front() { if next_request.end_offset() > offset { @@ -427,7 +387,10 @@ where Ok(true) } - fn try_seek_backward(&mut self, offset: u64) -> Result>> { + fn try_seek_backward( + &mut self, + offset: u64, + ) -> Result> { assert!(offset < self.next_sequential_read_offset); let backwards_length_needed = self.next_sequential_read_offset - offset; let Some(parts) = self.backward_seek_window.read_back(backwards_length_needed as usize) else { @@ -437,17 +400,7 @@ where // We're going to create a new fake "request" that contains the parts we read out of the // window. That sounds a bit hacky, but it keeps all the read logic simple rather than // needing separate paths for backwards seeks vs others. 
- let (part_queue, part_queue_producer) = unbounded_part_queue(); - for part in parts { - part_queue_producer.push(Ok(part)); - } - let request = RequestTask { - task_handle: None, - remaining: backwards_length_needed as usize, - start_offset: offset, - total_size: backwards_length_needed as usize, - part_queue, - }; + let request = RequestTask::from_parts(parts, offset); if let Some(current_task) = self.current_task.take() { self.future_tasks.push_front(current_task); } @@ -460,67 +413,27 @@ where } } -/// A single GetObject request submitted to the S3 client -#[derive(Debug)] -struct RequestTask { - /// Handle on the task/future. The future is cancelled when handle is dropped. This is None if - /// the request is fake (created by seeking backwards in the stream) - task_handle: Option>, - remaining: usize, - start_offset: u64, - total_size: usize, - part_queue: PartQueue, -} - -impl RequestTask { - async fn read(&mut self, length: usize) -> Result> { - let part = self.part_queue.read(length).await?; - debug_assert!(part.len() <= self.remaining); - self.remaining -= part.len(); - Ok(part) - } - - fn end_offset(&self) -> u64 { - self.start_offset + self.total_size as u64 - } - - /// Some requests aren't actually streaming data (they're fake, created by backwards seeks), and - /// shouldn't be counted for prefetcher progress. - fn is_streaming(&self) -> bool { - self.task_handle.is_some() - } -} - -#[derive(Debug, Error)] -pub enum PrefetchReadError { - #[error("get request failed")] - GetRequestFailed(#[source] E), - - #[error("get request terminated unexpectedly")] - GetRequestTerminatedUnexpectedly, - - #[error("integrity check failed")] - Integrity(#[from] IntegrityError), -} - #[cfg(test)] mod tests { // It's convenient to write test constants like "1 * 1024 * 1024" for symmetry #![allow(clippy::identity_op)] + use crate::prefetch::feed::ClientPartFeed; + use crate::store::PrefetchGetObject; + use super::*; use futures::executor::{block_on, ThreadPool}; + use futures::task::Spawn; + use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; use mountpoint_s3_client::failure_client::{countdown_failure_client, RequestFailureMap}; use mountpoint_s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject}; + use mountpoint_s3_client::ObjectClient; use proptest::proptest; use proptest::strategy::{Just, Strategy}; use proptest_derive::Arbitrary; use std::collections::HashMap; use test_case::test_case; - const KB: usize = 1024; - const MB: usize = 1024 * 1024; - #[derive(Debug, Arbitrary)] struct TestConfig { #[proptest(strategy = "16usize..1*1024*1024")] @@ -537,6 +450,54 @@ mod tests { max_backward_seek_distance: u64, } + type GetObjectFn = dyn Fn(&str, &str, u64, ETag) -> Box>; + + struct PrefetcherBox { + get_fn: Box>, + } + + impl PrefetcherBox + where + E: std::error::Error + Send + Sync + 'static, + { + fn new(part_feed: Feed, config: PrefetcherConfig) -> Self + where + Feed: ObjectPartFeed + Send + Sync + 'static, + { + let prefetcher = Prefetcher::new(part_feed, config); + PrefetcherBox { + get_fn: Box::new(move |bucket: &str, key: &str, size: u64, etag: ETag| { + Box::new(prefetcher.get(bucket, key, size, etag)) + }), + } + } + + fn get(&self, bucket: &str, key: &str, size: u64, etag: ETag) -> Box> { + (self.get_fn)(bucket, key, size, etag) + } + } + + fn create_prefetcher(client: Client, config: PrefetcherConfig) -> PrefetcherBox + where + Client: ObjectClient + Send + Sync + 'static, + { + let runtime = 
ThreadPool::builder().pool_size(1).create().unwrap(); + create_prefetcher_with_runtime(client, runtime, config) + } + + fn create_prefetcher_with_runtime( + client: Client, + runtime: Runtime, + config: PrefetcherConfig, + ) -> PrefetcherBox + where + Client: ObjectClient + Send + Sync + 'static, + Runtime: Spawn + Send + Sync + 'static, + { + let part_feed = ClientPartFeed::new(Arc::new(client), runtime); + PrefetcherBox::new(part_feed, config) + } + fn run_sequential_read_test(size: u64, read_size: usize, test_config: TestConfig) { let config = MockClientConfig { bucket: "test-bucket".to_string(), @@ -548,7 +509,7 @@ mod tests { client.add_object("hello", object); - let test_config = PrefetcherConfig { + let prefetcher_config = PrefetcherConfig { first_request_size: test_config.first_request_size, max_request_size: test_config.max_request_size, sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, @@ -556,8 +517,8 @@ mod tests { max_forward_seek_distance: test_config.max_forward_seek_distance, max_backward_seek_distance: test_config.max_backward_seek_distance, }; - let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); - let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); + + let prefetcher = create_prefetcher(client, prefetcher_config); let mut request = prefetcher.get("test-bucket", "hello", size, etag); @@ -632,14 +593,13 @@ mod tests { let client = countdown_failure_client(client, get_failures, HashMap::new(), HashMap::new(), HashMap::new()); - let test_config = PrefetcherConfig { + let prefetcher_config = PrefetcherConfig { first_request_size: test_config.first_request_size, max_request_size: test_config.max_request_size, sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, ..Default::default() }; - let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); - let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); + let prefetcher = create_prefetcher(client, prefetcher_config); let mut request = prefetcher.get("test-bucket", "hello", size, etag); @@ -685,49 +645,6 @@ mod tests { fail_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config, get_failures); } - #[test_case(256 * KB, 256 * KB, 8, 100 * MB, 8 * MB, 2 * MB; "next request size is smaller than part size")] - #[test_case(7 * MB, 256 * KB, 8, 100 * MB, 8 * MB, 1 * MB; "next request size is remaining bytes in the part")] - #[test_case(9 * MB, (2 * MB) + 11, 11, 100 * MB, 9 * MB, 18 * MB; "next request size is trimmed to part boundaries")] - #[test_case(8 * MB, 2 * MB, 8, 100 * MB, 8 * MB, 16 * MB; "next request size is multiple of the part size")] - #[test_case(8 * MB, 2 * MB, 100, 20 * MB, 8 * MB, 16 * MB; "max request size is trimmed to part boundaries")] - #[test_case(8 * MB, 2 * MB, 100, 24 * MB, 8 * MB, 24 * MB; "max request size is multiple of the part size")] - #[test_case(8 * MB, 2 * MB, 8, 3 * MB, 8 * MB, 3 * MB; "max request size is less than part size")] - fn test_get_next_request_size( - next_request_offset: usize, - current_request_size: usize, - prefetch_multiplier: usize, - max_request_size: usize, - part_size: usize, - expected_size: usize, - ) { - let object_size = 50 * 1024 * 1024; - - let config = MockClientConfig { - bucket: "test-bucket".to_string(), - part_size, - }; - let client = MockClient::new(config); - - let test_config = PrefetcherConfig { - first_request_size: 256 * 1024, - sequential_prefetch_multiplier: prefetch_multiplier, - max_request_size, - read_timeout: Duration::from_secs(60), - 
max_forward_seek_distance: 16 * 1024 * 1024, - max_backward_seek_distance: 2 * 1024 * 1024, - }; - let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); - let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); - let etag = ETag::for_tests(); - - let mut request = prefetcher.get("test-bucket", "hello", object_size, etag); - - request.next_request_offset = next_request_offset as u64; - request.next_request_size = current_request_size; - let next_request_size = request.get_next_request_size(); - assert_eq!(next_request_size, expected_size); - } - proptest! { #[test] fn proptest_sequential_read( @@ -771,7 +688,7 @@ mod tests { client.add_object("hello", object); - let test_config = PrefetcherConfig { + let prefetcher_config = PrefetcherConfig { first_request_size: test_config.first_request_size, max_request_size: test_config.max_request_size, sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, @@ -779,8 +696,7 @@ mod tests { max_backward_seek_distance: test_config.max_backward_seek_distance, ..Default::default() }; - let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); - let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); + let prefetcher = create_prefetcher(client, prefetcher_config); let mut request = prefetcher.get("test-bucket", "hello", object_size, etag); @@ -908,12 +824,11 @@ mod tests { client.add_object("hello", object); - let test_config = PrefetcherConfig { + let prefetcher_config = PrefetcherConfig { first_request_size: FIRST_REQUEST_SIZE, ..Default::default() }; - let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); - let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); + let prefetcher = create_prefetcher(client, prefetcher_config); // Try every possible seek from first_read_size for offset in first_read_size + 1..OBJECT_SIZE { @@ -945,12 +860,11 @@ mod tests { client.add_object("hello", object); - let test_config = PrefetcherConfig { + let prefetcher_config = PrefetcherConfig { first_request_size: FIRST_REQUEST_SIZE, ..Default::default() }; - let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); - let prefetcher = Prefetcher::new(Arc::new(client), runtime, test_config); + let prefetcher = create_prefetcher(client, prefetcher_config); // Try every possible seek from first_read_size for offset in 0..first_read_size { @@ -968,7 +882,7 @@ mod tests { #[cfg(feature = "shuttle")] mod shuttle_tests { use super::*; - use futures::task::{FutureObj, SpawnError}; + use futures::task::{FutureObj, Spawn, SpawnError}; use shuttle::future::block_on; use shuttle::rand::Rng; use shuttle::{check_pct, check_random}; @@ -1001,7 +915,7 @@ mod tests { client.add_object("hello", object); - let test_config = PrefetcherConfig { + let prefetcher_config = PrefetcherConfig { first_request_size, max_request_size, sequential_prefetch_multiplier, @@ -1010,7 +924,7 @@ mod tests { ..Default::default() }; - let prefetcher = Prefetcher::new(Arc::new(client), ShuttleRuntime, test_config); + let prefetcher = create_prefetcher_with_runtime(client, ShuttleRuntime, prefetcher_config); let mut request = prefetcher.get("test-bucket", "hello", object_size, file_etag); @@ -1058,7 +972,7 @@ mod tests { client.add_object("hello", object); - let test_config = PrefetcherConfig { + let prefetcher_config = PrefetcherConfig { first_request_size, max_request_size, sequential_prefetch_multiplier, @@ -1067,7 +981,7 @@ mod tests { ..Default::default() }; - let prefetcher = 
Prefetcher::new(Arc::new(client), ShuttleRuntime, test_config); + let prefetcher = create_prefetcher_with_runtime(client, ShuttleRuntime, prefetcher_config); let mut request = prefetcher.get("test-bucket", "hello", object_size, file_etag); diff --git a/mountpoint-s3/src/prefetch/feed.rs b/mountpoint-s3/src/prefetch/feed.rs index b6067cafe..bd8542f60 100644 --- a/mountpoint-s3/src/prefetch/feed.rs +++ b/mountpoint-s3/src/prefetch/feed.rs @@ -1,128 +1,266 @@ use std::{fmt::Debug, ops::Range, sync::Arc}; -use async_trait::async_trait; use bytes::Bytes; -use futures::{pin_mut, StreamExt}; -use mountpoint_s3_client::{ - error::{GetObjectError, ObjectClientError}, - types::ETag, - ObjectClient, -}; +use futures::task::SpawnExt; +use futures::{pin_mut, task::Spawn, StreamExt}; +use mountpoint_s3_client::{types::ETag, ObjectClient}; use mountpoint_s3_crt::checksums::crc32c; -use tracing::{error, trace}; +use tracing::{debug_span, error, trace, Instrument}; use crate::checksums::ChecksummedBytes; -use crate::prefetch::{part::Part, part_queue::PartQueueProducer}; +use crate::prefetch::part::Part; +use crate::prefetch::part_queue::unbounded_part_queue; +use crate::prefetch::task::RequestTask; +use crate::store::PrefetchReadError; /// A generic interface to retrieve data from objects in a S3-like store. -#[async_trait] -pub trait ObjectPartFeed { - /// Get the content of an object in fixed size parts. The parts are pushed to the provided `part_sink` - /// and are guaranteed to be contiguous and in the correct order. Callers need to specify a preferred +pub trait ObjectPartFeed { + type ClientError: std::error::Error + Send + Sync + 'static; + + /// Spawns a request to get the content of an object. The object data will be retrieved in fixed size + /// parts and can then be consumed using [RequestTask::read]. Callers need to specify a preferred /// size for the parts, but implementations are allowed to ignore it. - async fn get_object_parts( + fn spawn_get_object_request( &self, bucket: &str, key: &str, - range: Range, if_match: ETag, + range: RequestRange, preferred_part_size: usize, - part_sink: PartQueueProducer>, - ); + ) -> RequestTask; +} + +/// The range of a [ObjectPartFeed::get_object_parts] request. +/// Includes the total size of the object. +#[derive(Clone, Copy)] +pub struct RequestRange { + object_size: usize, + offset: u64, + size: usize, +} + +impl RequestRange { + pub fn new(object_size: usize, offset: u64, size: usize) -> Self { + let size = size.min(object_size.saturating_sub(offset as usize)); + Self { + object_size, + offset, + size, + } + } + + pub fn len(&self) -> usize { + self.size + } + + pub fn is_empty(&self) -> bool { + self.size == 0 + } + + pub fn object_size(&self) -> usize { + self.object_size + } + + pub fn start(&self) -> u64 { + self.offset + } + + pub fn end(&self) -> u64 { + self.offset + self.size as u64 + } + + pub fn trim_start(&self, start_offset: u64) -> Self { + let offset = start_offset.max(self.offset); + let size = self.end().saturating_sub(offset) as usize; + Self { + object_size: self.object_size, + offset, + size, + } + } + + pub fn trim_end(&self, end_offset: u64) -> Self { + let end = end_offset.min(self.end()); + let size = end.saturating_sub(self.offset) as usize; + Self { + object_size: self.object_size, + offset: self.offset, + size, + } + } +} - /// Adjust the size of a request to align to optimal part boundaries for this client. 
- fn get_aligned_request_size(&self, offset: u64, preferred_size: usize) -> usize; +impl From for Range { + fn from(val: RequestRange) -> Self { + val.start()..val.end() + } +} + +impl Debug for RequestRange { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}..{} out of {}", self.start(), self.end(), self.object_size()) + } } /// [ObjectPartFeed] implementation which delegates retrieving object data to a [Client]. #[derive(Debug)] -pub struct ClientPartFeed { +pub struct ClientPartFeed { client: Arc, + runtime: Runtime, } -impl ClientPartFeed { - pub fn new(client: Arc) -> Self { - Self { client } +impl ClientPartFeed +where + Client: ObjectClient + Send + Sync + 'static, + Runtime: Spawn, +{ + pub fn new(client: Arc, runtime: Runtime) -> Self { + Self { client, runtime } } } -#[async_trait] -impl ObjectPartFeed for ClientPartFeed +impl ObjectPartFeed for ClientPartFeed where Client: ObjectClient + Send + Sync + 'static, + Runtime: Spawn, { - async fn get_object_parts( + type ClientError = Client::ClientError; + + fn spawn_get_object_request( &self, bucket: &str, key: &str, - range: Range, if_match: ETag, + range: RequestRange, preferred_part_size: usize, - part_queue_producer: PartQueueProducer>, - ) { + ) -> RequestTask { assert!(preferred_part_size > 0); - let get_object_result = match self.client.get_object(bucket, key, Some(range), Some(if_match)).await { - Ok(get_object_result) => get_object_result, - Err(e) => { - error!(error=?e, "GetObject request failed"); - part_queue_producer.push(Err(e)); - return; - } - }; + let request_range = get_aligned_request_range(range, self.client.part_size().unwrap_or(8 * 1024 * 1024)); + let start = request_range.start(); + let size = request_range.len(); - pin_mut!(get_object_result); - loop { - match get_object_result.next().await { - Some(Ok((offset, body))) => { - trace!(offset, length = body.len(), "received GetObject part"); - // pre-split the body into multiple parts as suggested by preferred part size - // in order to avoid validating checksum on large parts at read. - let mut body: Bytes = body.into(); - let mut curr_offset = offset; - loop { - let chunk_size = preferred_part_size.min(body.len()); - if chunk_size == 0 { + let (part_queue, part_queue_producer) = unbounded_part_queue(); + trace!(range=?request_range, "spawning request"); + + let request_task = { + let client = self.client.clone(); + let bucket = bucket.to_owned(); + let key = key.to_owned(); + let span = debug_span!("prefetch", range=?request_range); + + async move { + let get_object_result = match client + .get_object(&bucket, &key, Some(request_range.into()), Some(if_match)) + .await + { + Ok(get_object_result) => get_object_result, + Err(e) => { + error!(error=?e, "GetObject request failed"); + part_queue_producer.push(Err(PrefetchReadError::map(e))); + return; + } + }; + + pin_mut!(get_object_result); + loop { + match get_object_result.next().await { + Some(Ok((offset, body))) => { + trace!(offset, length = body.len(), "received GetObject part"); + // pre-split the body into multiple parts as suggested by preferred part size + // in order to avoid validating checksum on large parts at read. 
+ let mut body: Bytes = body.into(); + let mut curr_offset = offset; + loop { + let chunk_size = preferred_part_size.min(body.len()); + if chunk_size == 0 { + break; + } + let chunk = body.split_to(chunk_size); + // S3 doesn't provide checksum for us if the request range is not aligned to + // object part boundaries, so we're computing our own checksum here. + let checksum = crc32c::checksum(&chunk); + let checksum_bytes = ChecksummedBytes::new(chunk, checksum); + let part = Part::new(&key, curr_offset, checksum_bytes); + curr_offset += part.len() as u64; + part_queue_producer.push(Ok(part)); + } + } + Some(Err(e)) => { + error!(error=?e, "GetObject body part failed"); + part_queue_producer.push(Err(PrefetchReadError::map(e))); break; } - let chunk = body.split_to(chunk_size); - // S3 doesn't provide checksum for us if the request range is not aligned to - // object part boundaries, so we're computing our own checksum here. - let checksum = crc32c::checksum(&chunk); - let checksum_bytes = ChecksummedBytes::new(chunk, checksum); - let part = Part::new(key, curr_offset, checksum_bytes); - curr_offset += part.len() as u64; - part_queue_producer.push(Ok(part)); + None => break, } } - Some(Err(e)) => { - error!(error=?e, "GetObject body part failed"); - part_queue_producer.push(Err(e)); - break; - } - None => break, + trace!("request finished"); } - } - trace!("request finished"); + .instrument(span) + }; + + let task_handle = self.runtime.spawn_with_handle(request_task).unwrap(); + + RequestTask::from_handle(task_handle, size, start, part_queue) } +} - fn get_aligned_request_size(&self, offset: u64, preferred_length: usize) -> usize { - // If the request size is bigger than a part size we will try to align it to part boundaries. - let part_alignment = self.client.part_size().unwrap_or(8 * 1024 * 1024); - let offset_in_part = (offset % part_alignment as u64) as usize; - if offset_in_part != 0 { - // if the offset is not at the start of the part we will drain all the bytes from that part first - let remaining_in_part = part_alignment - offset_in_part; - preferred_length.min(remaining_in_part) +fn get_aligned_request_range(range: RequestRange, part_alignment: usize) -> RequestRange { + let object_size = range.object_size(); + let offset = range.start(); + let preferred_length = range.len(); + + // If the request size is bigger than a part size we will try to align it to part boundaries. 
+ let offset_in_part = (offset % part_alignment as u64) as usize; + let size = if offset_in_part != 0 { + // if the offset is not at the start of the part we will drain all the bytes from that part first + let remaining_in_part = part_alignment - offset_in_part; + preferred_length.min(remaining_in_part) + } else { + // if the request size is smaller than the part size, just return that value + if preferred_length < part_alignment { + preferred_length } else { - // if the request size is smaller than the part size, just return that value - if preferred_length < part_alignment { - preferred_length - } else { - // if it exceeds part boundaries, trim it to the part boundaries - let request_boundary = offset + preferred_length as u64; - let remainder = (request_boundary % part_alignment as u64) as usize; - preferred_length - remainder - } + // if it exceeds part boundaries, trim it to the part boundaries + let request_boundary = offset + preferred_length as u64; + let remainder = (request_boundary % part_alignment as u64) as usize; + preferred_length - remainder } + }; + RequestRange::new(object_size, offset, size) +} + +#[cfg(test)] +mod tests { + // It's convenient to write test constants like "1 * 1024 * 1024" for symmetry + #![allow(clippy::identity_op)] + + use super::*; + + use test_case::test_case; + + const KB: usize = 1024; + const MB: usize = 1024 * 1024; + + #[test_case(256 * KB, 256 * KB, 8, 100 * MB, 8 * MB, 2 * MB; "next request size is smaller than part size")] + #[test_case(7 * MB, 256 * KB, 8, 100 * MB, 8 * MB, 1 * MB; "next request size is remaining bytes in the part")] + #[test_case(9 * MB, (2 * MB) + 11, 11, 100 * MB, 9 * MB, 18 * MB; "next request size is trimmed to part boundaries")] + #[test_case(8 * MB, 2 * MB, 8, 100 * MB, 8 * MB, 16 * MB; "next request size is multiple of the part size")] + #[test_case(8 * MB, 2 * MB, 100, 20 * MB, 8 * MB, 16 * MB; "max request size is trimmed to part boundaries")] + #[test_case(8 * MB, 2 * MB, 100, 24 * MB, 8 * MB, 24 * MB; "max request size is multiple of the part size")] + #[test_case(8 * MB, 2 * MB, 8, 3 * MB, 8 * MB, 3 * MB; "max request size is less than part size")] + fn test_get_aligned_request_range( + next_request_offset: usize, + current_request_size: usize, + prefetch_multiplier: usize, + max_request_size: usize, + part_size: usize, + expected_size: usize, + ) { + let object_size = 50 * 1024 * 1024; + let request_size = (current_request_size * prefetch_multiplier).min(max_request_size); + let range = RequestRange::new(object_size, next_request_offset as u64, request_size); + + let aligned_range = get_aligned_request_range(range, part_size); + assert_eq!(aligned_range.len(), expected_size); } } diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs index 07a353bab..f965814f7 100644 --- a/mountpoint-s3/src/prefetch/part_queue.rs +++ b/mountpoint-s3/src/prefetch/part_queue.rs @@ -1,9 +1,11 @@ use std::time::Instant; +use mountpoint_s3_client::error::ObjectClientError; +use mountpoint_s3_client::types::ObjectClientResult; use tracing::trace; use crate::prefetch::part::Part; -use crate::prefetch::PrefetchReadError; +use crate::store::PrefetchReadError; use crate::sync::async_channel::{unbounded, Receiver, RecvError, Sender}; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::AsyncMutex; @@ -11,20 +13,20 @@ use crate::sync::AsyncMutex; /// A queue of [Part]s where the first part can be partially read from if the reader doesn't want /// the entire part in one shot. 
#[derive(Debug)] -pub struct PartQueue { +pub struct PartQueue { current_part: AsyncMutex>, - receiver: Receiver>, + receiver: Receiver>, failed: AtomicBool, } /// Producer side of the queue of [Part]s. #[derive(Debug)] -pub struct PartQueueProducer { - sender: Sender>, +pub struct PartQueueProducer { + sender: Sender>, } /// Creates an unbounded [PartQueue] and its related [PartQueueProducer]. -pub fn unbounded_part_queue() -> (PartQueue, PartQueueProducer) { +pub fn unbounded_part_queue() -> (PartQueue, PartQueueProducer) { let (sender, receiver) = unbounded(); let part_queue = PartQueue { current_part: AsyncMutex::new(None), @@ -42,7 +44,7 @@ impl PartQueue { /// empty. /// /// If this method returns an Err, the PartQueue must never be accessed again. - pub async fn read(&self, length: usize) -> Result> { + pub async fn read(&self, length: usize) -> ObjectClientResult { let mut current_part = self.current_part.lock().await; assert!( @@ -55,14 +57,16 @@ impl PartQueue { } else { // Do `try_recv` first so we can track whether the read is starved or not if let Ok(part) = self.receiver.try_recv() { - part.map_err(|e| PrefetchReadError::GetRequestFailed(e)) + part } else { let start = Instant::now(); let part = self.receiver.recv().await; metrics::histogram!("prefetch.part_queue_starved_us", start.elapsed().as_micros() as f64); match part { - Err(RecvError) => Err(PrefetchReadError::GetRequestTerminatedUnexpectedly), - Ok(part) => part.map_err(|e| PrefetchReadError::GetRequestFailed(e)), + Err(RecvError) => Err(ObjectClientError::ServiceError( + PrefetchReadError::GetRequestTerminatedUnexpectedly, + )), + Ok(part) => part, } } }; @@ -88,7 +92,7 @@ impl PartQueue { impl PartQueueProducer { /// Push a new [Part] onto the back of the queue - pub fn push(&self, part: Result) { + pub fn push(&self, part: ObjectClientResult) { // Unbounded channel will never actually block let send_result = self.sender.send_blocking(part); if send_result.is_err() { diff --git a/mountpoint-s3/src/prefetch/task.rs b/mountpoint-s3/src/prefetch/task.rs new file mode 100644 index 000000000..64152a1e9 --- /dev/null +++ b/mountpoint-s3/src/prefetch/task.rs @@ -0,0 +1,78 @@ +use futures::future::RemoteHandle; +use mountpoint_s3_client::types::ObjectClientResult; + +use crate::store::PrefetchReadError; + +use super::{ + part::Part, + part_queue::{unbounded_part_queue, PartQueue}, +}; + +/// A single GetObject request submitted to the S3 client +#[derive(Debug)] +pub struct RequestTask { + /// Handle on the task/future. The future is cancelled when handle is dropped. 
This is None if + /// the request is fake (created by seeking backwards in the stream) + task_handle: Option>, + remaining: usize, + start_offset: u64, + total_size: usize, + part_queue: PartQueue, +} + +impl RequestTask { + pub fn from_handle(task_handle: RemoteHandle<()>, size: usize, offset: u64, part_queue: PartQueue) -> Self { + Self { + task_handle: Some(task_handle), + remaining: size, + start_offset: offset, + total_size: size, + part_queue, + } + } + + pub fn from_parts(parts: impl IntoIterator, offset: u64) -> Self { + let mut size = 0; + let (part_queue, part_queue_producer) = unbounded_part_queue(); + for part in parts { + size += part.len(); + part_queue_producer.push(Ok(part)); + } + Self { + task_handle: None, + remaining: size, + start_offset: offset, + total_size: size, + part_queue, + } + } + + pub async fn read(&mut self, length: usize) -> ObjectClientResult { + let part = self.part_queue.read(length).await?; + debug_assert!(part.len() <= self.remaining); + self.remaining -= part.len(); + Ok(part) + } + + pub fn start_offset(&self) -> u64 { + self.start_offset + } + + pub fn end_offset(&self) -> u64 { + self.start_offset + self.total_size as u64 + } + + pub fn total_size(&self) -> usize { + self.total_size + } + + pub fn remaining(&self) -> usize { + self.remaining + } + + /// Some requests aren't actually streaming data (they're fake, created by backwards seeks), and + /// shouldn't be counted for prefetcher progress. + pub fn is_streaming(&self) -> bool { + self.task_handle.is_some() + } +} diff --git a/mountpoint-s3/src/store.rs b/mountpoint-s3/src/store.rs new file mode 100644 index 000000000..467ce0a12 --- /dev/null +++ b/mountpoint-s3/src/store.rs @@ -0,0 +1,242 @@ +use std::fmt::Debug; + +use async_trait::async_trait; +use futures::{executor::ThreadPool, task::Spawn}; +use mountpoint_s3_client::{ + error::{ + DeleteObjectError, GetObjectAttributesError, GetObjectError, HeadObjectError, ListObjectsError, + ObjectClientError, PutObjectError, + }, + types::{ + DeleteObjectResult, ETag, GetObjectAttributesResult, HeadObjectResult, ListObjectsResult, ObjectAttribute, + ObjectClientResult, PutObjectParams, + }, + ObjectClient, PutObjectRequest, +}; +use thiserror::Error; + +use crate::{ + checksums::{ChecksummedBytes, IntegrityError}, + prefetch::{feed::ClientPartFeed, PrefetcherConfig}, +}; +use crate::{ + prefetch::{self, feed::ObjectPartFeed, Prefetcher}, + sync::Arc, +}; + +/// A generic interface to S3-like object store. +/// Similar to [ObjectClient], but provides a [ObjectStore::prefetch] method instead +/// of [ObjectClient::get_object]. +#[async_trait] +pub trait ObjectStore: Clone { + type ClientError: std::error::Error + Send + Sync + 'static; + type PutObjectRequest: PutObjectRequest; + type PrefetchGetObject: PrefetchGetObject; + + /// Query the part size this client uses for PUT and GET operations to the object store. This + /// can be `None` if the client does not do multi-part operations. + fn part_size(&self) -> Option; + + /// Delete a single object from the object store. + /// + /// DeleteObject will succeed even if the object within the bucket does not exist. 
+ async fn delete_object( + &self, + bucket: &str, + key: &str, + ) -> ObjectClientResult; + + /// List the objects in a bucket under a given prefix + async fn list_objects( + &self, + bucket: &str, + continuation_token: Option<&str>, + delimiter: &str, + max_keys: usize, + prefix: &str, + ) -> ObjectClientResult; + + /// Retrieve object metadata without retrieving the object contents + async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> ObjectClientResult; + + /// Put an object into the object store. Returns a [PutObjectRequest] for callers + /// to provide the content of the object. + async fn put_object( + &self, + bucket: &str, + key: &str, + params: &PutObjectParams, + ) -> ObjectClientResult; + + /// Retrieves all the metadata from an object without returning the object contents. + async fn get_object_attributes( + &self, + bucket: &str, + key: &str, + max_parts: Option, + part_number_marker: Option, + object_attributes: &[ObjectAttribute], + ) -> ObjectClientResult; + + /// Start a new prefetch request to the specified object. + fn prefetch(&self, bucket: &str, key: &str, size: u64, etag: ETag) -> Self::PrefetchGetObject; +} + +/// Result of a prefetch request. Allows callers to read object data. +#[async_trait] +pub trait PrefetchGetObject: Send { + type ClientError: std::error::Error + Send + Sync + 'static; + + /// Read some bytes from the object. This function will always return exactly `size` bytes, + /// except at the end of the object where it will return however many bytes are left (including + /// possibly 0 bytes). + async fn read( + &mut self, + offset: u64, + length: usize, + ) -> ObjectClientResult; +} + +#[derive(Debug, Error)] +pub enum PrefetchReadError { + #[error("get object request failed")] + GetRequestFailed(#[from] GetObjectError), + + #[error("get request terminated unexpectedly")] + GetRequestTerminatedUnexpectedly, + + #[error("integrity check failed")] + Integrity(#[from] IntegrityError), +} + +impl PrefetchReadError { + pub fn map(error: ObjectClientError) -> ObjectClientError { + match error { + ObjectClientError::ServiceError(s) => ObjectClientError::ServiceError(Self::GetRequestFailed(s)), + ObjectClientError::ClientError(c) => ObjectClientError::ClientError(c), + } + } +} + +#[derive(Debug)] +pub struct ClientStore { + client: Arc, + prefetcher: Arc>, +} + +impl ClientStore +where + Feed: ObjectPartFeed + Send + Sync + 'static, +{ + pub fn new(client: Arc, part_feed: Feed, prefetcher_config: PrefetcherConfig) -> Self { + let prefetcher = Prefetcher::new(part_feed, prefetcher_config); + Self { client, prefetcher } + } +} + +impl Clone for ClientStore { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + prefetcher: self.prefetcher.clone(), + } + } +} + +#[async_trait] +impl ObjectStore for ClientStore +where + Client: ObjectClient + Send + Sync + 'static, + Feed: ObjectPartFeed + Send + Sync + 'static, +{ + type ClientError = ::ClientError; + type PutObjectRequest = ::PutObjectRequest; + type PrefetchGetObject = prefetch::PrefetchGetObject; + + fn part_size(&self) -> Option { + self.client.part_size() + } + + async fn delete_object( + &self, + bucket: &str, + key: &str, + ) -> ObjectClientResult { + self.client.delete_object(bucket, key).await + } + + async fn list_objects( + &self, + bucket: &str, + continuation_token: Option<&str>, + delimiter: &str, + max_keys: usize, + prefix: &str, + ) -> ObjectClientResult { + self.client + .list_objects(bucket, continuation_token, delimiter, max_keys, prefix) + .await + } + + 
async fn head_object( + &self, + bucket: &str, + key: &str, + ) -> ObjectClientResult { + self.client.head_object(bucket, key).await + } + + async fn put_object( + &self, + bucket: &str, + key: &str, + params: &PutObjectParams, + ) -> ObjectClientResult { + self.client.put_object(bucket, key, params).await + } + + async fn get_object_attributes( + &self, + bucket: &str, + key: &str, + max_parts: Option, + part_number_marker: Option, + object_attributes: &[ObjectAttribute], + ) -> ObjectClientResult { + self.client + .get_object_attributes(bucket, key, max_parts, part_number_marker, object_attributes) + .await + } + + fn prefetch(&self, bucket: &str, key: &str, size: u64, etag: ETag) -> Self::PrefetchGetObject { + self.prefetcher.get(bucket, key, size, etag) + } +} + +pub fn default_store( + client: Arc, + runtime: Runtime, + prefetcher_config: PrefetcherConfig, +) -> ClientStore> +where + Client: ObjectClient + Send + Sync + 'static, + Runtime: Spawn + Send + Sync + 'static, +{ + let part_feed = ClientPartFeed::new(client.clone(), runtime); + ClientStore::new(client, part_feed, prefetcher_config) +} + +pub type TestStore = ClientStore>; + +pub fn test_store(client: Arc) -> TestStore +where + Client: ObjectClient + Send + Sync + 'static, +{ + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let part_feed = ClientPartFeed::new(client.clone(), runtime); + ClientStore::new(client, part_feed, Default::default()) +} diff --git a/mountpoint-s3/src/upload.rs b/mountpoint-s3/src/upload.rs index c12a1638f..7fcd6f18b 100644 --- a/mountpoint-s3/src/upload.rs +++ b/mountpoint-s3/src/upload.rs @@ -1,17 +1,19 @@ -use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; use mountpoint_s3_client::checksums::crc32c_from_base64; use mountpoint_s3_client::error::{ObjectClientError, PutObjectError}; use mountpoint_s3_client::types::{ObjectClientResult, PutObjectParams, PutObjectResult, UploadReview}; -use mountpoint_s3_client::{ObjectClient, PutObjectRequest}; +use mountpoint_s3_client::PutObjectRequest; use mountpoint_s3_crt::checksums::crc32c::{Crc32c, Hasher}; use thiserror::Error; use tracing::error; use crate::checksums::combine_checksums; +use crate::store::ObjectStore; +use crate::sync::Arc; -type PutRequestError = ObjectClientError::ClientError>; +type PutRequestError = ObjectClientError::ClientError>; const MAX_S3_MULTIPART_UPLOAD_PARTS: usize = 10000; @@ -22,15 +24,15 @@ pub struct Uploader { } #[derive(Debug)] -struct UploaderInner { - client: Arc, +struct UploaderInner { + store: Store, storage_class: Option, } -impl Uploader { +impl Uploader { /// Create a new [Uploader] that will make requests to the given client. - pub fn new(client: Arc, storage_class: Option) -> Self { - let inner = UploaderInner { client, storage_class }; + pub fn new(store: Store, storage_class: Option) -> Self { + let inner = UploaderInner { store, storage_class }; Self { inner: Arc::new(inner) } } @@ -39,7 +41,7 @@ impl Uploader { &self, bucket: &str, key: &str, - ) -> ObjectClientResult, PutObjectError, Client::ClientError> { + ) -> ObjectClientResult, PutObjectError, Store::ClientError> { UploadRequest::new(Arc::clone(&self.inner), bucket, key).await } } @@ -59,29 +61,29 @@ pub enum UploadWriteError { /// Manages the upload of an object to S3. /// /// Wraps a PutObject request and enforces sequential writes. 
-pub struct UploadRequest { +pub struct UploadRequest { bucket: String, key: String, next_request_offset: u64, hasher: Hasher, - request: Client::PutObjectRequest, + request: Store::PutObjectRequest, maximum_upload_size: Option, } -impl UploadRequest { +impl UploadRequest { async fn new( - inner: Arc>, + inner: Arc>, bucket: &str, key: &str, - ) -> ObjectClientResult { + ) -> ObjectClientResult { let mut params = PutObjectParams::new().trailing_checksums(true); if let Some(storage_class) = &inner.storage_class { params = params.storage_class(storage_class.clone()); } - let request = inner.client.put_object(bucket, key, ¶ms).await?; - let maximum_upload_size = inner.client.part_size().map(|ps| ps * MAX_S3_MULTIPART_UPLOAD_PARTS); + let request = inner.store.put_object(bucket, key, ¶ms).await?; + let maximum_upload_size = inner.store.part_size().map(|ps| ps * MAX_S3_MULTIPART_UPLOAD_PARTS); Ok(Self { bucket: bucket.to_owned(), @@ -97,11 +99,7 @@ impl UploadRequest { self.next_request_offset } - pub async fn write( - &mut self, - offset: i64, - data: &[u8], - ) -> Result>> { + pub async fn write(&mut self, offset: i64, data: &[u8]) -> Result>> { let next_offset = self.next_request_offset; if offset != next_offset as i64 { return Err(UploadWriteError::OutOfOrderWrite { @@ -121,7 +119,7 @@ impl UploadRequest { Ok(data.len()) } - pub async fn complete(self) -> Result> { + pub async fn complete(self) -> Result> { let size = self.size(); let checksum = self.hasher.finalize(); self.request @@ -130,7 +128,7 @@ impl UploadRequest { } } -impl Debug for UploadRequest { +impl Debug for UploadRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("UploadRequest") .field("bucket", &self.bucket) @@ -186,6 +184,8 @@ fn verify_checksums(review: UploadReview, expected_size: u64, expected_checksum: mod tests { use std::collections::HashMap; + use crate::store::test_store; + use super::*; use mountpoint_s3_client::{ failure_client::countdown_failure_client, @@ -203,7 +203,7 @@ mod tests { bucket: bucket.to_owned(), part_size: 32, })); - let uploader = Uploader::new(client.clone(), None); + let uploader = Uploader::new(test_store(client.clone()), None); let request = uploader.put(bucket, key).await.unwrap(); assert!(!client.contains_key(key)); @@ -226,7 +226,7 @@ mod tests { bucket: bucket.to_owned(), part_size: 32, })); - let uploader = Uploader::new(client.clone(), Some(storage_class.to_owned())); + let uploader = Uploader::new(test_store(client.clone()), Some(storage_class.to_owned())); let mut request = uploader.put(bucket, key).await.unwrap(); @@ -266,15 +266,15 @@ mod tests { put_failures.insert(1, Ok((1, MockClientError("error".to_owned().into())))); put_failures.insert(2, Ok((2, MockClientError("error".to_owned().into())))); - let failure_client = Arc::new(countdown_failure_client( + let failure_client = countdown_failure_client( client.clone(), HashMap::new(), HashMap::new(), HashMap::new(), put_failures, - )); + ); - let uploader = Uploader::new(failure_client.clone(), None); + let uploader = Uploader::new(test_store(Arc::new(failure_client)), None); // First request fails on first write. 
{ @@ -314,7 +314,7 @@ mod tests { bucket: bucket.to_owned(), part_size: PART_SIZE, })); - let uploader = Uploader::new(client.clone(), None); + let uploader = Uploader::new(test_store(client.clone()), None); let mut request = uploader.put(bucket, key).await.unwrap(); let successful_writes = PART_SIZE * MAX_S3_MULTIPART_UPLOAD_PARTS / write_size; diff --git a/mountpoint-s3/tests/common/mod.rs b/mountpoint-s3/tests/common/mod.rs index 611bddfb3..e37fd23e1 100644 --- a/mountpoint-s3/tests/common/mod.rs +++ b/mountpoint-s3/tests/common/mod.rs @@ -1,9 +1,9 @@ use aws_sdk_s3::config::Region; use aws_sdk_s3::primitives::ByteStream; use fuser::{FileAttr, FileType}; -use futures::executor::ThreadPool; use mountpoint_s3::fs::{self, DirectoryEntry, DirectoryReplier, ReadReplier, ToErrno}; use mountpoint_s3::prefix::Prefix; +use mountpoint_s3::store::{test_store, TestStore}; use mountpoint_s3::{S3Filesystem, S3FilesystemConfig}; use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig}; use mountpoint_s3_client::ObjectClient; @@ -18,28 +18,28 @@ pub fn make_test_filesystem( bucket: &str, prefix: &Prefix, config: S3FilesystemConfig, -) -> (Arc, S3Filesystem, ThreadPool>) { +) -> (Arc, S3Filesystem>) { let client_config = MockClientConfig { bucket: bucket.to_string(), part_size: 1024 * 1024, }; let client = Arc::new(MockClient::new(client_config)); - let fs = make_test_filesystem_with_client(Arc::clone(&client), bucket, prefix, config); + let fs = make_test_filesystem_with_client(client.clone(), bucket, prefix, config); (client, fs) } pub fn make_test_filesystem_with_client( - client: Client, + client: Arc, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig, -) -> S3Filesystem +) -> S3Filesystem> where Client: ObjectClient + Send + Sync + 'static, { - let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); - S3Filesystem::new(client, runtime, bucket, prefix, config) + let store = test_store(client); + S3Filesystem::new(store, bucket, prefix, config) } pub fn get_test_bucket_and_prefix(test_name: &str) -> (String, String) { diff --git a/mountpoint-s3/tests/fs.rs b/mountpoint-s3/tests/fs.rs index 8345d756e..e4a48f7ef 100644 --- a/mountpoint-s3/tests/fs.rs +++ b/mountpoint-s3/tests/fs.rs @@ -704,7 +704,12 @@ async fn test_upload_aborted_on_write_failure() { Default::default(), put_failures, ); - let fs = make_test_filesystem_with_client(failure_client, BUCKET_NAME, &Default::default(), Default::default()); + let fs = make_test_filesystem_with_client( + Arc::new(failure_client), + BUCKET_NAME, + &Default::default(), + Default::default(), + ); let mode = libc::S_IFREG | libc::S_IRWXU; // regular file + 0700 permissions let dentry = fs.mknod(FUSE_ROOT_INODE, FILE_NAME.as_ref(), mode, 0, 0).await.unwrap(); @@ -775,7 +780,12 @@ async fn test_upload_aborted_on_fsync_failure() { Default::default(), put_failures, ); - let fs = make_test_filesystem_with_client(failure_client, BUCKET_NAME, &Default::default(), Default::default()); + let fs = make_test_filesystem_with_client( + Arc::new(failure_client), + BUCKET_NAME, + &Default::default(), + Default::default(), + ); let mode = libc::S_IFREG | libc::S_IRWXU; // regular file + 0700 permissions let dentry = fs.mknod(FUSE_ROOT_INODE, FILE_NAME.as_ref(), mode, 0, 0).await.unwrap(); @@ -831,7 +841,12 @@ async fn test_upload_aborted_on_release_failure() { Default::default(), put_failures, ); - let fs = make_test_filesystem_with_client(failure_client, BUCKET_NAME, &Default::default(), Default::default()); + let fs = 
make_test_filesystem_with_client( + Arc::new(failure_client), + BUCKET_NAME, + &Default::default(), + Default::default(), + ); let mode = libc::S_IFREG | libc::S_IRWXU; // regular file + 0700 permissions let dentry = fs.mknod(FUSE_ROOT_INODE, FILE_NAME.as_ref(), mode, 0, 0).await.unwrap(); diff --git a/mountpoint-s3/tests/fuse_tests/mod.rs b/mountpoint-s3/tests/fuse_tests/mod.rs index d3897a6ac..76a31467e 100644 --- a/mountpoint-s3/tests/fuse_tests/mod.rs +++ b/mountpoint-s3/tests/fuse_tests/mod.rs @@ -14,10 +14,14 @@ mod write_test; use std::ffi::OsStr; use std::fs::ReadDir; +use std::path::Path; +use std::sync::Arc; use fuser::{BackgroundSession, MountOption, Session}; use mountpoint_s3::fuse::S3FuseFilesystem; +use mountpoint_s3::prefetch::PrefetcherConfig; use mountpoint_s3::prefix::Prefix; +use mountpoint_s3::store::{default_store, ObjectStore}; use mountpoint_s3::S3FilesystemConfig; use mountpoint_s3_client::types::PutObjectParams; use tempfile::TempDir; @@ -54,6 +58,7 @@ pub type TestClientBox = Box; pub struct TestSessionConfig { pub part_size: usize, pub filesystem_config: S3FilesystemConfig, + pub prefetcher_config: PrefetcherConfig, } impl Default for TestSessionConfig { @@ -61,15 +66,43 @@ impl Default for TestSessionConfig { Self { part_size: 8 * 1024 * 1024, filesystem_config: Default::default(), + prefetcher_config: Default::default(), } } } +fn create_fuse_session( + store: Store, + bucket: &str, + prefix: &str, + mount_dir: &Path, + filesystem_config: S3FilesystemConfig, +) -> BackgroundSession +where + Store: ObjectStore + Send + Sync + 'static, +{ + let options = vec![ + MountOption::DefaultPermissions, + MountOption::FSName("mountpoint-s3".to_string()), + MountOption::NoAtime, + MountOption::AutoUnmount, + MountOption::AllowOther, + ]; + + let prefix = Prefix::new(prefix).expect("valid prefix"); + let session = Session::new( + S3FuseFilesystem::new(store, bucket, &prefix, filesystem_config), + mount_dir, + &options, + ) + .unwrap(); + + BackgroundSession::new(session).unwrap() +} + mod mock_session { use super::*; - use std::sync::Arc; - use futures::executor::ThreadPool; use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockObject}; @@ -89,39 +122,22 @@ mod mock_session { part_size: test_config.part_size, }; let client = Arc::new(MockClient::new(client_config)); - - let options = vec![ - MountOption::DefaultPermissions, - MountOption::FSName("mountpoint-s3".to_string()), - MountOption::NoAtime, - MountOption::AutoUnmount, - MountOption::AllowOther, - ]; - let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let store = default_store(client.clone(), runtime, test_config.prefetcher_config); - let prefix = Prefix::new(&prefix).expect("valid prefix"); - let session = Session::new( - S3FuseFilesystem::new( - Arc::clone(&client), - runtime, - bucket, - &prefix, - test_config.filesystem_config, - ), - mount_dir.path(), - &options, - ) - .unwrap(); - - let session = BackgroundSession::new(session).unwrap(); + let session = create_fuse_session(store, bucket, &prefix, mount_dir.path(), test_config.filesystem_config); + let test_client = create_test_client(client, &prefix); + (mount_dir, session, test_client) + } + + fn create_test_client(client: Arc, prefix: &str) -> TestClientBox { let test_client = MockTestClient { - prefix: prefix.to_string(), + prefix: prefix.to_owned(), client, }; - (mount_dir, session, Box::new(test_client)) + Box::new(test_client) } struct MockTestClient { @@ -215,33 +231,23 @@ mod s3_session { 
.endpoint_config(EndpointConfig::new(®ion)); let client = S3CrtClient::new(client_config).unwrap(); let runtime = client.event_loop_group(); + let store = default_store(Arc::new(client), runtime, test_config.prefetcher_config); + + let session = create_fuse_session(store, &bucket, &prefix, mount_dir.path(), test_config.filesystem_config); + let test_client = create_test_client(®ion, &bucket, &prefix); + + (mount_dir, session, test_client) + } - let options = vec![ - MountOption::DefaultPermissions, - MountOption::FSName("mountpoint-s3".to_string()), - MountOption::NoAtime, - MountOption::AutoUnmount, - MountOption::AllowOther, - ]; - - let prefix = Prefix::new(&prefix).expect("valid prefix"); - let session = Session::new( - S3FuseFilesystem::new(client, runtime, &bucket, &prefix, test_config.filesystem_config), - mount_dir.path(), - &options, - ) - .unwrap(); - - let session = BackgroundSession::new(session).unwrap(); - - let sdk_client = tokio_block_on(async { get_test_sdk_client(®ion).await }); + fn create_test_client(region: &str, bucket: &str, prefix: &str) -> TestClientBox { + let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await }); let test_client = SDKTestClient { - prefix: prefix.to_string(), - bucket, + prefix: prefix.to_owned(), + bucket: bucket.to_owned(), sdk_client, }; - (mount_dir, session, Box::new(test_client)) + Box::new(test_client) } async fn get_test_sdk_client(region: &str) -> aws_sdk_s3::Client { diff --git a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs index 5592715eb..39d81e5b1 100644 --- a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs +++ b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs @@ -1,6 +1,5 @@ use fuser::BackgroundSession; use mountpoint_s3::prefetch::PrefetcherConfig; -use mountpoint_s3::S3FilesystemConfig; use std::fs::{File, OpenOptions}; use std::io::Read; use tempfile::TempDir; @@ -60,15 +59,10 @@ where ..Default::default() }; - let filesystem_config = S3FilesystemConfig { - prefetcher_config, - ..Default::default() - }; - let (mount_point, _session, mut test_client) = creator_fn( prefix, TestSessionConfig { - filesystem_config, + prefetcher_config, ..Default::default() }, ); diff --git a/mountpoint-s3/tests/reftests/harness.rs b/mountpoint-s3/tests/reftests/harness.rs index f2ff52718..611ca07e9 100644 --- a/mountpoint-s3/tests/reftests/harness.rs +++ b/mountpoint-s3/tests/reftests/harness.rs @@ -5,10 +5,10 @@ use std::sync::Arc; use std::time::Duration; use fuser::FileType; -use futures::executor::ThreadPool; use futures::future::{BoxFuture, FutureExt}; use mountpoint_s3::fs::{self, CacheConfig, InodeNo, ReadReplier, ToErrno, FUSE_ROOT_INODE}; use mountpoint_s3::prefix::Prefix; +use mountpoint_s3::store::TestStore; use mountpoint_s3::{S3Filesystem, S3FilesystemConfig}; use mountpoint_s3_client::mock_client::{MockClient, MockObject}; use mountpoint_s3_client::ObjectClient; @@ -164,7 +164,7 @@ impl InflightWrites { pub struct Harness { readdir_limit: usize, // max number of entries that a readdir will return; 0 means no limit reference: Reference, - fs: S3Filesystem, ThreadPool>, + fs: S3Filesystem>, client: Arc, bucket: String, inflight_writes: InflightWrites, @@ -173,7 +173,7 @@ pub struct Harness { impl Harness { /// Create a new test harness pub fn new( - fs: S3Filesystem, ThreadPool>, + fs: S3Filesystem>, client: Arc, reference: Reference, bucket: &str,