From 70a1106c8011ac92bffb7dd9a8f60d9ecc41be00 Mon Sep 17 00:00:00 2001
From: Alessandro Passaro
Date: Thu, 5 Oct 2023 07:47:26 +0000
Subject: [PATCH] Implement ObjectStore using a DataCache

Signed-off-by: Alessandro Passaro
---
 mountpoint-s3-client/src/object_client.rs    |   2 +-
 mountpoint-s3/src/main.rs                    |  16 +
 mountpoint-s3/src/prefetch.rs                |  95 +++-
 mountpoint-s3/src/prefetch/cached_feed.rs    | 453 ++++++++++++++++++
 mountpoint-s3/src/store.rs                   |  22 +-
 .../tests/fuse_tests/consistency_test.rs     |  20 +
 mountpoint-s3/tests/fuse_tests/mod.rs        |  62 +++
 .../tests/fuse_tests/prefetch_test.rs        |  49 ++
 mountpoint-s3/tests/fuse_tests/read_test.rs  |  25 +-
 9 files changed, 716 insertions(+), 28 deletions(-)
 create mode 100644 mountpoint-s3/src/prefetch/cached_feed.rs

diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs
index 87d01ae3a..d6d2c837f 100644
--- a/mountpoint-s3-client/src/object_client.rs
+++ b/mountpoint-s3-client/src/object_client.rs
@@ -20,7 +20,7 @@ pub type GetBodyPart = (u64, Box<[u8]>);
 /// An ETag (entity tag) is a unique identifier for an HTTP object.
 ///
 /// New ETags can be created with the [`FromStr`] implementation.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct ETag {
     etag: String,
 }
diff --git a/mountpoint-s3/src/main.rs b/mountpoint-s3/src/main.rs
index 016e91a51..2995dd138 100644
--- a/mountpoint-s3/src/main.rs
+++ b/mountpoint-s3/src/main.rs
@@ -557,7 +557,9 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {
 
     #[cfg(feature = "caching")]
     {
+        use mountpoint_s3::data_cache::in_memory_data_cache::InMemoryDataCache;
         use mountpoint_s3::fs::CacheConfig;
+        use mountpoint_s3::store::cached_store;
 
         if args.enable_metadata_caching {
             // TODO: Review default for TTL
@@ -568,6 +570,20 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {
                 file_ttl: metadata_cache_ttl,
             };
         }
+
+        if args.enable_data_caching {
+            let cache = InMemoryDataCache::new(args.data_cache_block_size);
+            let client = Arc::new(client);
+            let store = cached_store(client, cache, runtime, prefetcher_config);
+            return create_filesystem(
+                store,
+                &args.bucket_name,
+                &prefix,
+                filesystem_config,
+                fuse_config,
+                &bucket_description,
+            );
+        }
     }
 
     let store = default_store(Arc::new(client), runtime, prefetcher_config);
diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index 2f10d4b1d..081e8c880 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -7,6 +7,7 @@
 //! we increase the size of the GetObject requests up to some maximum. If the reader ever makes a
 //! non-sequential read, we abandon the prefetching and start again with the minimum request size.
 
+pub mod cached_feed;
 pub mod feed;
 mod part;
 mod part_queue;
@@ -418,9 +419,11 @@ mod tests {
     // It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
     #![allow(clippy::identity_op)]
 
+    use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
     use crate::prefetch::feed::ClientPartFeed;
     use crate::store::PrefetchGetObject;
 
+    use super::cached_feed::CachedPartFeed;
     use super::*;
     use futures::executor::{block_on, ThreadPool};
     use futures::task::Spawn;
@@ -428,12 +431,20 @@ mod tests {
     use mountpoint_s3_client::failure_client::{countdown_failure_client, RequestFailureMap};
     use mountpoint_s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject};
     use mountpoint_s3_client::ObjectClient;
-    use proptest::proptest;
     use proptest::strategy::{Just, Strategy};
+    use proptest::{prop_oneof, proptest};
     use proptest_derive::Arbitrary;
    use std::collections::HashMap;
     use test_case::test_case;
 
+    const MB: usize = 1024 * 1024;
+
+    #[derive(Debug, Clone, Copy)]
+    pub enum DataCacheConfig {
+        NoCache,
+        InMemoryCache { block_size: usize },
+    }
+
     #[derive(Debug, Arbitrary)]
     struct TestConfig {
         #[proptest(strategy = "16usize..1*1024*1024")]
@@ -448,6 +459,15 @@ mod tests {
         max_forward_seek_distance: u64,
         #[proptest(strategy = "1u64..4*1024*1024")]
         max_backward_seek_distance: u64,
+        #[proptest(strategy = "data_cache_config_strategy()")]
+        data_cache_config: DataCacheConfig,
     }
+
+    fn data_cache_config_strategy() -> impl Strategy<Value = DataCacheConfig> {
+        prop_oneof![
+            Just(DataCacheConfig::NoCache),
+            (16usize..2 * 1024 * 1024).prop_map(|block_size| DataCacheConfig::InMemoryCache { block_size })
+        ]
+    }
 
     type GetObjectFn = dyn Fn(&str, &str, u64, ETag) -> Box<dyn PrefetchGetObject<ClientError = MockClientError>>;
 
@@ -477,25 +497,39 @@ mod tests {
         }
     }
 
-    fn create_prefetcher<Client>(client: Client, config: PrefetcherConfig) -> PrefetcherBox<Client::ClientError>
+    fn create_prefetcher<Client>(
+        client: Client,
+        config: PrefetcherConfig,
+        cache_config: DataCacheConfig,
+    ) -> PrefetcherBox<Client::ClientError>
     where
         Client: ObjectClient + Send + Sync + 'static,
     {
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
-        create_prefetcher_with_runtime(client, runtime, config)
+        create_prefetcher_with_runtime(client, runtime, config, cache_config)
     }
 
     fn create_prefetcher_with_runtime<Client, Runtime>(
         client: Client,
         runtime: Runtime,
         config: PrefetcherConfig,
+        cache_config: DataCacheConfig,
     ) -> PrefetcherBox<Client::ClientError>
     where
         Client: ObjectClient + Send + Sync + 'static,
         Runtime: Spawn + Send + Sync + 'static,
     {
-        let part_feed = ClientPartFeed::new(Arc::new(client), runtime);
-        PrefetcherBox::new(part_feed, config)
+        match cache_config {
+            DataCacheConfig::NoCache => {
+                let part_feed = ClientPartFeed::new(Arc::new(client), runtime);
+                PrefetcherBox::new(part_feed, config)
+            }
+            DataCacheConfig::InMemoryCache { block_size } => {
+                let cache = InMemoryDataCache::new(block_size as u64);
+                let part_feed = CachedPartFeed::new(Arc::new(client), runtime, cache);
+                PrefetcherBox::new(part_feed, config)
+            }
+        }
     }
 
     fn run_sequential_read_test(size: u64, read_size: usize, test_config: TestConfig) {
@@ -518,7 +552,7 @@ mod tests {
             max_backward_seek_distance: test_config.max_backward_seek_distance,
         };
 
-        let prefetcher = create_prefetcher(client, prefetcher_config);
+        let prefetcher = create_prefetcher(client, prefetcher_config, test_config.data_cache_config);
 
         let mut request = prefetcher.get("test-bucket", "hello", size, etag);
 
@@ -536,8 +570,9 @@ mod tests {
         assert_eq!(next_offset, size);
     }
 
-    #[test]
-    fn sequential_read_small() {
+    #[test_case(DataCacheConfig::NoCache)]
+
#[test_case(DataCacheConfig::InMemoryCache { block_size: 1 * MB })] + fn sequential_read_small(data_cache_config: DataCacheConfig) { let config = TestConfig { first_request_size: 256 * 1024, max_request_size: 1024 * 1024 * 1024, @@ -545,12 +580,14 @@ mod tests { client_part_size: 8 * 1024 * 1024, max_forward_seek_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + data_cache_config, }; run_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config); } - #[test] - fn sequential_read_medium() { + #[test_case(DataCacheConfig::NoCache)] + #[test_case(DataCacheConfig::InMemoryCache { block_size: 1 * MB })] + fn sequential_read_medium(data_cache_config: DataCacheConfig) { let config = TestConfig { first_request_size: 256 * 1024, max_request_size: 64 * 1024 * 1024, @@ -558,12 +595,14 @@ mod tests { client_part_size: 8 * 1024 * 1024, max_forward_seek_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + data_cache_config, }; run_sequential_read_test(16 * 1024 * 1024 + 111, 1024 * 1024, config); } - #[test] - fn sequential_read_large() { + #[test_case(DataCacheConfig::NoCache)] + #[test_case(DataCacheConfig::InMemoryCache { block_size: 1 * MB })] + fn sequential_read_large(data_cache_config: DataCacheConfig) { let config = TestConfig { first_request_size: 256 * 1024, max_request_size: 64 * 1024 * 1024, @@ -571,6 +610,7 @@ mod tests { client_part_size: 8 * 1024 * 1024, max_forward_seek_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + data_cache_config, }; run_sequential_read_test(256 * 1024 * 1024 + 111, 1024 * 1024, config); } @@ -599,7 +639,7 @@ mod tests { sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, ..Default::default() }; - let prefetcher = create_prefetcher(client, prefetcher_config); + let prefetcher = create_prefetcher(client, prefetcher_config, test_config.data_cache_config); let mut request = prefetcher.get("test-bucket", "hello", size, etag); @@ -621,10 +661,15 @@ mod tests { assert!(next_offset < size); // Since we're injecting failures, shouldn't make it to the end } - #[test_case("invalid range; length=42")] + #[test_case("invalid range; length=42", DataCacheConfig::NoCache)] + #[test_case("invalid range; length=42", DataCacheConfig::InMemoryCache { block_size: 1 * MB })] // test case for the request failure due to etag not matching - #[test_case("At least one of the pre-conditions you specified did not hold")] - fn fail_request_sequential_small(err_value: &str) { + #[test_case( + "At least one of the pre-conditions you specified did not hold", + DataCacheConfig::NoCache + )] + #[test_case("At least one of the pre-conditions you specified did not hold", DataCacheConfig::InMemoryCache { block_size: 1 * MB })] + fn fail_request_sequential_small(err_value: &str, data_cache_config: DataCacheConfig) { let config = TestConfig { first_request_size: 256 * 1024, max_request_size: 1024 * 1024 * 1024, @@ -632,6 +677,7 @@ mod tests { client_part_size: 8 * 1024 * 1024, max_forward_seek_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + data_cache_config, }; let mut get_failures = HashMap::new(); @@ -673,6 +719,7 @@ mod tests { client_part_size: 181682, max_forward_seek_distance: 1, max_backward_seek_distance: 18668, + data_cache_config: DataCacheConfig::NoCache, }; run_sequential_read_test(object_size, read_size, config); } @@ -696,7 +743,7 @@ mod tests { max_backward_seek_distance: test_config.max_backward_seek_distance, ..Default::default() }; - let prefetcher = 
create_prefetcher(client, prefetcher_config); + let prefetcher = create_prefetcher(client, prefetcher_config, test_config.data_cache_config); let mut request = prefetcher.get("test-bucket", "hello", object_size, etag); @@ -757,6 +804,7 @@ mod tests { client_part_size: 516882, max_forward_seek_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + data_cache_config: DataCacheConfig::NoCache, }; run_random_read_test(object_size, reads, config); } @@ -772,6 +820,7 @@ mod tests { client_part_size: 1219731, max_forward_seek_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + data_cache_config: DataCacheConfig::NoCache, }; run_random_read_test(object_size, reads, config); } @@ -787,6 +836,7 @@ mod tests { client_part_size: 1219731, max_forward_seek_distance: 2260662, max_backward_seek_distance: 2369799, + data_cache_config: DataCacheConfig::NoCache, }; run_random_read_test(object_size, reads, config); } @@ -802,6 +852,7 @@ mod tests { client_part_size: 1972409, max_forward_seek_distance: 2810651, max_backward_seek_distance: 3531090, + data_cache_config: DataCacheConfig::NoCache, }; run_random_read_test(object_size, reads, config); } @@ -828,7 +879,7 @@ mod tests { first_request_size: FIRST_REQUEST_SIZE, ..Default::default() }; - let prefetcher = create_prefetcher(client, prefetcher_config); + let prefetcher = create_prefetcher(client, prefetcher_config, DataCacheConfig::NoCache); // Try every possible seek from first_read_size for offset in first_read_size + 1..OBJECT_SIZE { @@ -864,7 +915,7 @@ mod tests { first_request_size: FIRST_REQUEST_SIZE, ..Default::default() }; - let prefetcher = create_prefetcher(client, prefetcher_config); + let prefetcher = create_prefetcher(client, prefetcher_config, DataCacheConfig::NoCache); // Try every possible seek from first_read_size for offset in 0..first_read_size { @@ -924,7 +975,8 @@ mod tests { ..Default::default() }; - let prefetcher = create_prefetcher_with_runtime(client, ShuttleRuntime, prefetcher_config); + let prefetcher = + create_prefetcher_with_runtime(client, ShuttleRuntime, prefetcher_config, DataCacheConfig::NoCache); let mut request = prefetcher.get("test-bucket", "hello", object_size, file_etag); @@ -981,7 +1033,8 @@ mod tests { ..Default::default() }; - let prefetcher = create_prefetcher_with_runtime(client, ShuttleRuntime, prefetcher_config); + let prefetcher = + create_prefetcher_with_runtime(client, ShuttleRuntime, prefetcher_config, DataCacheConfig::NoCache); let mut request = prefetcher.get("test-bucket", "hello", object_size, file_etag); diff --git a/mountpoint-s3/src/prefetch/cached_feed.rs b/mountpoint-s3/src/prefetch/cached_feed.rs new file mode 100644 index 000000000..c4f1fde99 --- /dev/null +++ b/mountpoint-s3/src/prefetch/cached_feed.rs @@ -0,0 +1,453 @@ +use std::time::Instant; +use std::{ops::Range, sync::Arc}; + +use bytes::Bytes; +use futures::task::{Spawn, SpawnExt}; +use futures::{pin_mut, StreamExt}; +use mountpoint_s3_client::error::ObjectClientError; +use mountpoint_s3_client::{types::ETag, ObjectClient}; +use tracing::{debug_span, error, trace, warn, Instrument}; + +use crate::checksums::ChecksummedBytes; +use crate::data_cache::{BlockIndex, DataCache}; +use crate::prefetch::part_queue::unbounded_part_queue; +use crate::prefetch::{ + feed::{ObjectPartFeed, RequestRange}, + part::Part, + part_queue::PartQueueProducer, + task::RequestTask, +}; +use crate::store::PrefetchReadError; + +pub type CacheKey = (String, ETag); + +/// [ObjectPartFeed] implementation which maintains a 
+/// [Cache] for the object data retrieved by a [Client].
+#[derive(Debug)]
+pub struct CachedPartFeed<Client, Cache, Runtime> {
+    inner: CachedPartFeedInner<Client, Cache>,
+    runtime: Runtime,
+}
+
+impl<Client, Cache, Runtime> CachedPartFeed<Client, Cache, Runtime> {
+    pub fn new(client: Arc<Client>, runtime: Runtime, cache: Cache) -> Self {
+        Self {
+            inner: CachedPartFeedInner {
+                client,
+                cache: Arc::new(cache),
+            },
+            runtime,
+        }
+    }
+}
+
+impl<Client, Cache, Runtime> ObjectPartFeed for CachedPartFeed<Client, Cache, Runtime>
+where
+    Client: ObjectClient + Send + Sync + 'static,
+    Cache: DataCache<CacheKey> + Send + Sync + 'static,
+    Runtime: Spawn,
+{
+    type ClientError = Client::ClientError;
+
+    fn spawn_get_object_request(
+        &self,
+        bucket: &str,
+        key: &str,
+        if_match: ETag,
+        range: RequestRange,
+        _preferred_part_size: usize,
+    ) -> RequestTask<Client::ClientError> {
+        let range = get_aligned_request_range(range, self.inner.cache.block_size());
+
+        let start = range.start();
+        let size = range.len();
+
+        let (part_queue, part_queue_producer) = unbounded_part_queue();
+        trace!(?range, "spawning request");
+
+        let request_task = {
+            let inner = self.inner.clone();
+            let bucket = bucket.to_owned();
+            let key = key.to_owned();
+            let etag = if_match.clone();
+            let span = debug_span!("prefetch", ?range);
+
+            async move {
+                inner
+                    .get_from_cache(&bucket, &key, etag, range, part_queue_producer)
+                    .await;
+            }
+            .instrument(span)
+        };
+
+        let task_handle = self.runtime.spawn_with_handle(request_task).unwrap();
+
+        RequestTask::from_handle(task_handle, size, start, part_queue)
+    }
+}
+
+fn get_aligned_request_range(range: RequestRange, block_size: u64) -> RequestRange {
+    let object_size = range.object_size();
+    let offset = range.start();
+    let preferred_size = range.len();
+
+    // Try to align the request to cache block boundaries.
+    let offset_in_block = offset % block_size;
+    let size = if offset_in_block != 0 {
+        // If the offset is not block-aligned, first request only the bytes up to the next block boundary.
+        let remaining_in_block = block_size - offset_in_block;
+        preferred_size.min(remaining_in_block as usize)
+    } else {
+        // If the request is smaller than a block, extend it to a whole block.
+        if preferred_size < block_size as usize {
+            block_size as usize
+        } else {
+            // Otherwise, round the size up to the next block boundary.
+            let request_boundary = offset + preferred_size as u64;
+            let remainder = request_boundary % block_size;
+            if remainder != 0 {
+                preferred_size + (block_size - remainder) as usize
+            } else {
+                preferred_size
+            }
+        }
+    };
+    RequestRange::new(object_size, offset, size)
+}
+
+#[derive(Debug)]
+struct CachedPartFeedInner<Client, Cache> {
+    client: Arc<Client>,
+    cache: Arc<Cache>,
+}
+
+impl<Client, Cache> Clone for CachedPartFeedInner<Client, Cache> {
+    fn clone(&self) -> Self {
+        Self {
+            client: self.client.clone(),
+            cache: self.cache.clone(),
+        }
+    }
+}
+
+impl<Client, Cache> CachedPartFeedInner<Client, Cache>
+where
+    Client: ObjectClient + Send + Sync + 'static,
+    Cache: DataCache<CacheKey> + Send + Sync,
+{
+    async fn get_from_cache(
+        &self,
+        bucket: &str,
+        key: &str,
+        if_match: ETag,
+        range: RequestRange,
+        part_queue_producer: PartQueueProducer<Client::ClientError>,
+    ) {
+        let block_size = self.cache.block_size();
+        let block_range = block_indices_for_byte_range(&range, block_size);
+
+        let cache_key = (key.to_owned(), if_match);
+        let block_indexes = match self.cache.cached_block_indices(&cache_key, block_range.clone()) {
+            Ok(blocks) => blocks,
+            Err(error) => {
+                trace!(?key, ?range, ?error, "error reading from cache");
+                return self
+                    .get_from_client(bucket, &cache_key, range, block_range, part_queue_producer)
+                    .await;
+            }
+        };
+
+        // If the cached blocks do not cover the whole range, fall back to the client.
+        // TODO: consider allowing partial cache hits.
+        let block_range_len = block_range.end.saturating_sub(block_range.start) as usize;
+        if block_indexes.len() < block_range_len {
+            trace!(
+                ?key,
+                ?range,
+                "cache miss - only found {} blocks out of {}",
+                block_indexes.len(),
+                block_range_len
+            );
+            return self
+                .get_from_client(bucket, &cache_key, range, block_range, part_queue_producer)
+                .await;
+        }
+
+        for block_index in block_indexes {
+            match self.cache.get_block(&cache_key, block_index) {
+                Ok(Some(block)) => {
+                    trace!(?key, ?range, block_index, "cache hit");
+                    let part = make_part(key, block, block_index, block_size, &range);
+                    metrics::counter!("cache.total_bytes", part.len() as u64, "type" => "read");
+                    part_queue_producer.push(Ok(part));
+                }
+                Ok(None) => {
+                    let range = range.trim_start(block_index * block_size);
+                    let block_range = block_index..block_range.end;
+                    trace!(?key, ?range, block_index, "cache miss - no data for block");
+                    return self
+                        .get_from_client(bucket, &cache_key, range, block_range, part_queue_producer)
+                        .await;
+                }
+                Err(error) => {
+                    let range = range.trim_start(block_index * block_size);
+                    let block_range = block_index..block_range.end;
+                    trace!(?key, ?range, block_index, ?error, "error reading block from cache");
+                    return self
+                        .get_from_client(bucket, &cache_key, range, block_range, part_queue_producer)
+                        .await;
+                }
+            }
+        }
+    }
+
+    async fn get_from_client(
+        &self,
+        bucket: &str,
+        cache_key: &CacheKey,
+        range: RequestRange,
+        block_range: Range<BlockIndex>,
+        part_queue_producer: PartQueueProducer<Client::ClientError>,
+    ) {
+        let key = &cache_key.0;
+        let block_size = self.cache.block_size();
+        assert!(block_size > 0);
+
+        let block_aligned_byte_range =
+            (block_range.start * block_size)..(block_range.end * block_size).min(range.object_size() as u64);
+
+        trace!(
+            ?key,
+            range = ?block_aligned_byte_range,
+            original_range = ?range,
+            "fetching data from client"
+        );
+        let get_object_result = match self
+            .client
+            .get_object(
+                bucket,
+                key,
+                Some(block_aligned_byte_range),
+                Some(cache_key.1.to_owned()),
+            )
+            .await
+        {
+            Ok(get_object_result) => get_object_result,
+            Err(e) => {
+                error!(key, error=?e, "GetObject request failed");
+                part_queue_producer.push(Err(PrefetchReadError::map(e)));
+                return;
+            }
+        };
+
+        pin_mut!(get_object_result);
+        let mut block_index = block_range.start;
+        let mut buffer = ChecksummedBytes::default();
+        loop {
+            match get_object_result.next().await {
+                Some(Ok((offset, body))) => {
+                    trace!(offset, length = body.len(), "received GetObject part");
+
+                    // Split the body into blocks.
+                    let mut body: Bytes = body.into();
+                    while !body.is_empty() {
+                        let remaining = (block_size as usize).saturating_sub(buffer.len()).min(body.len());
+                        let chunk = body.split_to(remaining);
+                        if let Err(e) = buffer.extend(chunk.into()) {
+                            error!(key, error=?e, "Integrity check failed");
+                            part_queue_producer.push(Err(ObjectClientError::ServiceError(e.into())));
+                            return;
+                        }
+                        if buffer.len() < block_size as usize {
+                            break;
+                        }
+                        self.update_cache(cache_key, block_index, &buffer);
+                        part_queue_producer.push(Ok(make_part(key, buffer, block_index, block_size, &range)));
+                        block_index += 1;
+                        buffer = ChecksummedBytes::default();
+                    }
+                }
+                Some(Err(e)) => {
+                    error!(key, error=?e, "GetObject body part failed");
+                    part_queue_producer.push(Err(PrefetchReadError::map(e)));
+                    break;
+                }
+                None => {
+                    if !buffer.is_empty() {
+                        if buffer.len() + (block_index * block_size) as usize == range.object_size() {
+                            // Write the last (partial) block to the cache.
+                            self.update_cache(cache_key, block_index, &buffer);
+                        }
+                        part_queue_producer.push(Ok(make_part(key, buffer, block_index, block_size, &range)));
+                    }
+                    break;
+                }
+            }
+        }
+        trace!("request finished");
+    }
+
+    fn update_cache(&self, cache_key: &CacheKey, block_index: u64, block: &ChecksummedBytes) {
+        // TODO: consider updating the cache asynchronously
+        let start = Instant::now();
+        match self.cache.put_block(cache_key.clone(), block_index, block.clone()) {
+            Ok(()) => {
+                metrics::histogram!("cache.write_duration_us", start.elapsed().as_micros() as f64);
+                metrics::counter!("cache.total_bytes", block.len() as u64, "type" => "write");
+            }
+            Err(error) => {
+                warn!(key = cache_key.0, block_index, ?error, "failed to update cache");
+            }
+        };
+    }
+}
+
+fn make_part(key: &str, block: ChecksummedBytes, block_index: u64, block_size: u64, range: &RequestRange) -> Part {
+    let block_offset = block_index * block_size;
+    let block_size = block.len();
+    let part_range = range
+        .trim_start(block_offset)
+        .trim_end(block_offset + block_size as u64);
+    trace!(
+        key,
+        ?part_range,
+        block_index,
+        block_offset,
+        block_size,
+        "feeding part from block"
+    );
+
+    let trim_start = (part_range.start().saturating_sub(block_offset)) as usize;
+    let trim_end = (part_range.end().saturating_sub(block_offset)) as usize;
+    let bytes = block.slice(trim_start..trim_end);
+    Part::new(key, part_range.start(), bytes)
+}
+
+fn block_indices_for_byte_range(range: &RequestRange, block_size: u64) -> Range<BlockIndex> {
+    let start_block = range.start() / block_size;
+    let mut end_block = range.end() / block_size;
+    if !range.is_empty() && range.end() % block_size != 0 {
+        end_block += 1;
+    }
+
+    start_block..end_block
+}
+
+#[cfg(test)]
+mod tests {
+    // It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
+    #![allow(clippy::identity_op)]
+
+    use futures::executor::{block_on, ThreadPool};
+    use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockObject, Operation};
+    use test_case::test_case;
+
+    use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
+
+    use super::*;
+
+    const KB: usize = 1024;
+    const MB: usize = 1024 * 1024;
+
+    #[test_case(1 * MB, 8 * MB, 16 * MB, 0, 16 * MB; "whole object")]
+    #[test_case(1 * MB, 8 * MB, 16 * MB, 1 * MB, 3 * MB + 512 * KB; "aligned offset")]
+    #[test_case(1 * MB, 8 * MB, 16 * MB, 512 * KB, 3 * MB; "non-aligned range")]
+    #[test_case(3 * MB, 8 * MB, 14 * MB, 0, 14 * MB; "whole object, size not aligned to parts or blocks")]
+    #[test_case(3 * MB, 8 * MB, 14 * MB, 9 * MB, 100 * MB; "aligned offset, size not aligned to parts or blocks")]
parts or blocks")] + #[test_case(1 * MB, 8 * MB, 100 * KB, 0, 100 * KB; "small object")] + #[test_case(8 * MB, 5 * MB, 16 * MB, 0, 16 * MB; "cache blocks larger than client parts")] + fn test_read_from_cache( + block_size: usize, + client_part_size: usize, + object_size: usize, + offset: usize, + preferred_size: usize, + ) { + let key = "object"; + let seed = 0xaa; + let object = MockObject::ramp(seed, object_size, ETag::for_tests()); + let etag = object.etag(); + + let cache = InMemoryDataCache::new(block_size as u64); + let bucket = "test-bucket"; + let config = MockClientConfig { + bucket: bucket.to_string(), + part_size: client_part_size, + }; + let mock_client = Arc::new(MockClient::new(config)); + mock_client.add_object(key, object.clone()); + + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let feed = CachedPartFeed::new(mock_client.clone(), runtime, cache); + let range = RequestRange::new(object_size, offset as u64, preferred_size); + + let first_read_count = { + // First request (from client) + let get_object_counter = mock_client.new_counter(Operation::GetObject); + let request_task = feed.spawn_get_object_request(bucket, key, etag.clone(), range, 0); + compare_read(key, &object, request_task); + get_object_counter.count() + }; + assert!(first_read_count > 0); + + let second_read_count = { + // Second request (from cache) + let get_object_counter = mock_client.new_counter(Operation::GetObject); + let request_task = feed.spawn_get_object_request(bucket, key, etag.clone(), range, 0); + compare_read(key, &object, request_task); + get_object_counter.count() + }; + assert_eq!(second_read_count, 0); + } + + #[test_case(1 * MB, 8 * MB)] + #[test_case(8 * MB, 8 * MB)] + #[test_case(1 * MB, 5 * MB + 1)] + #[test_case(1 * MB + 1, 5 * MB)] + fn test_get_object_parts(block_size: usize, client_part_size: usize) { + let key = "object"; + let object_size = 16 * MB; + let seed = 0xaa; + let object = MockObject::ramp(seed, object_size, ETag::for_tests()); + let etag = object.etag(); + + let cache = InMemoryDataCache::new(block_size as u64); + let bucket = "test-bucket"; + let config = MockClientConfig { + bucket: bucket.to_string(), + part_size: client_part_size, + }; + let mock_client = Arc::new(MockClient::new(config)); + mock_client.add_object(key, object.clone()); + + let runtime = ThreadPool::builder().pool_size(1).create().unwrap(); + let feed = CachedPartFeed::new(mock_client.clone(), runtime, cache); + + for offset in [0, 512 * KB, 1 * MB, 4 * MB, 9 * MB] { + for preferred_size in [1 * KB, 512 * KB, 4 * MB, 12 * MB, 16 * MB] { + let range = RequestRange::new(object_size, offset as u64, preferred_size); + let request_task = feed.spawn_get_object_request(bucket, key, etag.clone(), range, 0); + compare_read(key, &object, request_task); + } + } + } + + fn compare_read( + key: &str, + object: &MockObject, + mut request_task: RequestTask, + ) { + let mut offset = request_task.start_offset(); + let mut remaining = request_task.total_size(); + while remaining > 0 { + let part = block_on(request_task.read(remaining)).unwrap(); + let bytes = part.into_bytes(key, offset).unwrap(); + + let expected = object.read(offset, bytes.len()); + let bytes = bytes.into_bytes().unwrap(); + assert_eq!(bytes, *expected); + + offset += bytes.len() as u64; + remaining -= bytes.len(); + } + } +} diff --git a/mountpoint-s3/src/store.rs b/mountpoint-s3/src/store.rs index 467ce0a12..f602a7822 100644 --- a/mountpoint-s3/src/store.rs +++ b/mountpoint-s3/src/store.rs @@ -17,7 +17,12 @@ use 
 
 use crate::{
     checksums::{ChecksummedBytes, IntegrityError},
-    prefetch::{feed::ClientPartFeed, PrefetcherConfig},
+    data_cache::DataCache,
+    prefetch::{
+        cached_feed::{CacheKey, CachedPartFeed},
+        feed::ClientPartFeed,
+        PrefetcherConfig,
+    },
 };
 use crate::{
     prefetch::{self, feed::ObjectPartFeed, Prefetcher},
@@ -230,6 +235,21 @@ where
     ClientStore::new(client, part_feed, prefetcher_config)
 }
 
+pub fn cached_store<Client, Cache, Runtime>(
+    client: Arc<Client>,
+    cache: Cache,
+    runtime: Runtime,
+    prefetcher_config: PrefetcherConfig,
+) -> ClientStore<Client, CachedPartFeed<Client, Cache, Runtime>>
+where
+    Client: ObjectClient + Send + Sync + 'static,
+    Cache: DataCache<CacheKey> + Send + Sync + 'static,
+    Runtime: Spawn + Send + Sync + 'static,
+{
+    let part_feed = CachedPartFeed::new(client.clone(), runtime, cache);
+    ClientStore::new(client, part_feed, prefetcher_config)
+}
+
 pub type TestStore = ClientStore<MockClient, ClientPartFeed<MockClient>>;
 
 pub fn test_store(client: Arc<MockClient>) -> TestStore
diff --git a/mountpoint-s3/tests/fuse_tests/consistency_test.rs b/mountpoint-s3/tests/fuse_tests/consistency_test.rs
index 27deb154e..42d13e596 100644
--- a/mountpoint-s3/tests/fuse_tests/consistency_test.rs
+++ b/mountpoint-s3/tests/fuse_tests/consistency_test.rs
@@ -5,6 +5,8 @@ use fuser::BackgroundSession;
 use tempfile::TempDir;
 use test_case::test_case;
 
+use mountpoint_s3::data_cache::in_memory_data_cache::InMemoryDataCache;
+
 use crate::fuse_tests::{TestClientBox, TestSessionConfig};
 
 fn page_cache_sharing_test<F>(creator_fn: F, prefix: &str)
@@ -58,8 +60,26 @@ fn page_cache_sharing_test_s3() {
     page_cache_sharing_test(crate::fuse_tests::s3_session::new, "page_cache_sharing_test");
 }
 
+#[cfg(feature = "s3_tests")]
+#[test]
+fn page_cache_sharing_test_s3_with_cache() {
+    page_cache_sharing_test(
+        crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
+        "page_cache_sharing_test",
+    );
+}
+
 #[test_case(""; "no prefix")]
 #[test_case("page_cache_sharing_test"; "prefix")]
 fn page_cache_sharing_test_mock(prefix: &str) {
     page_cache_sharing_test(crate::fuse_tests::mock_session::new, prefix);
 }
+
+#[test_case(""; "no prefix")]
+#[test_case("page_cache_sharing_test"; "prefix")]
+fn page_cache_sharing_test_mock_with_cache(prefix: &str) {
+    page_cache_sharing_test(
+        crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
+        prefix,
+    );
+}
diff --git a/mountpoint-s3/tests/fuse_tests/mod.rs b/mountpoint-s3/tests/fuse_tests/mod.rs
index 76a31467e..86c50e1dc 100644
--- a/mountpoint-s3/tests/fuse_tests/mod.rs
+++ b/mountpoint-s3/tests/fuse_tests/mod.rs
@@ -18,9 +18,12 @@ use std::path::Path;
 use std::sync::Arc;
 
 use fuser::{BackgroundSession, MountOption, Session};
+use mountpoint_s3::data_cache::DataCache;
 use mountpoint_s3::fuse::S3FuseFilesystem;
+use mountpoint_s3::prefetch::cached_feed::CacheKey;
 use mountpoint_s3::prefetch::PrefetcherConfig;
 use mountpoint_s3::prefix::Prefix;
+use mountpoint_s3::store::cached_store;
 use mountpoint_s3::store::{default_store, ObjectStore};
 use mountpoint_s3::S3FilesystemConfig;
 use mountpoint_s3_client::types::PutObjectParams;
@@ -131,6 +134,38 @@ mod mock_session {
         (mount_dir, session, test_client)
     }
 
+    /// Create a FUSE mount backed by a mock object client, with caching, that does not talk to S3
+    pub fn new_with_cache<Cache>(
+        cache: Cache,
+    ) -> impl FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox)
+    where
+        Cache: DataCache<CacheKey> + Send + Sync + 'static,
+    {
+        |test_name, test_config| {
+            let mount_dir = tempfile::tempdir().unwrap();
+
+            let bucket = "test_bucket";
+            let prefix = if test_name.is_empty() {
+                test_name.to_string()
+            } else {
+                format!("{test_name}/")
+            };
+
+            let client_config = MockClientConfig {
+                bucket: bucket.to_string(),
+                part_size: test_config.part_size,
+            };
+            let client = Arc::new(MockClient::new(client_config));
+            let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
+            let store = cached_store(client.clone(), cache, runtime, test_config.prefetcher_config);
+
+            let session = create_fuse_session(store, bucket, &prefix, mount_dir.path(), test_config.filesystem_config);
+            let test_client = create_test_client(client, &prefix);
+
+            (mount_dir, session, test_client)
+        }
+    }
+
     fn create_test_client(client: Arc<MockClient>, prefix: &str) -> TestClientBox {
         let test_client = MockTestClient {
             prefix: prefix.to_owned(),
@@ -239,6 +274,33 @@ mod s3_session {
         (mount_dir, session, test_client)
     }
 
+    /// Create a FUSE mount backed by a real S3 client, with caching
+    pub fn new_with_cache<Cache>(
+        cache: Cache,
+    ) -> impl FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox)
+    where
+        Cache: DataCache<CacheKey> + Send + Sync + 'static,
+    {
+        |test_name, test_config| {
+            let mount_dir = tempfile::tempdir().unwrap();
+
+            let (bucket, prefix) = get_test_bucket_and_prefix(test_name);
+            let region = get_test_region();
+
+            let client_config = S3ClientConfig::default()
+                .part_size(test_config.part_size)
+                .endpoint_config(EndpointConfig::new(&region));
+            let client = S3CrtClient::new(client_config).unwrap();
+            let runtime = client.event_loop_group();
+            let store = cached_store(Arc::new(client), cache, runtime, test_config.prefetcher_config);
+
+            let session = create_fuse_session(store, &bucket, &prefix, mount_dir.path(), test_config.filesystem_config);
+            let test_client = create_test_client(&region, &bucket, &prefix);
+
+            (mount_dir, session, test_client)
+        }
+    }
+
     fn create_test_client(region: &str, bucket: &str, prefix: &str) -> TestClientBox {
         let sdk_client = tokio_block_on(async { get_test_sdk_client(region).await });
         let test_client = SDKTestClient {
diff --git a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs
index 39d81e5b1..f2817b9b2 100644
--- a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs
+++ b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs
@@ -1,4 +1,5 @@
 use fuser::BackgroundSession;
+use mountpoint_s3::data_cache::in_memory_data_cache::InMemoryDataCache;
 use mountpoint_s3::prefetch::PrefetcherConfig;
 use std::fs::{File, OpenOptions};
 use std::io::Read;
@@ -36,6 +37,17 @@ fn read_test_s3(object_size: usize) {
     read_test(crate::fuse_tests::s3_session::new, object_size);
 }
 
+#[cfg(feature = "s3_tests")]
+#[test_case(0; "empty file")]
+#[test_case(1; "single-byte file")]
+#[test_case(1024*1024; "1MiB file")]
+fn read_test_s3_with_cache(object_size: usize) {
+    read_test(
+        crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
+        object_size,
+    );
+}
+
 #[test_case(0; "empty file")]
 #[test_case(1; "single-byte file")]
 #[test_case(1024*1024; "1MiB file")]
@@ -43,6 +55,16 @@ fn read_test_mock(object_size: usize) {
     read_test(crate::fuse_tests::mock_session::new, object_size);
 }
 
+#[test_case(0; "empty file")]
+#[test_case(1; "single-byte file")]
+#[test_case(1024*1024; "1MiB file")]
+fn read_test_mock_with_cache(object_size: usize) {
+    read_test(
+        crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
+        object_size,
+    );
+}
+
 /// Test that reads either fail or return the original data when the object is mutated during the read.
 /// Prefetching of the next request starts once more than half of the current request has been read.
 /// So, when we read the first block, the prefetcher issues the requests it requires to fulfill it and the next request
@@ -136,6 +158,19 @@ fn prefetch_test_etag_mock(request_size: usize, read_size: usize) {
     );
 }
 
+#[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")]
+#[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")]
+#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")]
+#[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")]
+fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) {
+    prefetch_test_etag(
+        crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
+        "prefetch_test_etag_mock",
+        request_size,
+        read_size,
+    );
+}
+
 #[cfg(feature = "s3_tests")]
 #[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")]
 #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")]
@@ -149,3 +184,17 @@ fn prefetch_test_etag_s3(request_size: usize, read_size: usize) {
         read_size,
     );
 }
+
+#[cfg(feature = "s3_tests")]
+#[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")]
+#[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")]
+#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")]
+#[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")]
+fn prefetch_test_etag_s3_with_cache(request_size: usize, read_size: usize) {
+    prefetch_test_etag(
+        crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
+        "prefetch_test_etag_s3",
+        request_size,
+        read_size,
+    );
+}
diff --git a/mountpoint-s3/tests/fuse_tests/read_test.rs b/mountpoint-s3/tests/fuse_tests/read_test.rs
index 2bc5e9d49..66824588d 100644
--- a/mountpoint-s3/tests/fuse_tests/read_test.rs
+++ b/mountpoint-s3/tests/fuse_tests/read_test.rs
@@ -4,6 +4,7 @@ use std::os::unix::prelude::PermissionsExt;
 use std::time::{Duration, Instant};
 
 use fuser::BackgroundSession;
+use mountpoint_s3::data_cache::in_memory_data_cache::InMemoryDataCache;
 use mountpoint_s3_client::types::PutObjectParams;
 use rand::RngCore;
 use rand::SeedableRng as _;
@@ -66,14 +67,28 @@ fn basic_read_test_s3() {
     basic_read_test(crate::fuse_tests::s3_session::new, "basic_read_test");
 }
 
+#[cfg(feature = "s3_tests")]
 #[test]
-fn basic_read_test_mock() {
-    basic_read_test(crate::fuse_tests::mock_session::new, "");
+fn basic_read_test_s3_with_cache() {
+    basic_read_test(
+        crate::fuse_tests::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
+        "basic_read_test",
+    );
 }
 
-#[test]
-fn basic_read_test_mock_prefix() {
-    basic_read_test(crate::fuse_tests::mock_session::new, "basic_read_test");
+#[test_case("")]
+#[test_case("basic_read_test")]
+fn basic_read_test_mock(prefix: &str) {
+    basic_read_test(crate::fuse_tests::mock_session::new, prefix);
+}
+
+#[test_case("")]
+#[test_case("basic_read_test")]
+fn basic_read_test_mock_with_cache(prefix: &str) {
+    basic_read_test(
+        crate::fuse_tests::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
+        prefix,
+    );
 }
 
 #[derive(PartialEq)]
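
A worked example of the block arithmetic may help review. The sketch below mirrors block_indices_for_byte_range from cached_feed.rs, but takes plain byte offsets instead of the crate's RequestRange type; the helper name and the main-based assertions are illustrative only, not part of the patch.

use std::ops::Range;

// Simplified stand-in for the patch's block_indices_for_byte_range: it takes
// raw byte offsets instead of a RequestRange (illustrative only).
fn block_indices(byte_range: Range<u64>, block_size: u64) -> Range<u64> {
    let start_block = byte_range.start / block_size;
    let mut end_block = byte_range.end / block_size;
    if byte_range.end > byte_range.start && byte_range.end % block_size != 0 {
        end_block += 1; // a partially covered trailing block must still be fetched
    }
    start_block..end_block
}

fn main() {
    const MB: u64 = 1024 * 1024;
    // A 3 MiB read starting at 512 KiB touches blocks 0..4 with 1 MiB blocks,
    // so the feed rounds the client request out to those block boundaries.
    assert_eq!(block_indices(512 * 1024..512 * 1024 + 3 * MB, MB), 0..4);
    // A block-aligned 2 MiB read maps to exactly two blocks.
    assert_eq!(block_indices(MB..3 * MB, MB), 1..3);
    // Reads smaller than a block still cost one whole block.
    assert_eq!(block_indices(0..1, MB), 0..1);
}

This is the same rounding that get_aligned_request_range applies before the GetObject request is issued, which is why a cold read always fetches whole cache blocks.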
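The DataCache interface that CachedPartFeed drives can also be exercised directly. The round-trip sketch below uses only calls that appear in the patch (put_block, get_block, and (key, ETag) cache keys); the From<Bytes> conversion for ChecksummedBytes and the FromStr-based ETag construction are inferred from the surrounding code, so treat this as a sketch under those assumptions rather than documented API.

use bytes::Bytes;
use mountpoint_s3::checksums::ChecksummedBytes;
use mountpoint_s3::data_cache::{in_memory_data_cache::InMemoryDataCache, DataCache};
use mountpoint_s3::prefetch::cached_feed::CacheKey;
use mountpoint_s3_client::types::ETag;

fn main() {
    // One cache block per 1 MiB of object data, as in the tests above.
    let cache = InMemoryDataCache::new(1024 * 1024);

    // Cache entries are keyed by (object key, ETag), so a changed object can
    // never be served from stale blocks.
    let etag: ETag = "\"3bebe4037c8f040e0e573e191d34b2c6\"".parse().unwrap();
    let cache_key: CacheKey = ("my-object".to_owned(), etag);

    // Write block 0, then read it back.
    let block: ChecksummedBytes = Bytes::from_static(b"hello, block 0").into();
    cache.put_block(cache_key.clone(), 0, block).unwrap();
    assert!(cache.get_block(&cache_key, 0).unwrap().is_some());

    // Block 1 was never written: a miss, so CachedPartFeed would fall back
    // to a GetObject request for the remaining block-aligned range.
    assert!(cache.get_block(&cache_key, 1).unwrap().is_none());
}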