Integrate cache in Prefetcher
Signed-off-by: Alessandro Passaro <[email protected]>
passaro committed Nov 6, 2023
1 parent 763e02a · commit feff7bc
Showing 7 changed files with 710 additions and 15 deletions.
38 changes: 38 additions & 0 deletions mountpoint-s3/src/main.rs
@@ -239,6 +239,28 @@ struct CliArgs {
)]
pub metadata_cache_ttl: Option<Duration>,

// TODO: Temporary for testing. Review before exposing outside "caching" feature.
#[cfg(feature = "caching")]
#[clap(
long,
help = "Enable caching of object data in a directory",
help_heading = CACHING_OPTIONS_HEADER,
value_name = "DIRECTORY",
)]
pub data_caching_directory: Option<PathBuf>,

// TODO: Temporary for testing. Review before exposing outside "caching" feature.
#[cfg(feature = "caching")]
#[clap(
long,
help = "Block size for the data cache",
default_value = "1048576",
value_parser = value_parser!(u64).range(1..),
help_heading = CACHING_OPTIONS_HEADER,
requires = "data_caching_directory",
)]
pub data_cache_block_size: u64,
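
// With the "caching" feature compiled in, these options might be exercised as
// follows. This is a hypothetical invocation, not taken from this commit: the
// bucket, mount point, and cache directory are placeholders, the binary name
// is assumed to be the usual mount-s3, and the long-flag spellings are assumed
// to be clap's kebab-case rendering of the field names above.
//
//     mount-s3 my-bucket /mnt/my-bucket \
//         --data-caching-directory /tmp/mp-cache \
//         --data-cache-block-size 1048576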

#[clap(
long,
help = "Configure a string to be prepended to the 'User-Agent' HTTP request header for all S3 requests",
@@ -537,7 +559,9 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {

#[cfg(feature = "caching")]
{
use mountpoint_s3::data_cache::DiskDataCache;
use mountpoint_s3::fs::CacheConfig;
use mountpoint_s3::prefetch::caching_prefetch;

if args.enable_metadata_caching {
// TODO: Review default for TTL
@@ -548,6 +572,20 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {
file_ttl: metadata_cache_ttl,
};
}

if let Some(path) = args.data_caching_directory {
let cache = DiskDataCache::new(path, args.data_cache_block_size);
let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
return create_filesystem(
client,
prefetcher,
&args.bucket_name,
&prefix,
filesystem_config,
fuse_config,
&bucket_description,
);
}
}

let prefetcher = default_prefetch(runtime, prefetcher_config);
101 changes: 88 additions & 13 deletions mountpoint-s3/src/prefetch.rs
@@ -7,6 +7,7 @@
//! we increase the size of the GetObject requests up to some maximum. If the reader ever makes a
//! non-sequential read, we abandon the prefetching and start again with the minimum request size.
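// As a sketch of the ramp-up described above (an illustrative assumption, not
// necessarily this crate's exact growth policy): each sequential request could
// grow geometrically and be clamped to the configured maximum.
//
//     fn next_request_size(current_size: usize, max_request_size: usize) -> usize {
//         current_size.saturating_mul(2).min(max_request_size)
//     }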
mod caching_stream;
mod part;
mod part_queue;
mod part_stream;
@@ -27,6 +28,8 @@ use thiserror::Error;
use tracing::trace;

use crate::checksums::{ChecksummedBytes, IntegrityError};
use crate::data_cache::DataCache;
use crate::prefetch::caching_stream::CachingPartStream;
use crate::prefetch::part_stream::{ClientPartStream, ObjectPartStream, RequestRange};
use crate::prefetch::seek_window::SeekWindow;
use crate::prefetch::task::RequestTask;
Expand Down Expand Up @@ -84,6 +87,21 @@ where
Prefetcher::new(part_stream, prefetcher_config)
}

pub type CachingPrefetcher<Cache, Runtime> = Prefetcher<CachingPartStream<Cache, Runtime>>;

pub fn caching_prefetch<Cache, Runtime>(
cache: Cache,
runtime: Runtime,
prefetcher_config: PrefetcherConfig,
) -> CachingPrefetcher<Cache, Runtime>
where
Cache: DataCache + Send + Sync + 'static,
Runtime: Spawn + Send + Sync + 'static,
{
let part_stream = CachingPartStream::new(runtime, cache);
Prefetcher::new(part_stream, prefetcher_config)
}
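
// A minimal wiring sketch for this entry point (illustrative, not part of the
// commit: assumes `PrefetcherConfig` implements `Default`, a hypothetical cache
// directory of "/tmp/mp-cache", and a `futures` ThreadPool as the `Spawn`
// runtime, as the tests below use):
//
//     let runtime = ThreadPool::builder().pool_size(1).create()?;
//     let cache = DiskDataCache::new("/tmp/mp-cache".into(), 1024 * 1024);
//     let prefetcher = caching_prefetch(cache, runtime, PrefetcherConfig::default());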

#[derive(Debug, Clone, Copy)]
pub struct PrefetcherConfig {
/// Size of the first request in a prefetch run
Expand Down Expand Up @@ -504,6 +522,10 @@ mod tests {
// It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
#![allow(clippy::identity_op)]

use crate::data_cache::InMemoryDataCache;
use crate::prefetch::part_stream::ClientPartStream;

use super::caching_stream::CachingPartStream;
use super::*;
use futures::executor::{block_on, ThreadPool};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
@@ -515,6 +537,8 @@
use std::collections::HashMap;
use test_case::test_case;

const MB: usize = 1024 * 1024;

#[derive(Debug, Arbitrary)]
struct TestConfig {
#[proptest(strategy = "16usize..1*1024*1024")]
@@ -536,6 +560,12 @@
ClientPartStream::new(runtime)
}

fn caching_stream(block_size: usize) -> CachingPartStream<InMemoryDataCache, ThreadPool> {
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let cache = InMemoryDataCache::new(block_size as u64);
CachingPartStream::new(runtime, cache)
}

fn run_sequential_read_test<Stream: ObjectPartStream + Send + Sync + 'static>(
part_stream: Stream,
size: u64,
@@ -578,8 +608,12 @@
assert_eq!(next_offset, size);
}

-#[test]
-fn sequential_read_small() {
+#[test_case(default_stream())]
+#[test_case(caching_stream(1 * MB))]
+fn sequential_read_small<Stream>(part_stream: Stream)
+where
+Stream: ObjectPartStream + Send + Sync + 'static,
+{
let config = TestConfig {
first_request_size: 256 * 1024,
max_request_size: 1024 * 1024 * 1024,
@@ -588,11 +622,15 @@
max_forward_seek_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
};
-run_sequential_read_test(default_stream(), 1024 * 1024 + 111, 1024 * 1024, config);
+run_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config);
}

-#[test]
-fn sequential_read_medium() {
+#[test_case(default_stream())]
+#[test_case(caching_stream(1 * MB))]
+fn sequential_read_medium<Stream>(part_stream: Stream)
+where
+Stream: ObjectPartStream + Send + Sync + 'static,
+{
let config = TestConfig {
first_request_size: 256 * 1024,
max_request_size: 64 * 1024 * 1024,
@@ -601,11 +639,15 @@
max_forward_seek_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
};
-run_sequential_read_test(default_stream(), 16 * 1024 * 1024 + 111, 1024 * 1024, config);
+run_sequential_read_test(part_stream, 16 * 1024 * 1024 + 111, 1024 * 1024, config);
}

-#[test]
-fn sequential_read_large() {
+#[test_case(default_stream())]
+#[test_case(caching_stream(1 * MB))]
+fn sequential_read_large<Stream>(part_stream: Stream)
+where
+Stream: ObjectPartStream + Send + Sync + 'static,
+{
let config = TestConfig {
first_request_size: 256 * 1024,
max_request_size: 64 * 1024 * 1024,
@@ -614,7 +656,8 @@
max_forward_seek_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
};
-run_sequential_read_test(default_stream(), 256 * 1024 * 1024 + 111, 1024 * 1024, config);
+
+run_sequential_read_test(part_stream, 256 * 1024 * 1024 + 111, 1024 * 1024, config);
}

fn fail_sequential_read_test<Stream: ObjectPartStream + Send + Sync + 'static>(
@@ -664,10 +707,15 @@
assert!(next_offset < size); // Since we're injecting failures, shouldn't make it to the end
}

#[test_case("invalid range; length=42")]
#[test_case("invalid range; length=42", default_stream())]
#[test_case("invalid range; length=42", caching_stream(1 * MB))]
// test case for the request failure due to etag not matching
#[test_case("At least one of the pre-conditions you specified did not hold")]
fn fail_request_sequential_small(err_value: &str) {
#[test_case("At least one of the pre-conditions you specified did not hold", default_stream())]
#[test_case("At least one of the pre-conditions you specified did not hold", caching_stream(1 * MB))]
fn fail_request_sequential_small<Stream>(err_value: &str, part_stream: Stream)
where
Stream: ObjectPartStream + Send + Sync + 'static,
{
let config = TestConfig {
first_request_size: 256 * 1024,
max_request_size: 1024 * 1024 * 1024,
@@ -685,7 +733,7 @@
))),
);

-fail_sequential_read_test(default_stream(), 1024 * 1024 + 111, 1024 * 1024, config, get_failures);
+fail_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config, get_failures);
}

proptest! {
@@ -703,6 +751,23 @@
let read_size = (size as usize / read_factor).max(1);
run_sequential_read_test(default_stream(), size, read_size, config);
}

#[test]
fn proptest_sequential_read_with_cache(
size in 1u64..1 * 1024 * 1024,
read_size in 1usize..1 * 1024 * 1024,
block_size in 16usize..1 * 1024 * 1024,
config: TestConfig,
) {
run_sequential_read_test(caching_stream(block_size), size, read_size, config);
}

#[test]
fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10,
block_size in 16usize..1 * 1024 * 1024, config: TestConfig) {
let read_size = (size as usize / read_factor).max(1);
run_sequential_read_test(caching_stream(block_size), size, read_size, config);
}
}

#[test]
@@ -792,6 +857,16 @@
let (object_size, reads) = reads;
run_random_read_test(default_stream(), object_size, reads, config);
}

#[test]
fn proptest_random_read_with_cache(
reads in random_read_strategy(1 * 1024 * 1024),
block_size in 16usize..1 * 1024 * 1024,
config: TestConfig,
) {
let (object_size, reads) = reads;
run_random_read_test(caching_stream(block_size), object_size, reads, config);
}
}

#[test]
(Diffs for the remaining 5 changed files are not shown.)
