From feff7bcbcedadaaa9b91c0bd9007093caef5484d Mon Sep 17 00:00:00 2001
From: Alessandro Passaro
Date: Fri, 3 Nov 2023 14:12:43 +0000
Subject: [PATCH] Integrate cache in Prefetcher

Signed-off-by: Alessandro Passaro
---
 mountpoint-s3/src/main.rs                    |  38 ++
 mountpoint-s3/src/prefetch.rs                | 101 ++++-
 mountpoint-s3/src/prefetch/caching_stream.rs | 422 ++++++++++++++++++
 .../tests/fuse_tests/consistency_test.rs     |  20 +
 mountpoint-s3/tests/fuse_tests/mod.rs        |  76 +++-
 .../tests/fuse_tests/prefetch_test.rs        |  49 ++
 mountpoint-s3/tests/fuse_tests/read_test.rs  |  19 +
 7 files changed, 710 insertions(+), 15 deletions(-)
 create mode 100644 mountpoint-s3/src/prefetch/caching_stream.rs

diff --git a/mountpoint-s3/src/main.rs b/mountpoint-s3/src/main.rs
index 9ec735a6b..a652c8d8d 100644
--- a/mountpoint-s3/src/main.rs
+++ b/mountpoint-s3/src/main.rs
@@ -239,6 +239,28 @@ struct CliArgs {
     )]
     pub metadata_cache_ttl: Option,
 
+    // TODO: Temporary for testing. Review before exposing outside "caching" feature.
+    #[cfg(feature = "caching")]
+    #[clap(
+        long,
+        help = "Enable caching of object data in a directory",
+        help_heading = CACHING_OPTIONS_HEADER,
+        value_name = "DIRECTORY",
+    )]
+    pub data_caching_directory: Option<PathBuf>,
+
+    // TODO: Temporary for testing. Review before exposing outside "caching" feature.
+    #[cfg(feature = "caching")]
+    #[clap(
+        long,
+        help = "Block size for the data cache",
+        default_value = "1048576",
+        value_parser = value_parser!(u64).range(1..),
+        help_heading = CACHING_OPTIONS_HEADER,
+        requires = "data_caching_directory",
+    )]
+    pub data_cache_block_size: u64,
+
     #[clap(
         long,
         help = "Configure a string to be prepended to the 'User-Agent' HTTP request header for all S3 requests",
@@ -537,7 +559,9 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {
 
     #[cfg(feature = "caching")]
     {
+        use mountpoint_s3::data_cache::DiskDataCache;
         use mountpoint_s3::fs::CacheConfig;
+        use mountpoint_s3::prefetch::caching_prefetch;
 
         if args.enable_metadata_caching {
             // TODO: Review default for TTL
@@ -548,6 +572,20 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {
                 file_ttl: metadata_cache_ttl,
             };
         }
+
+        if let Some(path) = args.data_caching_directory {
+            let cache = DiskDataCache::new(path, args.data_cache_block_size);
+            let prefetcher = caching_prefetch(cache, runtime, prefetcher_config);
+            return create_filesystem(
+                client,
+                prefetcher,
+                &args.bucket_name,
+                &prefix,
+                filesystem_config,
+                fuse_config,
+                &bucket_description,
+            );
+        }
     }
 
     let prefetcher = default_prefetch(runtime, prefetcher_config);
diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index de2eef18a..65073d825 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -7,6 +7,7 @@
 //! we increase the size of the GetObject requests up to some maximum. If the reader ever makes a
 //! non-sequential read, we abandon the prefetching and start again with the minimum request size.
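
[Editor's note: the module comment above describes the prefetcher's adaptive request sizing. Below is a minimal illustrative sketch of that idea — grow the request size while reads stay sequential, reset after a seek. It is not part of this patch and all names are hypothetical.]

    // Illustrative sketch only (not Mountpoint code): adaptive sizing of sequential requests.
    struct RequestSizer {
        next_size: u64,
        min_size: u64,
        max_size: u64,
    }

    impl RequestSizer {
        /// Returns the size to use for the next request: doubles while the reader stays
        /// sequential, resets to the minimum after a non-sequential read.
        fn next_request(&mut self, sequential: bool) -> u64 {
            if !sequential {
                self.next_size = self.min_size;
            }
            let size = self.next_size;
            self.next_size = (self.next_size * 2).min(self.max_size);
            size
        }
    }
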
+mod caching_stream;
 mod part;
 mod part_queue;
 mod part_stream;
@@ -27,6 +28,8 @@
 use thiserror::Error;
 use tracing::trace;
 
 use crate::checksums::{ChecksummedBytes, IntegrityError};
+use crate::data_cache::DataCache;
+use crate::prefetch::caching_stream::CachingPartStream;
 use crate::prefetch::part_stream::{ClientPartStream, ObjectPartStream, RequestRange};
 use crate::prefetch::seek_window::SeekWindow;
 use crate::prefetch::task::RequestTask;
@@ -84,6 +87,21 @@ where
     Prefetcher::new(part_stream, prefetcher_config)
 }
 
+pub type CachingPrefetcher<Cache, Runtime> = Prefetcher<CachingPartStream<Cache, Runtime>>;
+
+pub fn caching_prefetch<Cache, Runtime>(
+    cache: Cache,
+    runtime: Runtime,
+    prefetcher_config: PrefetcherConfig,
+) -> CachingPrefetcher<Cache, Runtime>
+where
+    Cache: DataCache + Send + Sync + 'static,
+    Runtime: Spawn + Send + Sync + 'static,
+{
+    let part_stream = CachingPartStream::new(runtime, cache);
+    Prefetcher::new(part_stream, prefetcher_config)
+}
+
 #[derive(Debug, Clone, Copy)]
 pub struct PrefetcherConfig {
     /// Size of the first request in a prefetch run
@@ -504,6 +522,10 @@ mod tests {
     // It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
     #![allow(clippy::identity_op)]
 
+    use crate::data_cache::InMemoryDataCache;
+    use crate::prefetch::part_stream::ClientPartStream;
+
+    use super::caching_stream::CachingPartStream;
     use super::*;
     use futures::executor::{block_on, ThreadPool};
     use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
@@ -515,6 +537,8 @@ mod tests {
     use std::collections::HashMap;
     use test_case::test_case;
 
+    const MB: usize = 1024 * 1024;
+
     #[derive(Debug, Arbitrary)]
     struct TestConfig {
         #[proptest(strategy = "16usize..1*1024*1024")]
@@ -536,6 +560,12 @@ mod tests {
         ClientPartStream::new(runtime)
     }
 
+    fn caching_stream(block_size: usize) -> CachingPartStream<InMemoryDataCache, ThreadPool> {
+        let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
+        let cache = InMemoryDataCache::new(block_size as u64);
+        CachingPartStream::new(runtime, cache)
+    }
+
     fn run_sequential_read_test<Stream: ObjectPartStream + Send + Sync + 'static>(
         part_stream: Stream,
         size: u64,
@@ -578,8 +608,12 @@ mod tests {
         assert_eq!(next_offset, size);
     }
 
-    #[test]
-    fn sequential_read_small() {
+    #[test_case(default_stream())]
+    #[test_case(caching_stream(1 * MB))]
+    fn sequential_read_small<Stream>(part_stream: Stream)
+    where
+        Stream: ObjectPartStream + Send + Sync + 'static,
+    {
         let config = TestConfig {
             first_request_size: 256 * 1024,
             max_request_size: 1024 * 1024 * 1024,
@@ -588,11 +622,15 @@ mod tests {
             max_forward_seek_distance: 16 * 1024 * 1024,
             max_backward_seek_distance: 2 * 1024 * 1024,
         };
-        run_sequential_read_test(default_stream(), 1024 * 1024 + 111, 1024 * 1024, config);
+        run_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config);
     }
 
-    #[test]
-    fn sequential_read_medium() {
+    #[test_case(default_stream())]
+    #[test_case(caching_stream(1 * MB))]
+    fn sequential_read_medium<Stream>(part_stream: Stream)
+    where
+        Stream: ObjectPartStream + Send + Sync + 'static,
+    {
         let config = TestConfig {
             first_request_size: 256 * 1024,
             max_request_size: 64 * 1024 * 1024,
@@ -601,11 +639,15 @@ mod tests {
             max_forward_seek_distance: 16 * 1024 * 1024,
             max_backward_seek_distance: 2 * 1024 * 1024,
         };
-        run_sequential_read_test(default_stream(), 16 * 1024 * 1024 + 111, 1024 * 1024, config);
+        run_sequential_read_test(part_stream, 16 * 1024 * 1024 + 111, 1024 * 1024, config);
     }
 
-    #[test]
-    fn sequential_read_large() {
+    #[test_case(default_stream())]
+    #[test_case(caching_stream(1 * MB))]
+    fn sequential_read_large<Stream>(part_stream: Stream)
+    where
+        Stream: ObjectPartStream + Send + Sync + 'static,
+    {
         let
config = TestConfig { first_request_size: 256 * 1024, max_request_size: 64 * 1024 * 1024, @@ -614,7 +656,8 @@ mod tests { max_forward_seek_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, }; - run_sequential_read_test(default_stream(), 256 * 1024 * 1024 + 111, 1024 * 1024, config); + + run_sequential_read_test(part_stream, 256 * 1024 * 1024 + 111, 1024 * 1024, config); } fn fail_sequential_read_test( @@ -664,10 +707,15 @@ mod tests { assert!(next_offset < size); // Since we're injecting failures, shouldn't make it to the end } - #[test_case("invalid range; length=42")] + #[test_case("invalid range; length=42", default_stream())] + #[test_case("invalid range; length=42", caching_stream(1 * MB))] // test case for the request failure due to etag not matching - #[test_case("At least one of the pre-conditions you specified did not hold")] - fn fail_request_sequential_small(err_value: &str) { + #[test_case("At least one of the pre-conditions you specified did not hold", default_stream())] + #[test_case("At least one of the pre-conditions you specified did not hold", caching_stream(1 * MB))] + fn fail_request_sequential_small(err_value: &str, part_stream: Stream) + where + Stream: ObjectPartStream + Send + Sync + 'static, + { let config = TestConfig { first_request_size: 256 * 1024, max_request_size: 1024 * 1024 * 1024, @@ -685,7 +733,7 @@ mod tests { ))), ); - fail_sequential_read_test(default_stream(), 1024 * 1024 + 111, 1024 * 1024, config, get_failures); + fail_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config, get_failures); } proptest! { @@ -703,6 +751,23 @@ mod tests { let read_size = (size as usize / read_factor).max(1); run_sequential_read_test(default_stream(), size, read_size, config); } + + #[test] + fn proptest_sequential_read_with_cache( + size in 1u64..1 * 1024 * 1024, + read_size in 1usize..1 * 1024 * 1024, + block_size in 16usize..1 * 1024 * 1024, + config: TestConfig, + ) { + run_sequential_read_test(caching_stream(block_size), size, read_size, config); + } + + #[test] + fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10, + block_size in 16usize..1 * 1024 * 1024, config: TestConfig) { + let read_size = (size as usize / read_factor).max(1); + run_sequential_read_test(caching_stream(block_size), size, read_size, config); + } } #[test] @@ -792,6 +857,16 @@ mod tests { let (object_size, reads) = reads; run_random_read_test(default_stream(), object_size, reads, config); } + + #[test] + fn proptest_random_read_with_cache( + reads in random_read_strategy(1 * 1024 * 1024), + block_size in 16usize..1 * 1024 * 1024, + config: TestConfig, + ) { + let (object_size, reads) = reads; + run_random_read_test(caching_stream(block_size), object_size, reads, config); + } } #[test] diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs new file mode 100644 index 000000000..0b57d0c2b --- /dev/null +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -0,0 +1,422 @@ +use std::time::Instant; +use std::{ops::Range, sync::Arc}; + +use bytes::Bytes; +use futures::task::{Spawn, SpawnExt}; +use futures::{pin_mut, StreamExt}; +use mountpoint_s3_client::{types::ETag, ObjectClient}; +use tracing::{debug_span, error, trace, warn, Instrument}; + +use crate::checksums::ChecksummedBytes; +use crate::data_cache::{BlockIndex, CacheKey, DataCache}; +use crate::prefetch::part::Part; +use crate::prefetch::part_queue::{unbounded_part_queue, PartQueueProducer}; +use 
crate::prefetch::part_stream::{ObjectPartStream, RequestRange};
+use crate::prefetch::task::RequestTask;
+use crate::prefetch::PrefetchReadError;
+
+/// [ObjectPartStream] implementation which maintains a [DataCache] for the object data
+/// retrieved by an [ObjectClient].
+#[derive(Debug)]
+pub struct CachingPartStream<Cache, Runtime> {
+    cache: Arc<Cache>,
+    runtime: Runtime,
+}
+
+impl<Cache, Runtime> CachingPartStream<Cache, Runtime> {
+    pub fn new(runtime: Runtime, cache: Cache) -> Self {
+        Self {
+            cache: Arc::new(cache),
+            runtime,
+        }
+    }
+}
+
+impl<Cache, Runtime> ObjectPartStream for CachingPartStream<Cache, Runtime>
+where
+    Cache: DataCache + Send + Sync + 'static,
+    Runtime: Spawn,
+{
+    fn spawn_get_object_request<Client>(
+        &self,
+        client: &Client,
+        bucket: &str,
+        key: &str,
+        if_match: ETag,
+        range: RequestRange,
+        _preferred_part_size: usize,
+    ) -> RequestTask<<Client as ObjectClient>::ClientError>
+    where
+        Client: ObjectClient + Clone + Send + Sync + 'static,
+    {
+        let range = get_aligned_request_range(range, self.cache.block_size());
+
+        let start = range.start();
+        let size = range.len();
+
+        let (part_queue, part_queue_producer) = unbounded_part_queue();
+        trace!(?range, "spawning request");
+
+        let request_task = {
+            let request = CachingRequest::new(
+                client.clone(),
+                self.cache.clone(),
+                bucket.to_owned(),
+                key.to_owned(),
+                if_match,
+                part_queue_producer,
+            );
+            let span = debug_span!("prefetch", ?range);
+            async move {
+                request.get_from_cache(range).await;
+            }
+            .instrument(span)
+        };
+
+        let task_handle = self.runtime.spawn_with_handle(request_task).unwrap();
+
+        RequestTask::from_handle(task_handle, size, start, part_queue)
+    }
+}
+
+fn get_aligned_request_range(range: RequestRange, block_size: u64) -> RequestRange {
+    let object_size = range.object_size();
+    let offset = range.start();
+    let preferred_size = range.len();
+
+    // If the request size is bigger than the block size, we will try to align it to block boundaries.
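    // [Editor's note — worked example, not part of the original change] With block_size = 1 MiB:
    //   * offset = 512 KiB, preferred_size = 3 MiB   -> size = 512 KiB (stop at the next block boundary);
    //   * offset = 1 MiB,   preferred_size = 2.5 MiB -> size = 3 MiB (end rounded up to the 4 MiB boundary);
    //   * offset = 1 MiB,   preferred_size = 100 KiB -> size = 1 MiB (grow to one full block).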
+    let offset_in_part = offset % block_size;
+    let size = if offset_in_part != 0 {
+        // if the offset is not at the start of the part we will drain all the bytes from that part first
+        let remaining_in_part = block_size - offset_in_part;
+        preferred_size.min(remaining_in_part as usize)
+    } else {
+        // if the request size is smaller than the block size, just return the block size
+        if preferred_size < block_size as usize {
+            block_size as usize
+        } else {
+            // if it exceeds block boundaries, extend it to the next block boundary
+            let request_boundary = offset + preferred_size as u64;
+            let remainder = request_boundary % block_size;
+            if remainder != 0 {
+                preferred_size + (block_size - remainder) as usize
+            } else {
+                preferred_size
+            }
+        }
+    };
+    RequestRange::new(object_size, offset, size)
+}
+
+#[derive(Debug)]
+struct CachingRequest<Client: ObjectClient, Cache> {
+    client: Client,
+    cache: Arc<Cache>,
+    bucket: String,
+    cache_key: CacheKey,
+    part_queue_producer: PartQueueProducer<Client::ClientError>,
+}
+
+impl<Client, Cache> CachingRequest<Client, Cache>
+where
+    Client: ObjectClient + Send + Sync + 'static,
+    Cache: DataCache + Send + Sync,
+{
+    fn new(
+        client: Client,
+        cache: Arc<Cache>,
+        bucket: String,
+        key: String,
+        etag: ETag,
+        part_queue_producer: PartQueueProducer<Client::ClientError>,
+    ) -> Self {
+        let cache_key = CacheKey { s3_key: key, etag };
+        Self {
+            client,
+            cache,
+            bucket,
+            cache_key,
+            part_queue_producer,
+        }
+    }
+
+    async fn get_from_cache(&self, range: RequestRange) {
+        let key = &self.cache_key.s3_key;
+        let block_size = self.cache.block_size();
+        let block_range = self.block_indices_for_byte_range(&range);
+
+        // TODO: consider starting GetObject requests pre-emptively if cache blocks are missing
+        for block_index in block_range.clone() {
+            match self.cache.get_block(&self.cache_key, block_index) {
+                Ok(Some(block)) => {
+                    trace!(?key, ?range, block_index, "cache hit");
+                    let part = self.make_part(block, block_index, &range);
+                    metrics::counter!("cache.total_bytes", part.len() as u64, "type" => "read");
+                    self.part_queue_producer.push(Ok(part));
+                }
+                Ok(None) => {
+                    trace!(?key, ?range, block_index, "cache miss - no data for block");
+                    return self
+                        .get_from_client(range.trim_start(block_index * block_size), block_index..block_range.end)
+                        .await;
+                }
+                Err(error) => {
+                    trace!(?key, ?range, block_index, ?error, "error reading block from cache");
+                    return self
+                        .get_from_client(range.trim_start(block_index * block_size), block_index..block_range.end)
+                        .await;
+                }
+            }
+        }
+    }
+
+    async fn get_from_client(&self, range: RequestRange, block_range: Range<BlockIndex>) {
+        let key = &self.cache_key.s3_key;
+        let block_size = self.cache.block_size();
+        assert!(block_size > 0);
+
+        let block_aligned_byte_range =
+            (block_range.start * block_size)..(block_range.end * block_size).min(range.object_size() as u64);
+
+        trace!(
+            ?key,
+            range =? block_aligned_byte_range,
+            original_range =?
range, + "fetching data from client" + ); + let get_object_result = match self + .client + .get_object( + &self.bucket, + key, + Some(block_aligned_byte_range), + Some(self.cache_key.etag.clone()), + ) + .await + { + Ok(get_object_result) => get_object_result, + Err(e) => { + error!(key, error=?e, "GetObject request failed"); + self.part_queue_producer + .push(Err(PrefetchReadError::GetRequestFailed(e))); + return; + } + }; + + pin_mut!(get_object_result); + let mut block_index = block_range.start; + let mut buffer = ChecksummedBytes::default(); + loop { + match get_object_result.next().await { + Some(Ok((offset, body))) => { + trace!(offset, length = body.len(), "received GetObject part"); + + // Split the body into blocks. + let mut body: Bytes = body.into(); + while !body.is_empty() { + let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len()); + let chunk = body.split_to(remaining); + if let Err(e) = buffer.extend(chunk.into()) { + error!(key, error=?e, "Integrity check failed"); + self.part_queue_producer.push(Err(e.into())); + return; + } + if buffer.len() < block_size as usize { + break; + } + self.update_cache(block_index, &buffer); + self.part_queue_producer + .push(Ok(self.make_part(buffer, block_index, &range))); + block_index += 1; + buffer = ChecksummedBytes::default(); + } + } + Some(Err(e)) => { + error!(key, error=?e, "GetObject body part failed"); + self.part_queue_producer + .push(Err(PrefetchReadError::GetRequestFailed(e))); + break; + } + None => { + if !buffer.is_empty() { + if buffer.len() + (block_index * block_size) as usize == range.object_size() { + // Write last block to the cache. + self.update_cache(block_index, &buffer); + } + self.part_queue_producer + .push(Ok(self.make_part(buffer, block_index, &range))); + } + break; + } + } + } + trace!("request finished"); + } + + fn update_cache(&self, block_index: u64, block: &ChecksummedBytes) { + // TODO: consider updating the cache asynchronously + let start = Instant::now(); + match self.cache.put_block(self.cache_key.clone(), block_index, block.clone()) { + Ok(()) => { + metrics::histogram!("cache.write_duration_us", start.elapsed().as_micros() as f64); + metrics::counter!("cache.total_bytes", block.len() as u64, "type" => "write"); + } + Err(error) => { + warn!(key=?self.cache_key.s3_key, block_index, ?error, "failed to update cache"); + } + }; + } + + fn make_part(&self, block: ChecksummedBytes, block_index: u64, range: &RequestRange) -> Part { + let key = &self.cache_key.s3_key; + let block_offset = block_index * self.cache.block_size(); + let block_size = block.len(); + let part_range = range + .trim_start(block_offset) + .trim_end(block_offset + block_size as u64); + trace!( + key, + ?part_range, + block_index, + block_offset, + block_size, + "creating part from block" + ); + + let trim_start = (part_range.start().saturating_sub(block_offset)) as usize; + let trim_end = (part_range.end().saturating_sub(block_offset)) as usize; + let bytes = block.slice(trim_start..trim_end); + Part::new(key, part_range.start(), bytes) + } + + fn block_indices_for_byte_range(&self, range: &RequestRange) -> Range { + let block_size = self.cache.block_size(); + let start_block = range.start() / block_size; + let mut end_block = range.end() / block_size; + if !range.is_empty() && range.end() % block_size != 0 { + end_block += 1; + } + + start_block..end_block + } +} + +#[cfg(test)] +mod tests { + // It's convenient to write test constants like "1 * 1024 * 1024" for symmetry + 
#![allow(clippy::identity_op)] + + use futures::executor::{block_on, ThreadPool}; + use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockObject, Operation}; + use test_case::test_case; + + use crate::data_cache::InMemoryDataCache; + + use super::*; + + const KB: usize = 1024; + const MB: usize = 1024 * 1024; + + #[test_case(1 * MB, 8 * MB, 16 * MB, 0, 16 * MB; "whole object")] + #[test_case(1 * MB, 8 * MB, 16 * MB, 1 * MB, 3 * MB + 512 * KB; "aligned offset")] + #[test_case(1 * MB, 8 * MB, 16 * MB, 512 * KB, 3 * MB; "non-aligned range")] + #[test_case(3 * MB, 8 * MB, 14 * MB, 0, 14 * MB; "whole object, size not aligned to parts or blocks")] + #[test_case(3 * MB, 8 * MB, 14 * MB, 9 * MB, 100 * MB; "aligned offset, size not aligned to parts or blocks")] + #[test_case(1 * MB, 8 * MB, 100 * KB, 0, 100 * KB; "small object")] + #[test_case(8 * MB, 5 * MB, 16 * MB, 0, 16 * MB; "cache blocks larger than client parts")] + fn test_read_from_cache( + block_size: usize, + client_part_size: usize, + object_size: usize, + offset: usize, + preferred_size: usize, + ) { + let key = "object"; + let seed = 0xaa; + let object = MockObject::ramp(seed, object_size, ETag::for_tests()); + let etag = object.etag(); + + let cache = InMemoryDataCache::new(block_size as u64); + let bucket = "test-bucket"; + let config = MockClientConfig { + bucket: bucket.to_string(), + part_size: client_part_size, + }; + let mock_client = Arc::new(MockClient::new(config)); + mock_client.add_object(key, object.clone()); + + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let stream = CachingPartStream::new(runtime, cache); + let range = RequestRange::new(object_size, offset as u64, preferred_size); + + let first_read_count = { + // First request (from client) + let get_object_counter = mock_client.new_counter(Operation::GetObject); + let request_task = stream.spawn_get_object_request(&mock_client, bucket, key, etag.clone(), range, 0); + compare_read(key, &object, request_task); + get_object_counter.count() + }; + assert!(first_read_count > 0); + + let second_read_count = { + // Second request (from cache) + let get_object_counter = mock_client.new_counter(Operation::GetObject); + let request_task = stream.spawn_get_object_request(&mock_client, bucket, key, etag.clone(), range, 0); + compare_read(key, &object, request_task); + get_object_counter.count() + }; + assert_eq!(second_read_count, 0); + } + + #[test_case(1 * MB, 8 * MB)] + #[test_case(8 * MB, 8 * MB)] + #[test_case(1 * MB, 5 * MB + 1)] + #[test_case(1 * MB + 1, 5 * MB)] + fn test_get_object_parts(block_size: usize, client_part_size: usize) { + let key = "object"; + let object_size = 16 * MB; + let seed = 0xaa; + let object = MockObject::ramp(seed, object_size, ETag::for_tests()); + let etag = object.etag(); + + let cache = InMemoryDataCache::new(block_size as u64); + let bucket = "test-bucket"; + let config = MockClientConfig { + bucket: bucket.to_string(), + part_size: client_part_size, + }; + let mock_client = Arc::new(MockClient::new(config)); + mock_client.add_object(key, object.clone()); + + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let stream = CachingPartStream::new(runtime, cache); + + for offset in [0, 512 * KB, 1 * MB, 4 * MB, 9 * MB] { + for preferred_size in [1 * KB, 512 * KB, 4 * MB, 12 * MB, 16 * MB] { + let range = RequestRange::new(object_size, offset as u64, preferred_size); + let request_task = stream.spawn_get_object_request(&mock_client, bucket, key, etag.clone(), range, 0); + 
compare_read(key, &object, request_task); + } + } + } + + fn compare_read( + key: &str, + object: &MockObject, + mut request_task: RequestTask, + ) { + let mut offset = request_task.start_offset(); + let mut remaining = request_task.total_size(); + while remaining > 0 { + let part = block_on(request_task.read(remaining)).unwrap(); + let bytes = part.into_bytes(key, offset).unwrap(); + + let expected = object.read(offset, bytes.len()); + let bytes = bytes.into_bytes().unwrap(); + assert_eq!(bytes, *expected); + + offset += bytes.len() as u64; + remaining -= bytes.len(); + } + } +} diff --git a/mountpoint-s3/tests/fuse_tests/consistency_test.rs b/mountpoint-s3/tests/fuse_tests/consistency_test.rs index 27deb154e..393d6f433 100644 --- a/mountpoint-s3/tests/fuse_tests/consistency_test.rs +++ b/mountpoint-s3/tests/fuse_tests/consistency_test.rs @@ -5,6 +5,8 @@ use fuser::BackgroundSession; use tempfile::TempDir; use test_case::test_case; +use mountpoint_s3::data_cache::InMemoryDataCache; + use crate::fuse_tests::{TestClientBox, TestSessionConfig}; fn page_cache_sharing_test(creator_fn: F, prefix: &str) @@ -58,8 +60,26 @@ fn page_cache_sharing_test_s3() { page_cache_sharing_test(crate::fuse_tests::s3_session::new, "page_cache_sharing_test"); } +#[cfg(feature = "s3_tests")] +#[test] +fn page_cache_sharing_test_s3_with_cache() { + page_cache_sharing_test( + crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + "page_cache_sharing_test", + ); +} + #[test_case(""; "no prefix")] #[test_case("page_cache_sharing_test"; "prefix")] fn page_cache_sharing_test_mock(prefix: &str) { page_cache_sharing_test(crate::fuse_tests::mock_session::new, prefix); } + +#[test_case(""; "no prefix")] +#[test_case("page_cache_sharing_test"; "prefix")] +fn page_cache_sharing_test_mock_with_cache(prefix: &str) { + page_cache_sharing_test( + crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + prefix, + ); +} diff --git a/mountpoint-s3/tests/fuse_tests/mod.rs b/mountpoint-s3/tests/fuse_tests/mod.rs index aab0860ff..551839fcb 100644 --- a/mountpoint-s3/tests/fuse_tests/mod.rs +++ b/mountpoint-s3/tests/fuse_tests/mod.rs @@ -21,6 +21,7 @@ use aws_sdk_s3::primitives::ByteStream; use aws_sdk_sts::config::Region; use fuser::{BackgroundSession, MountOption, Session}; use futures::Future; +use mountpoint_s3::data_cache::DataCache; use mountpoint_s3::fuse::S3FuseFilesystem; use mountpoint_s3::prefetch::{Prefetch, PrefetcherConfig}; use mountpoint_s3::prefix::Prefix; @@ -111,7 +112,7 @@ mod mock_session { use super::*; use futures::executor::ThreadPool; - use mountpoint_s3::prefetch::default_prefetch; + use mountpoint_s3::prefetch::{caching_prefetch, default_prefetch}; use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockObject}; /// Create a FUSE mount backed by a mock object client that does not talk to S3 @@ -145,6 +146,44 @@ mod mock_session { (mount_dir, session, test_client) } + /// Create a FUSE mount backed by a mock object client, with caching, that does not talk to S3 + pub fn new_with_cache( + cache: Cache, + ) -> impl FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) + where + Cache: DataCache + Send + Sync + 'static, + { + |test_name, test_config| { + let mount_dir = tempfile::tempdir().unwrap(); + + let bucket = "test_bucket"; + let prefix = if test_name.is_empty() { + test_name.to_string() + } else { + format!("{test_name}/") + }; + + let client_config = MockClientConfig { + bucket: bucket.to_string(), + 
part_size: test_config.part_size, + }; + let client = Arc::new(MockClient::new(client_config)); + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config); + let session = create_fuse_session( + client.clone(), + prefetcher, + bucket, + &prefix, + mount_dir.path(), + test_config.filesystem_config, + ); + let test_client = create_test_client(client, &prefix); + + (mount_dir, session, test_client) + } + } + fn create_test_client(client: Arc, prefix: &str) -> TestClientBox { let test_client = MockTestClient { prefix: prefix.to_owned(), @@ -228,7 +267,7 @@ mod s3_session { use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::types::{ChecksumAlgorithm, GlacierJobParameters, RestoreRequest, Tier}; use aws_sdk_s3::Client; - use mountpoint_s3::prefetch::default_prefetch; + use mountpoint_s3::prefetch::{caching_prefetch, default_prefetch}; use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; use mountpoint_s3_client::S3CrtClient; @@ -258,6 +297,39 @@ mod s3_session { (mount_dir, session, test_client) } + /// Create a FUSE mount backed by a real S3 client, with caching + pub fn new_with_cache( + cache: Cache, + ) -> impl FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox) + where + Cache: DataCache + Send + Sync + 'static, + { + |test_name, test_config| { + let mount_dir = tempfile::tempdir().unwrap(); + + let (bucket, prefix) = get_test_bucket_and_prefix(test_name); + let region = get_test_region(); + + let client_config = S3ClientConfig::default() + .part_size(test_config.part_size) + .endpoint_config(EndpointConfig::new(®ion)); + let client = S3CrtClient::new(client_config).unwrap(); + let runtime = client.event_loop_group(); + let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config); + let session = create_fuse_session( + client, + prefetcher, + &bucket, + &prefix, + mount_dir.path(), + test_config.filesystem_config, + ); + let test_client = create_test_client(®ion, &bucket, &prefix); + + (mount_dir, session, test_client) + } + } + fn create_test_client(region: &str, bucket: &str, prefix: &str) -> TestClientBox { let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await }); let test_client = SDKTestClient { diff --git a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs index 39d81e5b1..25f994301 100644 --- a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs +++ b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs @@ -1,4 +1,5 @@ use fuser::BackgroundSession; +use mountpoint_s3::data_cache::InMemoryDataCache; use mountpoint_s3::prefetch::PrefetcherConfig; use std::fs::{File, OpenOptions}; use std::io::Read; @@ -36,6 +37,17 @@ fn read_test_s3(object_size: usize) { read_test(crate::fuse_tests::s3_session::new, object_size); } +#[cfg(feature = "s3_tests")] +#[test_case(0; "empty file")] +#[test_case(1; "single-byte file")] +#[test_case(1024*1024; "1MiB file")] +fn read_test_s3_with_cache(object_size: usize) { + read_test( + crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + object_size, + ); +} + #[test_case(0; "empty file")] #[test_case(1; "single-byte file")] #[test_case(1024*1024; "1MiB file")] @@ -43,6 +55,16 @@ fn read_test_mock(object_size: usize) { read_test(crate::fuse_tests::mock_session::new, object_size); } +#[test_case(0; "empty file")] +#[test_case(1; "single-byte file")] +#[test_case(1024*1024; "1MiB file")] +fn 
read_test_mock_with_cache(object_size: usize) { + read_test( + crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + object_size, + ); +} + /// test for checking either prefetching fails or read original object when object is mutated during read. /// Prefetching of next request occurs when more than half of the current request is being read. /// So, when we read the first block, it prefetches the requests ti require to fulfill and the next request @@ -136,6 +158,19 @@ fn prefetch_test_etag_mock(request_size: usize, read_size: usize) { ); } +#[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")] +#[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] +#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] +#[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")] +fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) { + prefetch_test_etag( + crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + "prefetch_test_etag_mock", + request_size, + read_size, + ); +} + #[cfg(feature = "s3_tests")] #[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")] #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] @@ -149,3 +184,17 @@ fn prefetch_test_etag_s3(request_size: usize, read_size: usize) { read_size, ); } + +#[cfg(feature = "s3_tests")] +#[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")] +#[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] +#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] +#[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")] +fn prefetch_test_etag_s3_with_cache(request_size: usize, read_size: usize) { + prefetch_test_etag( + crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + "prefetch_test_etag_s3", + request_size, + read_size, + ); +} diff --git a/mountpoint-s3/tests/fuse_tests/read_test.rs b/mountpoint-s3/tests/fuse_tests/read_test.rs index 170d45b8e..e65776a71 100644 --- a/mountpoint-s3/tests/fuse_tests/read_test.rs +++ b/mountpoint-s3/tests/fuse_tests/read_test.rs @@ -4,6 +4,7 @@ use std::os::unix::prelude::PermissionsExt; use std::time::{Duration, Instant}; use fuser::BackgroundSession; +use mountpoint_s3::data_cache::InMemoryDataCache; use mountpoint_s3_client::types::PutObjectParams; use rand::RngCore; use rand::SeedableRng as _; @@ -66,12 +67,30 @@ fn basic_read_test_s3() { basic_read_test(crate::fuse_tests::s3_session::new, "basic_read_test"); } +#[cfg(feature = "s3_tests")] +#[test] +fn basic_read_test_s3_with_cache() { + basic_read_test( + crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + "basic_read_test", + ); +} + #[test_case("")] #[test_case("basic_read_test")] fn basic_read_test_mock(prefix: &str) { basic_read_test(crate::fuse_tests::mock_session::new, prefix); } +#[test_case("")] +#[test_case("basic_read_test")] +fn basic_read_test_mock_with_cache(prefix: &str) { + basic_read_test( + crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), + prefix, + ); +} + 
#[derive(PartialEq)] enum RestorationOptions { None,