Replace callback in FS read with simple Result instead #691

Merged: 2 commits, Jan 6, 2024
36 changes: 11 additions & 25 deletions mountpoint-s3/src/fs.rs
@@ -1,5 +1,6 @@
//! FUSE file system types and operations, not tied to the _fuser_ library bindings.

use bytes::Bytes;
use nix::unistd::{getgid, getuid};
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
@@ -477,19 +478,6 @@ pub struct DirectoryEntry {
lookup: LookedUp,
}

/// Reply to a `read` call. This is funky because we want the reply to happen with only a borrow of
/// the bytes. But that borrow probably comes from some lock in this module or below, and we don't
/// want to have to shoehorn that lifetime into the layer above us. So instead we have this trait
/// that forces the `read` method to invoke exactly one of the reply methods. The idea is that the
/// [Replied] type should be private and unconstructable by this module.
pub trait ReadReplier {
type Replied;
/// Reply with a data payload
fn data(self, data: &[u8]) -> Self::Replied;
/// Reply with an error
fn error(self, error: Error) -> Self::Replied;
}

impl<Client, Prefetcher> S3Filesystem<Client, Prefetcher>
where
Client: ObjectClient + Send + Sync + 'static,
@@ -642,16 +630,15 @@ where
}

#[allow(clippy::too_many_arguments)] // We don't get to choose this interface
pub async fn read<R: ReadReplier>(
pub async fn read(
&self,
ino: InodeNo,
fh: u64,
offset: i64,
size: u32,
_flags: i32,
_lock: Option<u64>,
reply: R,
) -> R::Replied {
) -> Result<Bytes, Error> {
trace!(
"fs:read with ino {:?} fh {:?} offset {:?} size {:?}",
ino,
@@ -664,13 +651,13 @@ where
let file_handles = self.file_handles.read().await;
match file_handles.get(&fh) {
Some(handle) => handle.clone(),
None => return reply.error(err!(libc::EBADF, "invalid file handle")),
None => return Err(err!(libc::EBADF, "invalid file handle")),
}
};
logging::record_name(handle.inode.name());
let file_etag: ETag;
let mut request = match &handle.typ {
FileHandleType::Write { .. } => return reply.error(err!(libc::EBADF, "file handle is not open for reads")),
FileHandleType::Write { .. } => return Err(err!(libc::EBADF, "file handle is not open for reads")),
FileHandleType::Read { request, etag } => {
file_etag = etag.clone();
request.lock().await
@@ -688,18 +675,17 @@ where
}

match request.as_mut().unwrap().read(offset as u64, size as usize).await {
Ok(checksummed_bytes) => match checksummed_bytes.into_bytes() {
Ok(bytes) => reply.data(&bytes),
Err(e) => reply.error(err!(libc::EIO, source:e, "integrity error")),
},
Ok(checksummed_bytes) => checksummed_bytes
.into_bytes()
.map_err(|e| err!(libc::EIO, source:e, "integrity error")),
Err(PrefetchReadError::GetRequestFailed(ObjectClientError::ServiceError(
GetObjectError::PreconditionFailed,
))) => reply.error(err!(libc::ESTALE, "object was mutated remotely")),
Err(PrefetchReadError::Integrity(e)) => reply.error(err!(libc::EIO, source:e, "integrity error")),
))) => Err(err!(libc::ESTALE, "object was mutated remotely")),
Err(PrefetchReadError::Integrity(e)) => Err(err!(libc::EIO, source:e, "integrity error")),
Err(e @ PrefetchReadError::GetRequestFailed(_))
| Err(e @ PrefetchReadError::GetRequestTerminatedUnexpectedly)
| Err(e @ PrefetchReadError::GetRequestReturnedWrongOffset { .. }) => {
reply.error(err!(libc::EIO, source:e, "get request failed"))
Err(err!(libc::EIO, source:e, "get request failed"))
}
}
}
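
For readers skimming the diff, here is a self-contained sketch of the two calling conventions this change swaps between. The names and types (`Error`, `ReadReplier`, `read_with_callback`, `read_with_result`) are simplified stand-ins for illustration, not the actual mountpoint-s3 definitions:

```rust
// Toy illustration only: `Error` is a plain stand-in, not the mountpoint-s3 error type.
struct Error(&'static str);

// Old style: the callee must invoke exactly one reply method on a caller-supplied replier.
trait ReadReplier {
    type Replied;
    fn data(self, data: &[u8]) -> Self::Replied;
    fn error(self, error: Error) -> Self::Replied;
}

fn read_with_callback<R: ReadReplier>(ok: bool, reply: R) -> R::Replied {
    if ok {
        reply.data(b"hello")
    } else {
        reply.error(Error("I/O error"))
    }
}

// New style: return the bytes (or an error) and let the caller decide how to reply.
fn read_with_result(ok: bool) -> Result<Vec<u8>, Error> {
    if ok {
        Ok(b"hello".to_vec())
    } else {
        Err(Error("I/O error"))
    }
}

fn main() {
    // Callback style: the reply is produced inside `read_with_callback`.
    struct Printer;
    impl ReadReplier for Printer {
        type Replied = ();
        fn data(self, data: &[u8]) {
            println!("got {} bytes", data.len());
        }
        fn error(self, error: Error) {
            println!("error: {}", error.0);
        }
    }
    read_with_callback(true, Printer);

    // Result style: the caller handles both outcomes after the call returns.
    match read_with_result(false) {
        Ok(bytes) => println!("got {} bytes", bytes.len()),
        Err(e) => println!("error: {}", e.0),
    }
}
```

The old design existed so the reply could be sent while only borrowing the bytes; returning `bytes::Bytes` (a reference-counted buffer) sidesteps that lifetime concern without an extra copy, and the caller no longer has to thread a replier type parameter through `read`.
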
39 changes: 6 additions & 33 deletions mountpoint-s3/src/fuse.rs
@@ -8,9 +8,7 @@ use std::time::SystemTime;
use time::OffsetDateTime;
use tracing::{field, instrument, Instrument};

use crate::fs::{
self, DirectoryEntry, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno,
};
use crate::fs::{DirectoryEntry, DirectoryReplier, InodeNo, S3Filesystem, S3FilesystemConfig, ToErrno};
use crate::prefetch::Prefetch;
use crate::prefix::Prefix;
#[cfg(target_os = "macos")]
@@ -127,39 +125,14 @@
) {
let mut bytes_sent = 0;

struct Replied(());

struct ReplyRead<'a> {
inner: fuser::ReplyData,
bytes_sent: &'a mut usize,
}

impl ReadReplier for ReplyRead<'_> {
type Replied = Replied;

fn data(self, data: &[u8]) -> Replied {
self.inner.data(data);
*self.bytes_sent = data.len();
Replied(())
}

fn error(self, error: fs::Error) -> Replied {
fuse_error!("read", self.inner, error);
Replied(())
match block_on(self.fs.read(ino, fh, offset, size, flags, lock).in_current_span()) {
Ok(data) => {
bytes_sent = data.len();
reply.data(&data);
}
Err(err) => fuse_error!("read", reply, err),
}

let replier = ReplyRead {
inner: reply,
bytes_sent: &mut bytes_sent,
};
block_on(
self.fs
.read(ino, fh, offset, size, flags, lock, replier)
.in_current_span(),
);
// return value of read is proof a reply was sent

metrics::counter!("fuse.total_bytes", bytes_sent as u64, "type" => "read");
metrics::histogram!("fuse.io_size", bytes_sent as f64, "type" => "read");
}
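
The removed comment above ("return value of read is proof a reply was sent") describes the type-level trick the old interface relied on: the only way to obtain a value of the `Replied` associated type was to call one of the reply methods, so a `read` that type-checked had necessarily replied exactly once. A minimal sketch of that pattern, using hypothetical names (`ReplyToken`, `Reply`, `send_exactly_one_reply`) that are not part of the mountpoint-s3 code:

```rust
mod replier {
    /// The token has no public constructor, so code outside this module
    /// can only obtain one from `data` or `error`.
    pub struct ReplyToken(());

    pub struct Reply;

    impl Reply {
        pub fn data(self, data: &[u8]) -> ReplyToken {
            println!("replied with {} bytes", data.len());
            ReplyToken(())
        }

        pub fn error(self, errno: i32) -> ReplyToken {
            println!("replied with errno {errno}");
            ReplyToken(())
        }
    }
}

use replier::{Reply, ReplyToken};

// `reply` is consumed by value, so a second reply is impossible; and since the
// function must return a `ReplyToken`, replying zero times will not type-check either.
fn send_exactly_one_reply(ok: bool, reply: Reply) -> ReplyToken {
    if ok {
        reply.data(b"hello")
    } else {
        reply.error(5) // 5 == EIO on Linux
    }
}

fn main() {
    let _proof = send_exactly_one_reply(true, Reply);
}
```

The `Result`-based version trades this compile-time guarantee for a simpler convention: the single call site in fuse.rs matches on the `Result` and calls `reply.data` or the error macro itself.
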
16 changes: 1 addition & 15 deletions mountpoint-s3/tests/common/mod.rs
@@ -10,7 +10,7 @@ pub mod s3;

use fuser::{FileAttr, FileType};
use futures::executor::ThreadPool;
use mountpoint_s3::fs::{self, DirectoryEntry, DirectoryReplier, ReadReplier, ToErrno};
use mountpoint_s3::fs::{DirectoryEntry, DirectoryReplier};
use mountpoint_s3::prefetch::{default_prefetch, DefaultPrefetcher};
use mountpoint_s3::prefix::Prefix;
use mountpoint_s3::{S3Filesystem, S3FilesystemConfig};
@@ -91,20 +91,6 @@ impl DirectoryReply {
}
}

pub struct ReadReply<'a>(pub &'a mut Result<Box<[u8]>, libc::c_int>);

impl<'a> ReadReplier for ReadReply<'a> {
type Replied = ();

fn data(self, data: &[u8]) -> Self::Replied {
*self.0 = Ok(data.into());
}

fn error(self, error: fs::Error) -> Self::Replied {
*self.0 = Err(error.to_errno());
}
}

/// Enable tracing and CRT logging when running unit tests.
#[ctor::ctor]
fn init_tracing_subscriber() {
59 changes: 28 additions & 31 deletions mountpoint-s3/tests/fs.rs
@@ -21,7 +21,7 @@ use std::time::{Duration, SystemTime};
use test_case::test_case;

mod common;
use common::{assert_attr, make_test_filesystem, make_test_filesystem_with_client, DirectoryReply, ReadReply};
use common::{assert_attr, make_test_filesystem, make_test_filesystem_with_client, DirectoryReply};

#[test_case(""; "unprefixed")]
#[test_case("test_prefix/"; "prefixed")]
@@ -75,9 +75,11 @@ async fn test_read_dir_root(prefix: &str) {
assert_attr(attr.attr, FileType::RegularFile, 15, uid, gid, file_perm);

let fh = fs.open(reply.ino, 0x8000, 0).await.unwrap().fh;
let mut read = Err(0);
fs.read(reply.ino, fh, 0, 4096, 0, None, ReadReply(&mut read)).await;
assert_eq!(&read.unwrap()[..], &[0xa0 + (i as u8 + 1); 15]);
let bytes_read = fs
.read(reply.ino, fh, 0, 4096, 0, None)
.await
.expect("fs read should succeed");
assert_eq!(&bytes_read[..], &[0xa0 + (i as u8 + 1); 15]);
fs.release(reply.ino, fh, 0, None, true).await.unwrap();

offset = offset.max(reply.offset);
@@ -149,9 +151,11 @@ async fn test_read_dir_nested(prefix: &str) {
assert_attr(attr.attr, FileType::RegularFile, 15, uid, gid, file_perm);

let fh = fs.open(reply.ino, 0x8000, 0).await.unwrap().fh;
let mut read = Err(0);
fs.read(reply.ino, fh, 0, 4096, 0, None, ReadReply(&mut read)).await;
assert_eq!(&read.unwrap()[..], &[0xa0 + (i as u8 + 1); 15]);
let bytes_read = fs
.read(reply.ino, fh, 0, 4096, 0, None)
.await
.expect("fs read should succeed");
assert_eq!(&bytes_read[..], &[0xa0 + (i as u8 + 1); 15]);
fs.release(reply.ino, fh, 0, None, true).await.unwrap();

offset = offset.max(reply.offset);
@@ -445,12 +449,12 @@ async fn test_random_read(object_size: usize) {
let offset = rng.gen_range(0..object_size);
// TODO do we need to bound it? should work anyway, just partial read, right?
let length = rng.gen_range(0..(object_size - offset).min(1024 * 1024)) + 1;
let mut read = Err(0);
fs.read(ino, fh, offset as i64, length as u32, 0, None, ReadReply(&mut read))
.await;
let read = read.unwrap();
assert_eq!(read.len(), length);
assert_eq!(&read[..], &expected[offset..offset + length]);
let bytes_read = fs
.read(ino, fh, offset as i64, length as u32, 0, None)
.await
.expect("fs read should succeed");
assert_eq!(bytes_read.len(), length);
assert_eq!(&bytes_read[..], &expected[offset..offset + length]);
}

fs.release(ino, fh, 0, None, true).await.unwrap();
@@ -494,10 +498,11 @@ async fn test_implicit_directory_shadow(prefix: &str) {
assert_eq!(reply.entries[2].attr.kind, FileType::RegularFile);

let fh = fs.open(reply.entries[2].ino, 0x8000, 0).await.unwrap().fh;
let mut read = Err(0);
fs.read(reply.entries[2].ino, fh, 0, 4096, 0, None, ReadReply(&mut read))
.await;
assert_eq!(&read.unwrap()[..], &[0xa2; 15]);
let bytes_read = fs
.read(reply.entries[2].ino, fh, 0, 4096, 0, None)
.await
.expect("fs read should succeed");
assert_eq!(&bytes_read[..], &[0xa2; 15]);
fs.release(reply.entries[2].ino, fh, 0, None, true).await.unwrap();

// Explicitly looking up the shadowed file should fail
@@ -594,20 +599,12 @@ async fn test_sequential_write(write_size: usize) {
let mut offset = 0;
while offset < size {
let length = 1024.min(size - offset);
let mut read = Err(0);
fs.read(
file_ino,
fh,
offset as i64,
length as u32,
0,
None,
ReadReply(&mut read),
)
.await;
let read = read.unwrap();
assert_eq!(read.len(), length);
assert_eq!(&read[..], &body[offset..offset + length]);
let bytes_read = fs
.read(file_ino, fh, offset as i64, length as u32, 0, None)
.await
.expect("fs read should succeed");
assert_eq!(bytes_read.len(), length);
assert_eq!(&bytes_read[..], &body[offset..offset + length]);
offset += length;
}

26 changes: 7 additions & 19 deletions mountpoint-s3/tests/reftests/harness.rs
@@ -6,7 +6,7 @@ use std::time::Duration;

use fuser::FileType;
use futures::future::{BoxFuture, FutureExt};
use mountpoint_s3::fs::{self, CacheConfig, InodeNo, ReadReplier, ToErrno, FUSE_ROOT_INODE};
use mountpoint_s3::fs::{CacheConfig, InodeNo, ToErrno, FUSE_ROOT_INODE};
use mountpoint_s3::prefix::Prefix;
use mountpoint_s3::S3FilesystemConfig;
use mountpoint_s3_client::mock_client::{MockClient, MockObject};
@@ -676,20 +676,6 @@ impl Harness {
}

async fn compare_file<'a>(&'a self, fs_file: InodeNo, ref_file: &'a MockObject) {
struct ReadVerifier(Box<[u8]>);

impl ReadReplier for ReadVerifier {
type Replied = ();

fn data(self, data: &[u8]) -> Self::Replied {
assert_eq!(&self.0[..], data, "read bytes don't match");
}

fn error(self, error: fs::Error) -> Self::Replied {
panic!("read failed: {error}");
}
}

let fh = match self.fs.open(fs_file, 0x8000, 0).await {
Ok(ret) => ret.fh,
Err(e) => panic!("failed to open {fs_file}: {e:?}"),
@@ -701,10 +687,12 @@ impl Harness {
let num_bytes = MAX_READ_SIZE.min(file_size - offset);
let ref_bytes = ref_file.read(offset as u64, num_bytes);
assert_eq!(ref_bytes.len(), num_bytes);
let read_verifier = ReadVerifier(ref_bytes);
self.fs
.read(fs_file, fh, offset as i64, num_bytes as u32, 0, None, read_verifier)
.await;
let bytes_from_read = self
.fs
.read(fs_file, fh, offset as i64, num_bytes as u32, 0, None)
.await
.expect("read should succeed");
assert_eq!(&ref_bytes[..], &bytes_from_read, "read bytes did not match");
offset += num_bytes;
}
}