Skip to content

Commit

Permalink
Implement prefer_s3 toggle in mountpoint-s3::fs::CacheConfig (#547)
Browse files Browse the repository at this point in the history
This plumbs in checks for if the filesystem should maintain strong consistency for operations like open.
There is no way to configure mountpoint-s3 itself to relax the consistency model - this change only impacts internals.

Signed-off-by: Daniel Carl Jones <[email protected]>
  • Loading branch information
dannycjones authored Oct 17, 2023
1 parent 7e94711 commit 8afdab6
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 20 deletions.
21 changes: 17 additions & 4 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ impl<Client: ObjectClient> UploadState<Client> {

#[derive(Debug, Clone)]
pub struct CacheConfig {
/// Should the file system check S3 even when a valid cached entry may be available?
///
/// When enabled, some operations such as `getattr` are allowed to be served from cache
/// with a short TTL since Linux filesystems behave badly when the TTL is zero.
/// For example, results from `readdir` will expire immediately, and so the kernel will
/// immediately `getattr` every entry returned from `readdir`.
pub prefer_s3: bool,
/// How long the kernel will cache metadata for files
pub file_ttl: Duration,
/// How long the kernel will cache metadata for directories
Expand All @@ -202,8 +209,9 @@ pub struct CacheConfig {

impl Default for CacheConfig {
fn default() -> Self {
// We want to do as little caching as possible, but Linux filesystems behave badly when the
// TTL is exactly zero. For example, results from `readdir` will expire immediately, and so
// We want to do as little caching as possible by default,
// but Linux filesystems behave badly when the TTL is exactly zero.
// For example, results from `readdir` will expire immediately, and so
// the kernel will immediately re-lookup every entry returned from `readdir`. So we apply
// small non-zero TTLs. The goal is to be small enough that the impact on consistency is
// minimal, but large enough that a single cache miss doesn't cause a cascading effect where
Expand All @@ -213,7 +221,11 @@ impl Default for CacheConfig {
let file_ttl = Duration::from_millis(100);
let dir_ttl = Duration::from_millis(1000);

Self { file_ttl, dir_ttl }
Self {
prefer_s3: true,
file_ttl,
dir_ttl,
}
}
}

Expand Down Expand Up @@ -461,7 +473,8 @@ where
pub async fn open(&self, ino: InodeNo, flags: i32) -> Result<Opened, Error> {
trace!("fs:open with ino {:?} flags {:?}", ino, flags);

let lookup = self.superblock.getattr(&self.client, ino, true).await?;
let force_revalidate = self.config.cache_config.prefer_s3;
let lookup = self.superblock.getattr(&self.client, ino, force_revalidate).await?;

match lookup.inode.kind() {
InodeKind::Directory => return Err(InodeError::IsDirectory(lookup.inode.err()).into()),
Expand Down
170 changes: 155 additions & 15 deletions mountpoint-s3/src/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ impl Superblock {
}
}
writing_children.remove(&ino);

if let Ok(state) = inode.get_inode_state() {
metrics::counter!(
"metadata_cache.inode_forgotten_before_expiry",
state.stat.is_valid().into(),
);
};
}
}
}
Expand All @@ -174,7 +181,10 @@ impl Superblock {
name: &OsStr,
) -> Result<LookedUp, InodeError> {
trace!(parent=?parent_ino, ?name, "lookup");
let lookup = self.inner.lookup(client, parent_ino, name).await?;
let lookup = self
.inner
.lookup_by_name(client, parent_ino, name, self.inner.cache_config.prefer_s3)
.await?;
self.inner.remember(&lookup.inode);
Ok(lookup)
}
Expand All @@ -197,7 +207,10 @@ impl Superblock {
}
}

let lookup = self.inner.lookup(client, inode.parent(), inode.name().as_ref()).await?;
let lookup = self
.inner
.lookup_by_name(client, inode.parent(), inode.name().as_ref(), true)
.await?;
if lookup.inode.ino() != ino {
Err(InodeError::StaleInode {
remote_key: lookup.inode.full_key().to_owned(),
Expand Down Expand Up @@ -287,7 +300,10 @@ impl Superblock {
) -> Result<LookedUp, InodeError> {
trace!(parent=?dir, ?name, "create");

let existing = self.inner.lookup(client, dir, name).await;
let existing = self
.inner
.lookup_by_name(client, dir, name, self.inner.cache_config.prefer_s3)
.await;
match existing {
Ok(lookup) => return Err(InodeError::FileAlreadyExists(lookup.inode.err())),
Err(InodeError::FileDoesNotExist) => (),
Expand Down Expand Up @@ -341,7 +357,10 @@ impl Superblock {
parent_ino: InodeNo,
name: &OsStr,
) -> Result<(), InodeError> {
let LookedUp { inode, .. } = self.inner.lookup(client, parent_ino, name).await?;
let LookedUp { inode, .. } = self
.inner
.lookup_by_name(client, parent_ino, name, self.inner.cache_config.prefer_s3)
.await?;

if inode.kind() == InodeKind::File {
return Err(InodeError::NotADirectory(inode.err()));
Expand Down Expand Up @@ -407,7 +426,10 @@ impl Superblock {
name: &OsStr,
) -> Result<(), InodeError> {
let parent = self.inner.get(parent_ino)?;
let LookedUp { inode, .. } = self.inner.lookup(client, parent_ino, name).await?;
let LookedUp { inode, .. } = self
.inner
.lookup_by_name(client, parent_ino, name, self.inner.cache_config.prefer_s3)
.await?;

if inode.kind() == InodeKind::Directory {
return Err(InodeError::IsDirectory(inode.err()));
Expand Down Expand Up @@ -474,7 +496,10 @@ impl Superblock {
}

impl SuperblockInner {
/// Retrieve the inode for the given number if it exists
/// Retrieve the inode for the given number if it exists.
///
/// The expiry of its stat field is not checked.
/// This may return error on no entry existing or if the Inode is corrupted.
pub fn get(&self, ino: InodeNo) -> Result<Inode, InodeError> {
let inode = self
.inodes
Expand All @@ -499,14 +524,16 @@ impl SuperblockInner {
}

/// Lookup an inode in the parent directory with the given name.
///
/// Updates the parent inode to be in sync with the client, but does
/// not add new inodes to the superblock. The caller is responsible
/// for calling [`remember()`] if that is required.
pub async fn lookup<OC: ObjectClient>(
pub async fn lookup_by_name<OC: ObjectClient>(
&self,
client: &OC,
parent_ino: InodeNo,
name: &OsStr,
skip_cache: bool,
) -> Result<LookedUp, InodeError> {
let name = name
.to_str()
Expand All @@ -518,14 +545,60 @@ impl SuperblockInner {
return Err(InodeError::InvalidFileName(name.into()));
}

// TODO use caches. if we already know about this name, we just need to revalidate the stat
// cache and then read it.
let remote = self.remote_lookup(client, parent_ino, name).await?;
let lookup = self.update_from_remote(parent_ino, name, remote)?;
let lookup = if skip_cache {
None
} else {
self.cache_lookup(parent_ino, name)
};

let lookup = match lookup {
Some(lookup) => lookup,
None => {
let remote = self.remote_lookup(client, parent_ino, name).await?;
self.update_from_remote(parent_ino, name, remote)?
}
};

lookup.inode.verify_child(parent_ino, name.as_ref())?;
Ok(lookup)
}

/// Lookup an [Inode] against known directory entries in the parent,
/// verifying any returned entry has not expired.
fn cache_lookup(&self, parent_ino: InodeNo, name: &str) -> Option<LookedUp> {
fn do_cache_lookup(parent: Inode, name: &str) -> Option<LookedUp> {
match &parent.get_inode_state().ok()?.kind_data {
InodeKindData::File { .. } => unreachable!("parent should be a directory!"),
InodeKindData::Directory { children, .. } => {
let inode = children.get(name)?;
let inode_stat = &inode.get_inode_state().ok()?.stat;
if inode_stat.is_valid() {
let lookup = LookedUp {
inode: inode.clone(),
stat: inode_stat.clone(),
};
return Some(lookup);
}
}
};

None
}

let lookup = self
.get(parent_ino)
.ok()
.and_then(|parent| do_cache_lookup(parent, name));

match &lookup {
Some(lookup) => trace!("lookup returned from cache: {:?}", lookup),
None => trace!("no lookup available from cache"),
}
metrics::counter!("metadata_cache.cache_hit", lookup.is_some().into());

lookup
}

/// Lookup an inode in the parent directory with the given name
/// on the remote client.
async fn remote_lookup<OC: ObjectClient>(
Expand Down Expand Up @@ -1121,6 +1194,7 @@ impl Inode {
Self { inner: inner.into() }
}

/// Verify [Inode] has the expected inode number and the inode content is valid for its checksum.
fn verify_inode(&self, expected_ino: InodeNo) -> Result<(), InodeError> {
let computed = Self::compute_checksum(self.ino(), self.full_key());
if computed == self.inner.checksum && self.ino() == expected_ino {
Expand All @@ -1130,6 +1204,8 @@ impl Inode {
}
}

/// Verify [Inode] has the expected inode number, expected parent inode number,
/// and the inode's content is valid for its checksum.
fn verify_child(&self, expected_parent: InodeNo, expected_name: &str) -> Result<(), InodeError> {
let computed = Self::compute_checksum(self.ino(), self.full_key());
if computed == self.inner.checksum && self.parent() == expected_parent && self.name() == expected_name {
Expand Down Expand Up @@ -1197,11 +1273,10 @@ impl From<InodeKind> for FileType {
enum InodeKindData {
File {},
Directory {
/// Mapping from child names to [Inode]s.
/// Mapping from child names to previously seen [Inode]s.
///
/// How should this field be used?:
/// - **Many operations should maintain** this list.
/// - **Only `mknod` and `mkdir` should read** this list, for checking if a file already exists.
/// The existence of a child or lack thereof does not imply the object does not exist,
/// nor that it currently exists in S3 in that state.
children: HashMap<String, Inode>,

/// A set of inode numbers that have been opened for write but not completed yet.
Expand Down Expand Up @@ -1510,6 +1585,71 @@ mod tests {
}
}

#[test_case(true; "cached")]
#[test_case(false; "not cached")]
#[tokio::test]
async fn test_lookup_with_caching(cached: bool) {
let bucket = "test_bucket";
let prefix = "prefix/";
let client_config = MockClientConfig {
bucket: bucket.to_string(),
part_size: 1024 * 1024,
};
let client = Arc::new(MockClient::new(client_config));

let keys = &[
format!("{prefix}dir0/file0.txt"),
format!("{prefix}dir0/sdir0/file0.txt"),
format!("{prefix}dir0/sdir0/file1.txt"),
];

let object_size = 30;
let mut last_modified = OffsetDateTime::UNIX_EPOCH;
for key in keys {
let mut obj = MockObject::constant(0xaa, object_size, ETag::for_tests());
last_modified += Duration::days(1);
obj.set_last_modified(last_modified);
client.add_object(key, obj);
}

let prefix = Prefix::new(prefix).expect("valid prefix");
let ts = OffsetDateTime::now_utc();
let ttl = if cached {
std::time::Duration::from_secs(60 * 60 * 24 * 7) // 7 days should be enough
} else {
std::time::Duration::ZERO
};
let superblock = Superblock::new(
bucket,
&prefix,
CacheConfig {
prefer_s3: false,
dir_ttl: ttl,
file_ttl: ttl,
},
);

let dir0 = superblock
.lookup(&client, FUSE_ROOT_INODE, &OsString::from("dir0"))
.await
.expect("should exist");
let file0 = superblock
.lookup(&client, dir0.inode.ino(), &OsString::from("file0.txt"))
.await
.expect("should exist");

client.remove_object(file0.inode.full_key());

let file0 = superblock
.lookup(&client, dir0.inode.ino(), &OsString::from("file0.txt"))
.await;
if cached {
file0.expect("file0 inode should still be served from cache");
} else {
file0.expect_err("file0 entry should have expired, and not be found in S3");
}
}

#[tokio::test]
async fn test_forget() {
let superblock = Superblock::new("test_bucket", &Default::default(), Default::default());
Expand Down
1 change: 1 addition & 0 deletions mountpoint-s3/tests/fuse_tests/lookup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ where
{
let filesystem_config = S3FilesystemConfig {
cache_config: CacheConfig {
prefer_s3: true,
file_ttl: Duration::ZERO,
dir_ttl: Duration::ZERO,
},
Expand Down
9 changes: 8 additions & 1 deletion mountpoint-s3/tests/reftests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use std::collections::{BTreeMap, HashSet};
use std::fmt::Debug;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use fuser::FileType;
use futures::executor::ThreadPool;
use futures::future::{BoxFuture, FutureExt};
use mountpoint_s3::fs::{self, InodeNo, ReadReplier, ToErrno, FUSE_ROOT_INODE};
use mountpoint_s3::fs::{self, CacheConfig, InodeNo, ReadReplier, ToErrno, FUSE_ROOT_INODE};
use mountpoint_s3::prefix::Prefix;
use mountpoint_s3::{S3Filesystem, S3FilesystemConfig};
use mountpoint_s3_client::mock_client::{MockClient, MockObject};
Expand Down Expand Up @@ -885,6 +886,12 @@ mod mutations {
let config = S3FilesystemConfig {
readdir_size: 5,
allow_delete: true,
cache_config: CacheConfig {
// We are only interested in strong consistency for the reference tests. FUSE isn't even in the loop.
prefer_s3: true,
dir_ttl: Duration::ZERO,
file_ttl: Duration::ZERO,
},
..Default::default()
};
let (client, fs) = make_test_filesystem(BUCKET_NAME, &test_prefix, config);
Expand Down

0 comments on commit 8afdab6

Please sign in to comment.