Multilevel cache #1064

Merged 2 commits on Nov 6, 2024.

Changes from 1 commit
14 changes: 12 additions & 2 deletions mountpoint-s3-client/src/mock_client.rs
@@ -58,7 +58,7 @@ lazy_static! {
static ref RAMP_BYTES: Vec<u8> = ramp_bytes(0, RAMP_BUFFER_SIZE + RAMP_MODULUS);
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct MockClientConfig {
/// The bucket name this client will connect to
pub bucket: String,
@@ -74,7 +74,7 @@ pub struct MockClientConfig {

/// A mock implementation of an object client that we can manually add objects to, and then query
/// via the [ObjectClient] APIs.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MockClient {
config: MockClientConfig,
objects: Arc<RwLock<BTreeMap<String, MockObject>>>,
@@ -107,6 +107,16 @@ impl MockClient {
self.objects.write().unwrap().remove(key);
}

/// Remove all objects for the mock client's bucket
pub fn remove_all_objects(&self) {
self.objects.write().unwrap().clear();
}

/// Number of objects in the mock client's bucket
pub fn object_count(&self) -> usize {
self.objects.read().unwrap().len()
}

/// Returns `true` if this mock client's bucket contains the specified key
pub fn contains_key(&self, key: &str) -> bool {
self.objects.read().unwrap().contains_key(key)
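The new `Clone` derive is what makes the helpers above useful in multilevel-cache tests: because `MockClient` keeps its objects behind an `Arc<RwLock<...>>`, clones share one in-memory bucket. A minimal test sketch, assuming the crate's existing `add_object`, `MockObject::from_bytes`, and `ETag::for_tests()` helpers (only `Clone`, `remove_all_objects`, and `object_count` are added by this diff):

#[cfg(test)]
mod clone_tests {
    use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockObject};
    use mountpoint_s3_client::types::ETag;

    #[test]
    fn clones_share_bucket_state() {
        let client = MockClient::new(MockClientConfig {
            bucket: "test-bucket".to_string(),
            ..Default::default()
        });

        // A clone shares the Arc'd object map rather than copying it, so a
        // second handle observes writes made through the first.
        let cache_client = client.clone();
        client.add_object("key1", MockObject::from_bytes(b"hello", ETag::for_tests()));
        assert_eq!(cache_client.object_count(), 1);

        cache_client.remove_all_objects();
        assert!(!client.contains_key("key1"));
        assert_eq!(client.object_count(), 0);
    }
}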
179 changes: 117 additions & 62 deletions mountpoint-s3/src/cli.rs
@@ -5,10 +5,11 @@ use std::io::{Read, Write};
use std::num::NonZeroUsize;
use std::os::fd::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context as _};
use clap::{value_parser, Parser, ValueEnum};
use clap::{value_parser, ArgGroup, Parser, ValueEnum};
use fuser::{MountOption, Session};
use futures::task::Spawn;
use mountpoint_s3_client::config::{AddressingStyle, EndpointConfig, S3ClientAuthConfig, S3ClientConfig};
@@ -26,7 +27,9 @@ use nix::unistd::ForkResult;
use regex::Regex;
use sysinfo::{RefreshKind, System};

use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir};
use crate::data_cache::{
CacheLimit, DiskDataCache, DiskDataCacheConfig, ExpressDataCache, ManagedCacheDir, MultilevelDataCache,
};
use crate::fs::{CacheConfig, ServerSideEncryption, TimeToLive};
use crate::fuse::session::FuseSession;
use crate::fuse::S3FuseFilesystem;
@@ -46,7 +49,15 @@ const CACHING_OPTIONS_HEADER: &str = "Caching options";
const ADVANCED_OPTIONS_HEADER: &str = "Advanced options";

#[derive(Parser, Debug)]
#[clap(name = "mount-s3", about = "Mountpoint for Amazon S3", version = build_info::FULL_VERSION)]
#[clap(
name = "mount-s3",
about = "Mountpoint for Amazon S3",
version = build_info::FULL_VERSION,
group(
ArgGroup::new("cache_group")
.multiple(true),
),
)]
pub struct CliArgs {
#[clap(help = "Name of bucket to mount", value_parser = parse_bucket_name)]
pub bucket_name: String,
@@ -298,10 +309,10 @@ pub struct CliArgs {
#[cfg(feature = "block_size")]
#[clap(
long,
help = "Size of a cache block in KiB [Default: 1024 (1 MiB) for disk cache, 512 (512 KiB) for S3 Express cache]",
help = "Size of a cache block in KiB [Default: 1024 (1 MiB) for disk cache and for S3 Express cache]",
Reviewer (Contributor) suggested change:
-    help = "Size of a cache block in KiB [Default: 1024 (1 MiB) for disk cache and for S3 Express cache]",
+    help = "Size of a cache block in KiB [Default: 1024 (1 MiB)]",

help_heading = CACHING_OPTIONS_HEADER,
value_name = "KiB",
requires = "cache_group"
requires = "cache_group",
)]
pub cache_block_size: Option<u64>,

@@ -423,10 +434,7 @@ impl CliArgs {
if let Some(kib) = self.cache_block_size {
return kib * 1024;
}
if self.cache_express_bucket_name().is_some() {
return 512 * 1024; // 512 KiB block size - default for express cache
}
1024 * 1024 // 1 MiB block size - default for disk cache
1024 * 1024 // 1 MiB block size - default for disk cache and for express cache
}

fn cache_express_bucket_name(&self) -> Option<&str> {
@@ -437,6 +445,27 @@
None
}

fn disk_data_cache_config(&self) -> Option<(&Path, DiskDataCacheConfig)> {
match self.cache.as_ref() {
Some(path) => {
let cache_limit = match self.max_cache_size {
// Fallback to no data cache.
Some(0) => return None,
Some(max_size_in_mib) => CacheLimit::TotalSize {
max_size: (max_size_in_mib * 1024 * 1024) as usize,
},
None => CacheLimit::default(),
};
let cache_config = DiskDataCacheConfig {
block_size: self.cache_block_size_in_bytes(),
limit: cache_limit,
};
Some((path.as_path(), cache_config))
}
None => None,
}
}

/// Generates a logging configuration based on the CLI arguments.
///
/// This includes random string generation which can change with each invocation,
@@ -756,6 +785,17 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
Ok((client, runtime, s3_personality))
}

fn create_disk_cache(
cache_dir_path: &Path,
cache_config: DiskDataCacheConfig,
) -> anyhow::Result<(ManagedCacheDir, DiskDataCache)> {
let cache_key = env_unstable_cache_key();
let managed_cache_dir = ManagedCacheDir::new_from_parent_with_cache_key(cache_dir_path, cache_key)
.context("failed to create cache directory")?;
let cache_dir_path = managed_cache_dir.as_path_buf();
Ok((managed_cache_dir, DiskDataCache::new(cache_dir_path, cache_config)))
}

fn mount<ClientBuilder, Client, Runtime>(args: CliArgs, client_builder: ClientBuilder) -> anyhow::Result<FuseSession>
where
ClientBuilder: FnOnce(&CliArgs) -> anyhow::Result<(Client, Runtime, S3Personality)>,
@@ -840,26 +880,34 @@
tracing::trace!("using metadata TTL setting {metadata_cache_ttl:?}");
filesystem_config.cache_config = CacheConfig::new(metadata_cache_ttl);

if let Some(path) = &args.cache {
let cache_limit = match args.max_cache_size {
// Fallback to no data cache.
Some(0) => None,
Some(max_size_in_mib) => Some(CacheLimit::TotalSize {
max_size: (max_size_in_mib * 1024 * 1024) as usize,
}),
None => Some(CacheLimit::default()),
};
if let Some(cache_limit) = cache_limit {
let cache_config = DiskDataCacheConfig {
block_size: args.cache_block_size_in_bytes(),
limit: cache_limit,
};
let cache_key = env_unstable_cache_key();
let managed_cache_dir = ManagedCacheDir::new_from_parent_with_cache_key(path, cache_key)
.context("failed to create cache directory")?;
match (args.disk_data_cache_config(), args.cache_express_bucket_name()) {
(None, Some(express_bucket_name)) => {
tracing::trace!("using S3 Express One Zone bucket as a cache for object content");
let express_cache = ExpressDataCache::new(
express_bucket_name,
client.clone(),
&args.bucket_name,
args.cache_block_size_in_bytes(),
);

let cache = DiskDataCache::new(managed_cache_dir.as_path_buf(), cache_config);
let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
let prefetcher = caching_prefetch(express_cache, runtime, prefetcher_config);
let fuse_session = create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)?;

Ok(fuse_session)
}
(Some((cache_dir_path, disk_data_cache_config)), None) => {
tracing::trace!("using local disk as a cache for object content");
let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?;

let prefetcher = caching_prefetch(disk_cache, runtime, prefetcher_config);
let mut fuse_session = create_filesystem(
client,
prefetcher,
@@ -874,43 +922,50 @@
drop(managed_cache_dir);
}));

return Ok(fuse_session);
Ok(fuse_session)
}
}
(Some((cache_dir_path, disk_data_cache_config)), Some(express_bucket_name)) => {
tracing::trace!("using both local disk and S3 Express One Zone bucket as a cache for object content");
let (managed_cache_dir, disk_cache) = create_disk_cache(cache_dir_path, disk_data_cache_config)?;
let express_cache = ExpressDataCache::new(
express_bucket_name,
client.clone(),
&args.bucket_name,
args.cache_block_size_in_bytes(),
);
let cache = MultilevelDataCache::new(Arc::new(disk_cache), express_cache, runtime.clone());

if let Some(express_bucket_name) = args.cache_express_bucket_name() {
// The cache can be shared across instances mounting the same bucket (including with different prefixes)
let source_description = &args.bucket_name;
let cache = ExpressDataCache::new(
express_bucket_name,
client.clone(),
source_description,
args.cache_block_size_in_bytes(),
);
let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
let fuse_session = create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)?;

return Ok(fuse_session);
};
let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
let mut fuse_session = create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)?;

let prefetcher = default_prefetch(runtime, prefetcher_config);
create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)
fuse_session.run_on_close(Box::new(move || {
drop(managed_cache_dir);
}));

Ok(fuse_session)
}
_ => {
tracing::trace!("using no cache");
let prefetcher = default_prefetch(runtime, prefetcher_config);
create_filesystem(
client,
prefetcher,
&args.bucket_name,
&args.prefix.unwrap_or_default(),
filesystem_config,
fuse_config,
&bucket_description,
)
}
}
}

fn create_filesystem<Client, Prefetcher>(
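The `MultilevelDataCache` constructed in the third match arm lives in `multilevel_cache.rs`, which this diff does not include. As a rough mental model only, the sketch below shows the read-through behavior such a two-level cache provides; the trait and names are simplified stand-ins (the crate's real `DataCache` trait is async and block-addressed), not the actual implementation:

/// Simplified stand-in for one block-cache level (illustrative only).
trait BlockCache {
    fn get(&self, key: &str) -> Option<Vec<u8>>;
    fn put(&self, key: &str, block: Vec<u8>);
}

/// Two-level read-through cache: a local disk level backed by a shared
/// S3 Express level, mirroring how `mount` composes the caches above.
struct TwoLevelCache<Disk: BlockCache, Express: BlockCache> {
    disk: Disk,
    express: Express,
}

impl<Disk: BlockCache, Express: BlockCache> BlockCache for TwoLevelCache<Disk, Express> {
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        if let Some(block) = self.disk.get(key) {
            return Some(block); // local hit: no network round trip
        }
        let block = self.express.get(key)?;
        // Promote the block to the local level so the next read is local.
        self.disk.put(key, block.clone());
        Some(block)
    }

    fn put(&self, key: &str, block: Vec<u8>) {
        // Write through to both levels.
        self.disk.put(key, block.clone());
        self.express.put(key, block);
    }
}

The pay-off is in `get`: a disk hit avoids any network round trip, while an express hit repopulates the disk level so later reads of the same block stay local.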
2 changes: 2 additions & 0 deletions mountpoint-s3/src/data_cache.rs
@@ -8,6 +8,7 @@ mod cache_directory;
mod disk_data_cache;
mod express_data_cache;
mod in_memory_data_cache;
mod multilevel_cache;

use async_trait::async_trait;
use thiserror::Error;
@@ -17,6 +18,7 @@ pub use crate::data_cache::cache_directory::ManagedCacheDir;
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
pub use crate::data_cache::express_data_cache::ExpressDataCache;
pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
pub use crate::data_cache::multilevel_cache::MultilevelDataCache;

use crate::object::ObjectId;

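All of the caches re-exported here, including the new `MultilevelDataCache`, plug into `caching_prefetch` through the crate's `DataCache` trait; that shared interface is what lets the multilevel cache wrap any two implementations. The shape below is an approximation pieced together from the types visible in this diff (`async_trait`, `ObjectId`, `DataCacheResult`), so treat the exact signatures as assumptions:

// Approximate shape only; consult data_cache.rs for the actual definition.
#[async_trait]
pub trait DataCache {
    /// Fetch a block from the cache, returning Ok(None) on a cache miss.
    async fn get_block(
        &self,
        cache_key: &ObjectId,
        block_idx: u64,
        block_offset: u64,
    ) -> DataCacheResult<Option<ChecksummedBytes>>;

    /// Store a block under the given object key and block index.
    async fn put_block(
        &self,
        cache_key: ObjectId,
        block_idx: u64,
        block_offset: u64,
        bytes: ChecksummedBytes,
    ) -> DataCacheResult<()>;

    /// Block size used by this cache, in bytes.
    fn block_size(&self) -> u64;
}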
19 changes: 11 additions & 8 deletions mountpoint-s3/src/data_cache/express_data_cache.rs
@@ -82,15 +82,18 @@
// TODO: optimize for the common case of a single chunk.
let mut buffer = BytesMut::default();
while let Some(chunk) = result.next().await {
let (offset, body) = chunk?;
if offset != buffer.len() as u64 {
return Err(DataCacheError::InvalidBlockOffset);
match chunk {
Ok((offset, body)) => {
if offset != buffer.len() as u64 {
return Err(DataCacheError::InvalidBlockOffset);
}
buffer.extend_from_slice(&body);

result.as_mut().increment_read_window(self.block_size as usize);
Reviewer (Contributor): This seems unnecessary now, doesn't it?

Reviewer (Contributor): Happy to keep it for now and review when we optimize for the single-chunk case (see TODO above).

Author (Contributor): We'll need to account for the case where the block object is larger than block_size for some reason. If we just removed this line, the read might freeze; if we keep it as it is now, Mountpoint may attempt to read an unbounded amount of data into RAM. This requires a bit more thinking, so I agree it's better to address in a following PR.

}
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey)) => return Ok(None),
Err(e) => return Err(e.into()),
}
buffer.extend_from_slice(&body);

// Ensure the flow-control window is large enough.
// TODO: review if/when we add a header to the block.
result.as_mut().increment_read_window(self.block_size as usize);
}
let buffer = buffer.freeze();
DataCacheResult::Ok(Some(buffer.into()))
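To make the author's concern above concrete: the loop grows `buffer` and the flow-control window once per chunk, and nothing ties the total buffered bytes to `block_size`. A hypothetical guard, not part of this PR and with illustrative naming, could fail fast instead:

/// Hypothetical bound check for the discussion above. Erroring once the
/// buffered bytes exceed the expected block size avoids both failure modes
/// from the thread: a frozen read (window never grows) and unbounded
/// buffering in RAM.
fn check_block_bounds(buffered_len: usize, block_size: u64) -> Result<(), String> {
    if buffered_len as u64 > block_size {
        Err(format!(
            "cache block exceeds expected block size: {buffered_len} > {block_size}"
        ))
    } else {
        Ok(())
    }
}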