Skip to content

Commit

Permalink
Implement ObjectStore using a DataCache
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro committed Oct 31, 2023
1 parent ebc3e25 commit 70a1106
Show file tree
Hide file tree
Showing 9 changed files with 716 additions and 28 deletions.
2 changes: 1 addition & 1 deletion mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub type GetBodyPart = (u64, Box<[u8]>);
/// An ETag (entity tag) is a unique identifier for a HTTP object.
///
/// New ETags can be created with the [`FromStr`] implementation.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ETag {
etag: String,
}
Expand Down
16 changes: 16 additions & 0 deletions mountpoint-s3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,9 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {

#[cfg(feature = "caching")]
{
use mountpoint_s3::data_cache::in_memory_data_cache::InMemoryDataCache;
use mountpoint_s3::fs::CacheConfig;
use mountpoint_s3::store::cached_store;

if args.enable_metadata_caching {
// TODO: Review default for TTL
Expand All @@ -568,6 +570,20 @@ fn mount(args: CliArgs) -> anyhow::Result<FuseSession> {
file_ttl: metadata_cache_ttl,
};
}

if args.enable_data_caching {
let cache = InMemoryDataCache::new(args.data_cache_block_size);
let client = Arc::new(client);
let store = cached_store(client, cache, runtime, prefetcher_config);
return create_filesystem(
store,
&args.bucket_name,
&prefix,
filesystem_config,
fuse_config,
&bucket_description,
);
}
}

let store = default_store(Arc::new(client), runtime, prefetcher_config);
Expand Down
95 changes: 74 additions & 21 deletions mountpoint-s3/src/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! we increase the size of the GetObject requests up to some maximum. If the reader ever makes a
//! non-sequential read, we abandon the prefetching and start again with the minimum request size.
pub mod cached_feed;
pub mod feed;
mod part;
mod part_queue;
Expand Down Expand Up @@ -418,22 +419,32 @@ mod tests {
// It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
#![allow(clippy::identity_op)]

use crate::data_cache::in_memory_data_cache::InMemoryDataCache;
use crate::prefetch::feed::ClientPartFeed;
use crate::store::PrefetchGetObject;

use super::cached_feed::CachedPartFeed;
use super::*;
use futures::executor::{block_on, ThreadPool};
use futures::task::Spawn;
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::failure_client::{countdown_failure_client, RequestFailureMap};
use mountpoint_s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject};
use mountpoint_s3_client::ObjectClient;
use proptest::proptest;
use proptest::strategy::{Just, Strategy};
use proptest::{prop_oneof, proptest};
use proptest_derive::Arbitrary;
use std::collections::HashMap;
use test_case::test_case;

const MB: usize = 1024 * 1024;

/// Test-only selector for the data-cache behavior of the part feed.
///
/// `NoCache` exercises the plain `ClientPartFeed`; `InMemoryCache` wraps the
/// client in a `CachedPartFeed` backed by an `InMemoryDataCache` whose block
/// size is `block_size` bytes (it is converted to `u64` when the cache is
/// constructed in `create_prefetcher_with_runtime`).
#[derive(Debug, Clone, Copy)]
pub enum DataCacheConfig {
// Prefetch directly from the client, no caching layer.
NoCache,
// Cache fetched data in memory, split into blocks of `block_size` bytes.
InMemoryCache { block_size: usize },
}

#[derive(Debug, Arbitrary)]
struct TestConfig {
#[proptest(strategy = "16usize..1*1024*1024")]
Expand All @@ -448,6 +459,15 @@ mod tests {
max_forward_seek_distance: u64,
#[proptest(strategy = "1u64..4*1024*1024")]
max_backward_seek_distance: u64,
#[proptest(strategy = "data_cache_config_strategy()")]
data_cache_config: DataCacheConfig,
}

/// Proptest strategy for `TestConfig::data_cache_config`: picks either no
/// cache, or an in-memory cache with a block size drawn from 16 bytes up to
/// (but excluding) 2 MiB. Both arms are equally weighted by `prop_oneof!`.
fn data_cache_config_strategy() -> impl Strategy<Value = DataCacheConfig> {
prop_oneof![
Just(DataCacheConfig::NoCache),
(16usize..2 * 1024 * 1024).prop_map(|block_size| DataCacheConfig::InMemoryCache { block_size })
]
}

type GetObjectFn<E> = dyn Fn(&str, &str, u64, ETag) -> Box<dyn PrefetchGetObject<ClientError = E>>;
Expand Down Expand Up @@ -477,25 +497,39 @@ mod tests {
}
}

fn create_prefetcher<Client>(client: Client, config: PrefetcherConfig) -> PrefetcherBox<Client::ClientError>
fn create_prefetcher<Client>(
client: Client,
config: PrefetcherConfig,
cache_config: DataCacheConfig,
) -> PrefetcherBox<Client::ClientError>
where
Client: ObjectClient + Send + Sync + 'static,
{
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
create_prefetcher_with_runtime(client, runtime, config)
create_prefetcher_with_runtime(client, runtime, config, cache_config)
}

fn create_prefetcher_with_runtime<Client, Runtime>(
client: Client,
runtime: Runtime,
config: PrefetcherConfig,
cache_config: DataCacheConfig,
) -> PrefetcherBox<Client::ClientError>
where
Client: ObjectClient + Send + Sync + 'static,
Runtime: Spawn + Send + Sync + 'static,
{
let part_feed = ClientPartFeed::new(Arc::new(client), runtime);
PrefetcherBox::new(part_feed, config)
match cache_config {
DataCacheConfig::NoCache => {
let part_feed = ClientPartFeed::new(Arc::new(client), runtime);
PrefetcherBox::new(part_feed, config)
}
DataCacheConfig::InMemoryCache { block_size } => {
let cache = InMemoryDataCache::new(block_size as u64);
let part_feed = CachedPartFeed::new(Arc::new(client), runtime, cache);
PrefetcherBox::new(part_feed, config)
}
}
}

fn run_sequential_read_test(size: u64, read_size: usize, test_config: TestConfig) {
Expand All @@ -518,7 +552,7 @@ mod tests {
max_backward_seek_distance: test_config.max_backward_seek_distance,
};

let prefetcher = create_prefetcher(client, prefetcher_config);
let prefetcher = create_prefetcher(client, prefetcher_config, test_config.data_cache_config);

let mut request = prefetcher.get("test-bucket", "hello", size, etag);

Expand All @@ -536,41 +570,47 @@ mod tests {
assert_eq!(next_offset, size);
}

#[test]
fn sequential_read_small() {
// Sequential read of a small (1 MiB + 111 B) object in 1 MiB reads, run both
// without a cache and with an in-memory cache using 1 MiB blocks.
#[test_case(DataCacheConfig::NoCache)]
#[test_case(DataCacheConfig::InMemoryCache { block_size: 1 * MB })]
fn sequential_read_small(data_cache_config: DataCacheConfig) {
let config = TestConfig {
first_request_size: 256 * 1024,
max_request_size: 1024 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
data_cache_config,
};
// Odd +111 size exercises the final partial read at end of object.
run_sequential_read_test(1024 * 1024 + 111, 1024 * 1024, config);
}

#[test]
fn sequential_read_medium() {
// Sequential read of a medium (16 MiB + 111 B) object in 1 MiB reads, run
// both without a cache and with an in-memory cache using 1 MiB blocks.
#[test_case(DataCacheConfig::NoCache)]
#[test_case(DataCacheConfig::InMemoryCache { block_size: 1 * MB })]
fn sequential_read_medium(data_cache_config: DataCacheConfig) {
let config = TestConfig {
first_request_size: 256 * 1024,
max_request_size: 64 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
data_cache_config,
};
// Odd +111 size exercises the final partial read at end of object.
run_sequential_read_test(16 * 1024 * 1024 + 111, 1024 * 1024, config);
}

#[test]
fn sequential_read_large() {
// Sequential read of a large (256 MiB + 111 B) object in 1 MiB reads, run
// both without a cache and with an in-memory cache using 1 MiB blocks.
// Large enough that request sizes ramp up to `max_request_size`.
#[test_case(DataCacheConfig::NoCache)]
#[test_case(DataCacheConfig::InMemoryCache { block_size: 1 * MB })]
fn sequential_read_large(data_cache_config: DataCacheConfig) {
let config = TestConfig {
first_request_size: 256 * 1024,
max_request_size: 64 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
data_cache_config,
};
// Odd +111 size exercises the final partial read at end of object.
run_sequential_read_test(256 * 1024 * 1024 + 111, 1024 * 1024, config);
}
Expand Down Expand Up @@ -599,7 +639,7 @@ mod tests {
sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
..Default::default()
};
let prefetcher = create_prefetcher(client, prefetcher_config);
let prefetcher = create_prefetcher(client, prefetcher_config, test_config.data_cache_config);

let mut request = prefetcher.get("test-bucket", "hello", size, etag);

Expand All @@ -621,17 +661,23 @@ mod tests {
assert!(next_offset < size); // Since we're injecting failures, shouldn't make it to the end
}

#[test_case("invalid range; length=42")]
#[test_case("invalid range; length=42", DataCacheConfig::NoCache)]
#[test_case("invalid range; length=42", DataCacheConfig::InMemoryCache { block_size: 1 * MB })]
// test case for the request failure due to etag not matching
#[test_case("At least one of the pre-conditions you specified did not hold")]
fn fail_request_sequential_small(err_value: &str) {
#[test_case(
"At least one of the pre-conditions you specified did not hold",
DataCacheConfig::NoCache
)]
#[test_case("At least one of the pre-conditions you specified did not hold", DataCacheConfig::InMemoryCache { block_size: 1 * MB })]
fn fail_request_sequential_small(err_value: &str, data_cache_config: DataCacheConfig) {
let config = TestConfig {
first_request_size: 256 * 1024,
max_request_size: 1024 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
data_cache_config,
};

let mut get_failures = HashMap::new();
Expand Down Expand Up @@ -673,6 +719,7 @@ mod tests {
client_part_size: 181682,
max_forward_seek_distance: 1,
max_backward_seek_distance: 18668,
data_cache_config: DataCacheConfig::NoCache,
};
run_sequential_read_test(object_size, read_size, config);
}
Expand All @@ -696,7 +743,7 @@ mod tests {
max_backward_seek_distance: test_config.max_backward_seek_distance,
..Default::default()
};
let prefetcher = create_prefetcher(client, prefetcher_config);
let prefetcher = create_prefetcher(client, prefetcher_config, test_config.data_cache_config);

let mut request = prefetcher.get("test-bucket", "hello", object_size, etag);

Expand Down Expand Up @@ -757,6 +804,7 @@ mod tests {
client_part_size: 516882,
max_forward_seek_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
data_cache_config: DataCacheConfig::NoCache,
};
run_random_read_test(object_size, reads, config);
}
Expand All @@ -772,6 +820,7 @@ mod tests {
client_part_size: 1219731,
max_forward_seek_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
data_cache_config: DataCacheConfig::NoCache,
};
run_random_read_test(object_size, reads, config);
}
Expand All @@ -787,6 +836,7 @@ mod tests {
client_part_size: 1219731,
max_forward_seek_distance: 2260662,
max_backward_seek_distance: 2369799,
data_cache_config: DataCacheConfig::NoCache,
};
run_random_read_test(object_size, reads, config);
}
Expand All @@ -802,6 +852,7 @@ mod tests {
client_part_size: 1972409,
max_forward_seek_distance: 2810651,
max_backward_seek_distance: 3531090,
data_cache_config: DataCacheConfig::NoCache,
};
run_random_read_test(object_size, reads, config);
}
Expand All @@ -828,7 +879,7 @@ mod tests {
first_request_size: FIRST_REQUEST_SIZE,
..Default::default()
};
let prefetcher = create_prefetcher(client, prefetcher_config);
let prefetcher = create_prefetcher(client, prefetcher_config, DataCacheConfig::NoCache);

