Introduce Prefetch trait #595

Merged · 1 commit · Nov 8, 2023
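For orientation, the call sites in the diff below — `default_prefetch(runtime, config)`, `prefetcher.prefetch(client, bucket, key, size, etag)`, and the `Prefetcher::PrefetchResult<Client>` associated type — imply a trait of roughly the following shape. This is a sketch reconstructed from those call sites, not the actual definition added in mountpoint-s3/src/prefetch.rs (which is not part of the captured diff); in particular the `read` signature and the buffer/error types here are assumptions.

```rust
use std::sync::Arc;

use futures::future::BoxFuture;
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::ObjectClient;

// Stand-ins for the crate's real buffer and error types (assumptions).
type ReadData = Vec<u8>;
type ReadError = std::io::Error;

/// A prefetcher that can start a streaming read of a single object.
pub trait Prefetch {
    type PrefetchResult<Client: ObjectClient + Send + Sync + 'static>: PrefetchResult;

    /// Start prefetching `key` in `bucket`, returning a handle that serves reads.
    fn prefetch<Client>(
        &self,
        client: Arc<Client>,
        bucket: &str,
        key: &str,
        size: u64,
        etag: ETag,
    ) -> Self::PrefetchResult<Client>
    where
        Client: ObjectClient + Send + Sync + 'static;
}

/// An in-flight prefetch request.
pub trait PrefetchResult: Send + Sync {
    /// Read `size` bytes at `offset`; the exact return type in the crate differs.
    fn read(&mut self, offset: u64, size: usize) -> BoxFuture<'_, Result<ReadData, ReadError>>;
}
```

The design point visible throughout the diff: the `ObjectClient` is no longer baked into the prefetcher at construction time but passed per `prefetch` call, which is what lets `S3Filesystem` be generic over `Prefetcher: Prefetch` instead of over a `Runtime: Spawn`.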
7 changes: 4 additions & 3 deletions mountpoint-s3/examples/fs_benchmark.rs
@@ -1,13 +1,13 @@
use clap::{Arg, ArgAction, Command};
use fuser::{BackgroundSession, MountOption, Session};
use mountpoint_s3::fuse::S3FuseFilesystem;
use mountpoint_s3::prefetch::default_prefetch;
use mountpoint_s3::S3FilesystemConfig;
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::S3CrtClient;
use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use std::{
fs::File,
fs::OpenOptions,
fs::{File, OpenOptions},
io::{self, BufRead, BufReader},
time::Instant,
};
@@ -164,8 +164,9 @@ fn mount_file_system(bucket_name: &str, region: &str, throughput_target_gbps: Op
bucket_name,
mountpoint.to_str().unwrap()
);
let prefetcher = default_prefetch(runtime, Default::default());
let session = Session::new(
S3FuseFilesystem::new(client, runtime, bucket_name, &Default::default(), filesystem_config),
S3FuseFilesystem::new(client, prefetcher, bucket_name, &Default::default(), filesystem_config),
mountpoint,
&options,
)
6 changes: 3 additions & 3 deletions mountpoint-s3/examples/prefetch_benchmark.rs
@@ -4,7 +4,7 @@ use std::time::Instant;

use clap::{Arg, Command};
use futures::executor::{block_on, ThreadPool};
use mountpoint_s3::prefetch::Prefetcher;
use mountpoint_s3::prefetch::{default_prefetch, Prefetch, PrefetchResult};
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::S3CrtClient;
@@ -80,12 +80,12 @@ fn main() {

for i in 0..iterations.unwrap_or(1) {
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let manager = Prefetcher::new(client.clone(), runtime, Default::default());
let manager = default_prefetch(runtime, Default::default());
let received_size = Arc::new(AtomicU64::new(0));

let start = Instant::now();

let mut request = manager.get(bucket, key, size, ETag::for_tests());
let mut request = manager.prefetch(client.clone(), bucket, key, size, ETag::for_tests());
block_on(async {
loop {
let offset = received_size.load(Ordering::SeqCst);
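Putting the pieces of the new API together, here is a minimal, self-contained sketch of the call pattern this benchmark exercises. The client setup, the 128 KiB chunk size, and the assumption that the returned buffer exposes `len()` are illustrative, not taken from the diff.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use futures::executor::{block_on, ThreadPool};
use mountpoint_s3::prefetch::{default_prefetch, Prefetch, PrefetchResult};
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::S3CrtClient;

fn read_whole_object(bucket: &str, key: &str, size: u64) {
    // Schematic client setup; real callers configure region/throughput as needed.
    let config = S3ClientConfig::new().endpoint_config(EndpointConfig::new("us-east-1"));
    let client = Arc::new(S3CrtClient::new(config).expect("create client"));
    let runtime = ThreadPool::builder().pool_size(1).create().unwrap();

    // The prefetcher is built from a runtime and config only...
    let prefetcher = default_prefetch(runtime, Default::default());
    // ...and the client is now supplied per request.
    let mut request = prefetcher.prefetch(client, bucket, key, size, ETag::for_tests());

    let received = AtomicU64::new(0);
    block_on(async {
        while received.load(Ordering::SeqCst) < size {
            let offset = received.load(Ordering::SeqCst);
            // Chunk size and `.len()` on the returned buffer are assumptions.
            let bytes = request.read(offset, 128 * 1024).await.expect("read failed");
            received.fetch_add(bytes.len() as u64, Ordering::SeqCst);
        }
    });
}
```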
90 changes: 60 additions & 30 deletions mountpoint-s3/src/fs.rs
@@ -1,6 +1,5 @@
//! FUSE file system types and operations, not tied to the _fuser_ library bindings.

use futures::task::Spawn;
use nix::unistd::{getgid, getuid};
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
@@ -15,7 +14,7 @@ use mountpoint_s3_client::types::ETag;
use mountpoint_s3_client::ObjectClient;

use crate::inode::{Inode, InodeError, InodeKind, LookedUp, ReaddirHandle, Superblock, WriteHandle};
use crate::prefetch::{PrefetchGetObject, PrefetchReadError, Prefetcher, PrefetcherConfig};
use crate::prefetch::{Prefetch, PrefetchReadError, PrefetchResult};
use crate::prefix::Prefix;
use crate::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use crate::sync::{Arc, AsyncMutex, AsyncRwLock};
@@ -49,30 +48,54 @@ impl DirHandle {
}

#[derive(Debug)]
struct FileHandle<Client: ObjectClient, Runtime> {
struct FileHandle<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Prefetcher: Prefetch,
{
inode: Inode,
full_key: String,
object_size: u64,
typ: FileHandleType<Client, Runtime>,
typ: FileHandleType<Client, Prefetcher>,
}

#[derive(Debug)]
enum FileHandleType<Client: ObjectClient, Runtime> {
enum FileHandleType<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Prefetcher: Prefetch,
{
Read {
request: AsyncMutex<Option<PrefetchGetObject<Client, Runtime>>>,
request: AsyncMutex<Option<Prefetcher::PrefetchResult<Client>>>,
etag: ETag,
},
Write(AsyncMutex<UploadState<Client>>),
}

impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
impl<Client, Prefetcher> std::fmt::Debug for FileHandleType<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static + std::fmt::Debug,
Prefetcher: Prefetch,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Read { request: _, etag } => f.debug_struct("Read").field("etag", etag).finish(),
Self::Write(arg0) => f.debug_tuple("Write").field(arg0).finish(),
}
}
}

impl<Client, Prefetcher> FileHandleType<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync,
Prefetcher: Prefetch,
{
async fn new_write_handle(
lookup: &LookedUp,
ino: InodeNo,
flags: i32,
pid: u32,
fs: &S3Filesystem<Client, Runtime>,
) -> Result<FileHandleType<Client, Runtime>, Error> {
fs: &S3Filesystem<Client, Prefetcher>,
) -> Result<FileHandleType<Client, Prefetcher>, Error> {
// We can't support O_SYNC writes because they require the data to go to stable storage
// at `write` time, but we only commit a PUT at `close` time.
if flags & (libc::O_SYNC | libc::O_DSYNC) != 0 {
@@ -96,7 +119,7 @@ impl<Client: ObjectClient, Runtime> FileHandleType<Client, Runtime> {
Ok(handle)
}

async fn new_read_handle(lookup: &LookedUp) -> Result<FileHandleType<Client, Runtime>, Error> {
async fn new_read_handle(lookup: &LookedUp) -> Result<FileHandleType<Client, Prefetcher>, Error> {
if !lookup.stat.is_readable {
return Err(err!(
libc::EACCES,
@@ -302,8 +325,6 @@ pub struct S3FilesystemConfig {
pub dir_mode: u16,
/// File permissions
pub file_mode: u16,
/// Prefetcher configuration
pub prefetcher_config: PrefetcherConfig,
/// Allow delete
pub allow_delete: bool,
/// Storage class to be used for new object uploads
@@ -322,39 +343,45 @@ impl Default for S3FilesystemConfig {
gid,
dir_mode: 0o755,
file_mode: 0o644,
prefetcher_config: PrefetcherConfig::default(),
allow_delete: false,
storage_class: None,
}
}
}

#[derive(Debug)]
pub struct S3Filesystem<Client: ObjectClient, Runtime> {
pub struct S3Filesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Prefetcher: Prefetch,
{
config: S3FilesystemConfig,
client: Arc<Client>,
superblock: Superblock,
prefetcher: Prefetcher<Client, Runtime>,
prefetcher: Prefetcher,
uploader: Uploader<Client>,
bucket: String,
#[allow(unused)]
prefix: Prefix,
next_handle: AtomicU64,
dir_handles: AsyncRwLock<HashMap<u64, Arc<DirHandle>>>,
file_handles: AsyncRwLock<HashMap<u64, Arc<FileHandle<Client, Runtime>>>>,
file_handles: AsyncRwLock<HashMap<u64, Arc<FileHandle<Client, Prefetcher>>>>,
}

impl<Client, Runtime> S3Filesystem<Client, Runtime>
impl<Client, Prefetcher> S3Filesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
Prefetcher: Prefetch,
{
pub fn new(client: Client, runtime: Runtime, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig) -> Self {
let superblock = Superblock::new(bucket, prefix, config.cache_config.clone());

pub fn new(
client: Client,
prefetcher: Prefetcher,
bucket: &str,
prefix: &Prefix,
config: S3FilesystemConfig,
) -> Self {
let client = Arc::new(client);

let prefetcher = Prefetcher::new(client.clone(), runtime, config.prefetcher_config);
let superblock = Superblock::new(bucket, prefix, config.cache_config.clone());
let uploader = Uploader::new(client.clone(), config.storage_class.to_owned());

Self {
@@ -429,10 +456,10 @@ pub trait ReadReplier {
fn error(self, error: Error) -> Self::Replied;
}

impl<Client, Runtime> S3Filesystem<Client, Runtime>
impl<Client, Prefetcher> S3Filesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
Prefetcher: Prefetch,
{
pub async fn init(&self, config: &mut KernelConfig) -> Result<(), libc::c_int> {
let _ = config.add_capabilities(fuser::consts::FUSE_DO_READDIRPLUS);
@@ -608,10 +635,13 @@ where
};

if request.is_none() {
*request = Some(
self.prefetcher
.get(&self.bucket, &handle.full_key, handle.object_size, file_etag),
);
*request = Some(self.prefetcher.prefetch(
self.client.clone(),
&self.bucket,
&handle.full_key,
handle.object_size,
file_etag,
));
}

match request.as_mut().unwrap().read(offset as u64, size as usize).await {
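One detail in this file worth noting: `FileHandleType` previously relied on `#[derive(Debug)]`, and the derived impl would require the `request` field's type, now `Prefetcher::PrefetchResult<Client>`, to be `Debug`, which the new trait presumably does not guarantee. Hence the hand-written impl above that prints only the etag for read handles. The pattern in isolation (illustrative code, not from this PR):

```rust
use std::fmt;

// `R` stands in for an opaque in-flight request type with no Debug bound,
// analogous to `Prefetcher::PrefetchResult<Client>` in the diff above.
#[allow(dead_code)]
struct ReadHandle<R> {
    request: Option<R>,
    etag: String,
}

impl<R> fmt::Debug for ReadHandle<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Skip `request` entirely so no `R: Debug` bound is needed.
        f.debug_struct("ReadHandle").field("etag", &self.etag).finish()
    }
}

fn main() {
    let handle: ReadHandle<Vec<u8>> = ReadHandle {
        request: None,
        etag: "\"0123456789abcdef\"".to_string(),
    };
    println!("{handle:?}"); // ReadHandle { etag: "\"0123456789abcdef\"" }
}
```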
30 changes: 20 additions & 10 deletions mountpoint-s3/src/fuse.rs
@@ -1,7 +1,7 @@
//! Links _fuser_ method calls into Mountpoint's filesystem code in [crate::fs].

use futures::executor::block_on;
use futures::task::Spawn;
use mountpoint_s3_client::ObjectClient;
use std::ffi::OsStr;
use std::path::Path;
use std::time::SystemTime;
@@ -11,14 +11,14 @@ use tracing::{instrument, Instrument};
use crate::fs::{
self, DirectoryEntry, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno,
};
use crate::prefetch::Prefetch;
use crate::prefix::Prefix;
#[cfg(target_os = "macos")]
use fuser::ReplyXTimes;
use fuser::{
Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry, ReplyIoctl,
ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow,
};
use mountpoint_s3_client::ObjectClient;

pub mod session;

@@ -48,26 +48,36 @@

/// This is just a thin wrapper around [S3Filesystem] that implements the actual `fuser` protocol,
/// so that we can test our actual filesystem implementation without having actual FUSE in the loop.
pub struct S3FuseFilesystem<Client: ObjectClient, Runtime> {
fs: S3Filesystem<Client, Runtime>,
pub struct S3FuseFilesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Prefetcher: Prefetch,
{
fs: S3Filesystem<Client, Prefetcher>,
}

impl<Client, Runtime> S3FuseFilesystem<Client, Runtime>
impl<Client, Prefetcher> S3FuseFilesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
Prefetcher: Prefetch,
{
pub fn new(client: Client, runtime: Runtime, bucket: &str, prefix: &Prefix, config: S3FilesystemConfig) -> Self {
let fs = S3Filesystem::new(client, runtime, bucket, prefix, config);
pub fn new(
client: Client,
prefetcher: Prefetcher,
bucket: &str,
prefix: &Prefix,
config: S3FilesystemConfig,
) -> Self {
let fs = S3Filesystem::new(client, prefetcher, bucket, prefix, config);

Self { fs }
}
}

impl<Client, Runtime> Filesystem for S3FuseFilesystem<Client, Runtime>
impl<Client, Prefetcher> Filesystem for S3FuseFilesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync,
Prefetcher: Prefetch,
{
#[instrument(level="warn", skip_all, fields(req=_req.unique()))]
fn init(&self, _req: &Request<'_>, config: &mut KernelConfig) -> Result<(), libc::c_int> {