Skip to content

Commit

Permalink
Introduce Prefetch trait (#595)
Browse files Browse the repository at this point in the history
Introduce a new `Prefetch` trait to abstract how `S3Filesystem` fetches object data from an `ObjectClient`. While this commit introduces no functional change, the abstraction will be used to implement optional object data caching.

The existing `Prefetcher` struct has been adapted to implement the new `Prefetch` trait. The main changes are:
* it is generic over the `ObjectPartStream` (previously `ObjectPartFeed`), rather than using dynamic dispatch,
* it does not own an `ObjectClient` instance, instead one is required when initiating a `prefetch` request,
* the logic to spawn a new task for each `GetObject` request and handle the object body parts returned was moved into `ObjectPartStream`.

Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro authored Nov 8, 2023
1 parent 6dead83 commit c6b2a17
Show file tree
Hide file tree
Showing 16 changed files with 841 additions and 545 deletions.
7 changes: 4 additions & 3 deletions mountpoint-s3/examples/fs_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use clap::{Arg, ArgAction, Command};
use fuser::{BackgroundSession, MountOption, Session};
use mountpoint_s3::fuse::S3FuseFilesystem;
use mountpoint_s3::prefetch::default_prefetch;
use mountpoint_s3::S3FilesystemConfig;
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::S3CrtClient;
use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use std::{
fs::File,
fs::OpenOptions,
fs::{File, OpenOptions},
io::{self, BufRead, BufReader},
time::Instant,
};
Expand Down Expand Up @@ -164,8 +164,9 @@ fn mount_file_system(bucket_name: &str, region: &str, throughput_target_gbps: Op
bucket_name,
mountpoint.to_str().unwrap()
);
let prefetcher = default_prefetch(runtime, Default::default());
let session = Session::new(
S3FuseFilesystem::new(client, runtime, bucket_name, &Default::default(), filesystem_config),
S3FuseFilesystem::new(client, prefetcher, bucket_name, &Default::default(), filesystem_config),
mountpoint,
&options,
)
Expand Down
6 changes: 3 additions & 3 deletions mountpoint-s3/examples/prefetch_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Instant;

use clap::{Arg, Command};
use futures::executor::{block_on, ThreadPool};
use mountpoint_s3::prefetch::Prefetcher;
use mountpoint_s3::prefetch::{default_prefetch, Prefetch, PrefetchResult};
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::S3CrtClient;
Expand Down Expand Up @@ -80,12 +80,12 @@ fn main() {

for i in 0..iterations.unwrap_or(1) {
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let manager = Prefetcher::new(client.clone(), runtime, Default::default());
let manager = default_prefetch(runtime, Default::default());
let received_size = Arc::new(AtomicU64::new(0));

let start = Instant::now();

let mut request = manager.get(bucket, key, size, ETag::for_tests());
let mut request = manager.prefetch(client.clone(), bucket, key, size, ETag::for_tests());
block_on(async {
loop {
let offset = received_size.load(Ordering::SeqCst);
Expand Down
90 changes: 60 additions & 30 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! FUSE file system types and operations, not tied to the _fuser_ library bindings.
use futures::task::Spawn;
use nix::unistd::{getgid, getuid};
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
Expand All @@ -15,7 +14,7 @@ use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::ObjectClient;

use crate::inode::{Inode, InodeError, InodeKind, LookedUp, ReaddirHandle, Superblock, WriteHandle};
use crate::prefetch::{PrefetchGetObject, PrefetchReadError, Prefetcher, PrefetcherConfig};
use crate::prefetch::{Prefetch, PrefetchReadError, PrefetchResult};
use crate::prefix::Prefix;
use crate::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use crate::sync::{Arc, AsyncMutex, AsyncRwLock};
Expand Down Expand Up @@ -49,30 +48,54 @@ impl DirHandle {
}

#[derive(Debug)]
struct FileHandle<Client: ObjectClient, Runtime> {
struct FileHandle<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Prefetcher: Prefetch,
{
inode: Inode,
full_key: String,
object_size: u64,
typ: FileHandleType<Client, Runtime>,
typ: FileHandleType<Client, Prefetcher>,
}

#[derive(Debug)]
enum FileHandleType<Client: ObjectClient, Runtime> {
enum FileHandleType<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Prefetcher: Prefetch,
{
Read {
request: AsyncMutex<Option<PrefetchGetObject<Client, Runtime>>>,
request: AsyncMutex<Option<Prefetcher::PrefetchResult<Client>>>,
etag: ETag,
},
Write(AsyncMutex<UploadState<Client>>),
}

impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
/// Manual [`std::fmt::Debug`] implementation for `FileHandleType`.
///
/// A derived impl would require every field to be `Debug`; this one
/// deliberately skips the `request` field of the `Read` variant
/// (likely because `Prefetcher::PrefetchResult<Client>` carries no
/// `Debug` bound — TODO confirm) and prints only the fields that are
/// cheap and meaningful to display.
impl<Client, Prefetcher> std::fmt::Debug for FileHandleType<Client, Prefetcher>
where
    Client: ObjectClient + Send + Sync + 'static + std::fmt::Debug,
    Prefetcher: Prefetch,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Read handles: omit the in-flight prefetch request; show only the etag.
            Self::Read { request: _, etag } => f.debug_struct("Read").field("etag", etag).finish(),
            // Write handles: delegate to the upload state's own Debug impl.
            Self::Write(arg0) => f.debug_tuple("Write").field(arg0).finish(),
        }
    }
}

impl<Client, Prefetcher> FileHandleType<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync,
Prefetcher: Prefetch,
{
async fn new_write_handle(
lookup: &LookedUp,
ino: InodeNo,
flags: i32,
pid: u32,
fs: &S3Filesystem<Client, Runtime>,
) -> Result<FileHandleType<Client, Runtime>, Error> {
fs: &S3Filesystem<Client, Prefetcher>,
) -> Result<FileHandleType<Client, Prefetcher>, Error> {
// We can't support O_SYNC writes because they require the data to go to stable storage
// at `write` time, but we only commit a PUT at `close` time.
if flags & (libc::O_SYNC | libc::O_DSYNC) != 0 {
Expand All @@ -96,7 +119,7 @@ impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
Ok(handle)
}

async fn new_read_handle(lookup: &LookedUp) -> Result<FileHandleType<Client, Runtime>, Error> {
async fn new_read_handle(lookup: &LookedUp) -> Result<FileHandleType<Client, Prefetcher>, Error> {
if !lookup.stat.is_readable {
return Err(err!(
libc::EACCES,
Expand Down Expand Up @@ -302,8 +325,6 @@ pub struct S3FilesystemConfig {
pub dir_mode: u16,
/// File permissions
pub file_mode: u16,
/// Prefetcher configuration
pub prefetcher_config: PrefetcherConfig,
/// Allow delete
pub allow_delete: bool,
/// Storage class to be used for new object uploads
Expand All @@ -322,39 +343,45 @@ impl Default for S3FilesystemConfig {
gid,
dir_mode: 0o755,
file_mode: 0o644,
prefetcher_config: PrefetcherConfig::default(),
allow_delete: false,
storage_class: None,
}
}
}

#[derive(Debug)]
pub struct S3Filesystem<Client: ObjectClient, Runtime> {
pub struct S3Filesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Prefetcher: Prefetch,
{
config: S3FilesystemConfig,
client: Arc<Client>,
superblock: Superblock,
prefetcher: Prefetcher<Client, Runtime>,
prefetcher: Prefetcher,
uploader: Uploader<Client>,
bucket: String,
#[allow(unused)]
prefix: Prefix,
next_handle: AtomicU64,
dir_handles: AsyncRwLock<HashMap<u64, Arc<DirHandle>>>,
file_handles: AsyncRwLock<HashMap<u64, Arc<FileHandle<Client, Runtime>>>>,
file_handles: AsyncRwLock<HashMap<u64, Arc<FileHandle<Client, Prefetcher>>>>,
}

impl<Client, Runtime> S3Filesystem<Client, Runtime>
impl<Client, Prefetcher> S3Filesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
Prefetcher: Prefetch,
{
pub fn new(client: Client, runtime: Runtime, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig) -> Self {
let superblock = Superblock::new(bucket, prefix, config.cache_config.clone());

pub fn new(
client: Client,
prefetcher: Prefetcher,
bucket: &str,
prefix: &Prefix,
config: S3FilesystemConfig,
) -> Self {
let client = Arc::new(client);

let prefetcher = Prefetcher::new(client.clone(), runtime, config.prefetcher_config);
let superblock = Superblock::new(bucket, prefix, config.cache_config.clone());
let uploader = Uploader::new(client.clone(), config.storage_class.to_owned());

Self {
Expand Down Expand Up @@ -429,10 +456,10 @@ pub trait ReadReplier {
fn error(self, error: Error) -> Self::Replied;
}

impl<Client, Runtime> S3Filesystem<Client, Runtime>
impl<Client, Prefetcher> S3Filesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
Prefetcher: Prefetch,
{
pub async fn init(&self, config: &mut KernelConfig) -> Result<(), libc::c_int> {
let _ = config.add_capabilities(fuser::consts::FUSE_DO_READDIRPLUS);
Expand Down Expand Up @@ -608,10 +635,13 @@ where
};

if request.is_none() {
*request = Some(
self.prefetcher
.get(&self.bucket, &handle.full_key, handle.object_size, file_etag),
);
*request = Some(self.prefetcher.prefetch(
self.client.clone(),
&self.bucket,
&handle.full_key,
handle.object_size,
file_etag,
));
}

match request.as_mut().unwrap().read(offset as u64, size as usize).await {
Expand Down
30 changes: 20 additions & 10 deletions mountpoint-s3/src/fuse.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Links _fuser_ method calls into Mountpoint's filesystem code in [crate::fs].
use futures::executor::block_on;
use futures::task::Spawn;
use mountpoint_s3_client::ObjectClient;
use std::ffi::OsStr;
use std::path::Path;
use std::time::SystemTime;
Expand All @@ -11,14 +11,14 @@ use tracing::{instrument, Instrument};
use crate::fs::{
self, DirectoryEntry, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno,
};
use crate::prefetch::Prefetch;
use crate::prefix::Prefix;
#[cfg(target_os = "macos")]
use fuser::ReplyXTimes;
use fuser::{
Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry, ReplyIoctl,
ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow,
};
use mountpoint_s3_client::ObjectClient;

pub mod session;

Expand Down Expand Up @@ -48,26 +48,36 @@ macro_rules! fuse_unsupported {

/// This is just a thin wrapper around [S3Filesystem] that implements the actual `fuser` protocol,
/// so that we can test our actual filesystem implementation without having actual FUSE in the loop.
pub struct S3FuseFilesystem<Client: ObjectClient, Runtime> {
fs: S3Filesystem<Client, Runtime>,
pub struct S3FuseFilesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Prefetcher: Prefetch,
{
fs: S3Filesystem<Client, Prefetcher>,
}

impl<Client, Runtime> S3FuseFilesystem<Client, Runtime>
impl<Client, Prefetcher> S3FuseFilesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
Prefetcher: Prefetch,
{
pub fn new(client: Client, runtime: Runtime, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig) -> Self {
let fs = S3Filesystem::new(client, runtime, bucket, prefix, config);
pub fn new(
client: Client,
prefetcher: Prefetcher,
bucket: &str,
prefix: &Prefix,
config: S3FilesystemConfig,
) -> Self {
let fs = S3Filesystem::new(client, prefetcher, bucket, prefix, config);

Self { fs }
}
}

impl<Client, Runtime> Filesystem for S3FuseFilesystem<Client, Runtime>
impl<Client, Prefetcher> Filesystem for S3FuseFilesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
Prefetcher: Prefetch,
{
#[instrument(level="warn", skip_all, fields(req=_req.unique()))]
fn init(&self, _req: &Request<'_>, config: &mut KernelConfig) -> Result<(), libc::c_int> {
Expand Down
Loading

0 comments on commit c6b2a17

Please sign in to comment.