// Try every possible seek from first_read_size
for offset in first_read_size + 1..OBJECT_SIZE {
Expand Down Expand Up @@ -864,7 +915,7 @@ mod tests {
first_request_size: FIRST_REQUEST_SIZE,
..Default::default()
};
let prefetcher = create_prefetcher(client, prefetcher_config);
let prefetcher = create_prefetcher(client, prefetcher_config, DataCacheConfig::NoCache);

// Try every possible seek from first_read_size
for offset in 0..first_read_size {
Expand Down Expand Up @@ -924,7 +975,8 @@ mod tests {
..Default::default()
};

let prefetcher = create_prefetcher_with_runtime(client, ShuttleRuntime, prefetcher_config);
let prefetcher =
create_prefetcher_with_runtime(client, ShuttleRuntime, prefetcher_config, DataCacheConfig::NoCache);

let mut request = prefetcher.get("test-bucket", "hello", object_size, file_etag);

Expand Down Expand Up @@ -981,7 +1033,8 @@ mod tests {
..Default::default()
};

let prefetcher = create_prefetcher_with_runtime(client, ShuttleRuntime, prefetcher_config);
let prefetcher =
create_prefetcher_with_runtime(client, ShuttleRuntime, prefetcher_config, DataCacheConfig::NoCache);

let mut request = prefetcher.get("test-bucket", "hello", object_size, file_etag);

Expand Down
Loading

0 comments on commit 70a1106

Please sign in to comment.