Serialization benchmarks. #808

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
12 changes: 12 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

11 changes: 10 additions & 1 deletion mountpoint-s3/Cargo.toml
@@ -16,6 +16,7 @@ async-channel = "2.1.1"
async-lock = "3.3.0"
async-trait = "0.1.57"
bincode = "1.3.3"
bincode2 = "2.0.1"
Comment on lines 18 to +19 (Member):

I don't like that we have to take a non-dev-dependency on "all the versions we want to benchmark" here. I'd rather put it behind a feature flag, or just not commit the versions we're not using.
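
A rough sketch of the feature-flag option (the `serialization-benchmarks` feature name below is hypothetical, not part of this change): make the extra serializer an optional dependency and require the feature for the bench target.

```toml
[dependencies]
# Only pulled in when explicitly benchmarking alternative serialization formats.
bincode2 = { version = "2.0.1", optional = true }

[features]
# Hypothetical opt-in feature gating the extra serializer versions.
serialization-benchmarks = ["dep:bincode2"]

[[bench]]
name = "cache_serialization"
harness = false
required-features = ["serialization-benchmarks"]
```

Any library code that references the bincode 2 reader/writer would then need to be gated behind the same feature.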

bytes = { version = "1.2.1", features = ["serde"] }
clap = { version = "4.1.9", features = ["derive"] }
const_format = "0.2.30"
@@ -54,6 +55,7 @@ aws-config = "1.1.4"
aws-sdk-s3 = "1.14.0"
aws-sdk-sts = "1.12.0"
base16ct = { version = "0.1.1", features = ["alloc"] }
criterion = "0.5"
ctor = "0.2.6"
filetime = "0.2.21"
futures = { version = "*", features = ["thread-pool"] }
@@ -91,4 +93,11 @@ path = "src/main.rs"
[[bin]]
name = "mock-mount-s3"
path = "src/bin/mock-mount-s3.rs"
required-features = ["mountpoint-s3-client/mock"]

[[bench]]
name = "cache_serialization"
harness = false

[profile.bench]
debug = true
121 changes: 121 additions & 0 deletions mountpoint-s3/benches/cache_serialization.rs
@@ -0,0 +1,121 @@
//! Benchmarks comparing block reads through the on-disk data cache, serialized with
//! bincode 1.x vs bincode 2.x, against reading the same payload from a plain file.

use std::{fs, io::Write, path::PathBuf, sync::Arc};

use criterion::{criterion_group, criterion_main, Criterion};

use mountpoint_s3::data_cache::{
    Bincode2DiskBlockFileReader, Bincode2DiskBlockFileWriter, ChecksummedBytes, DataCache, DiskDataCache,
    DiskDataCacheConfig, ObjectId,
};
use mountpoint_s3_client::types::ETag;
use rand::Rng;

const BLOCK_SIZE: u64 = 1024 * 1024;

/// Read one block back from the cache written with the default (bincode 1.x) serializer.
#[inline]
fn read_cache_bincode() {
    let config = DiskDataCacheConfig {
        block_size: BLOCK_SIZE,
        limit: mountpoint_s3::data_cache::CacheLimit::Unbounded,
        ..Default::default()
    };
    let cache_directory = PathBuf::from("/tmp/mp-cache1/");
    let cache = DiskDataCache::new(cache_directory, config);
    let cache_key = ObjectId::new("a".into(), ETag::for_tests());
    let _data = cache
        .get_block(&cache_key, 0, 0)
        .expect("is able to read")
        .expect("data is there");
}

/// Read one block back from the cache written with the bincode 2.x reader/writer.
#[inline]
fn read_cache_bincode2() {
    let config = DiskDataCacheConfig {
        block_size: BLOCK_SIZE,
        limit: mountpoint_s3::data_cache::CacheLimit::Unbounded,
        reader: Arc::new(Bincode2DiskBlockFileReader {}),
        writer: Arc::new(Bincode2DiskBlockFileWriter {}),
    };
    let cache_directory = PathBuf::from("/tmp/mp-cache2/");
    let cache = DiskDataCache::new(cache_directory, config);
    let cache_key = ObjectId::new("a".into(), ETag::for_tests());
    let _data = cache
        .get_block(&cache_key, 0, 0)
        .expect("is able to read")
        .expect("data is there");
}

/// Baseline: read the same payload from a plain file, with no deserialization at all.
#[inline]
fn read_file() {
    let _read = fs::read("/tmp/mp-file/file").expect("is able to read file");
}

fn random_bytes(length: usize) -> Vec<u8> {
    let mut rng = rand::thread_rng();
    let random_bytes: Vec<u8> = (0..length).map(|_| rng.gen()).collect();
    random_bytes
}

fn write_file(data: &[u8]) {
    fs::create_dir("/tmp/mp-file").expect("is able to create temp dir");
    let file_path = "/tmp/mp-file/file";
    let mut file = fs::File::create(file_path).expect("is able to create file");
    file.write_all(data).expect("is able to write file");
}

fn write_cache_bincode(data: &[u8]) {
    let config = DiskDataCacheConfig {
        block_size: BLOCK_SIZE,
        limit: mountpoint_s3::data_cache::CacheLimit::Unbounded,
        ..Default::default()
    };
    fs::create_dir("/tmp/mp-cache1").expect("is able to create dir 1");
    let cache_dir = PathBuf::from("/tmp/mp-cache1/");
    let cache = DiskDataCache::new(cache_dir, config);
    let cache_key = ObjectId::new("a".into(), ETag::for_tests());
    let data = ChecksummedBytes::new(data.to_owned().into());
    cache
        .put_block(cache_key.clone(), 0, 0, data)
        .expect("is able to write to cache");
}

fn write_cache_bincode2(data: &[u8]) {
    let config = DiskDataCacheConfig {
        block_size: BLOCK_SIZE,
        limit: mountpoint_s3::data_cache::CacheLimit::Unbounded,
        reader: Arc::new(Bincode2DiskBlockFileReader {}),
        writer: Arc::new(Bincode2DiskBlockFileWriter {}),
    };
    fs::create_dir("/tmp/mp-cache2").expect("is able to create dir 2");
    let cache_dir = PathBuf::from("/tmp/mp-cache2/");
    let cache = DiskDataCache::new(cache_dir, config);
    let cache_key = ObjectId::new("a".into(), ETag::for_tests());
    let data = ChecksummedBytes::new(data.to_owned().into());
    cache
        .put_block(cache_key.clone(), 0, 0, data)
        .expect("is able to write to cache");
}

fn cleanup() {
    let _ = fs::remove_dir_all("/tmp/mp-file");
    let _ = fs::remove_dir_all("/tmp/mp-cache1");
    let _ = fs::remove_dir_all("/tmp/mp-cache2");
}

/// Write one random block to a plain file and to each cache ahead of the read benchmarks.
fn setup() {
    let data = random_bytes(BLOCK_SIZE.try_into().unwrap());
    write_file(&data);
    write_cache_bincode(&data);
    write_cache_bincode2(&data);
}

pub fn criterion_benchmark(c: &mut Criterion) {
    setup();

    c.bench_function("read_cache_bincode", |b| b.iter(read_cache_bincode));
    c.bench_function("read_cache_bincode2", |b| b.iter(read_cache_bincode2));
    c.bench_function("read_file", |b| b.iter(read_file));

    cleanup();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
10 changes: 7 additions & 3 deletions mountpoint-s3/src/data_cache.rs
@@ -6,16 +6,20 @@

mod cache_directory;
mod disk_data_cache;
mod disk_data_cache_io;
mod in_memory_data_cache;

use thiserror::Error;

pub use crate::checksums::ChecksummedBytes;
pub use crate::data_cache::cache_directory::ManagedCacheDir;
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig};
pub use crate::data_cache::disk_data_cache::{CacheLimit, DiskBlock, DiskDataCache, DiskDataCacheConfig};
pub use crate::data_cache::disk_data_cache_io::{
    Bincode2DiskBlockFileReader, Bincode2DiskBlockFileWriter, BincodeDiskBlockFileReader, BincodeDiskBlockFileWriter,
    DiskDataReader, DiskDataWriter,
};
pub use crate::data_cache::in_memory_data_cache::InMemoryDataCache;

use crate::object::ObjectId;
pub use crate::object::ObjectId;

/// Indexes blocks within a given object.
pub type BlockIndex = u64;
37 changes: 20 additions & 17 deletions mountpoint-s3/src/data_cache/disk_data_cache.rs
@@ -1,9 +1,10 @@
//! Module for the on-disk data cache implementation.

use std::fs;
use std::io::{ErrorKind, Read, Seek, Write};
use std::io::{ErrorKind, Read, Write};
use std::os::unix::fs::{DirBuilderExt, OpenOptionsExt};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
@@ -19,6 +20,9 @@ use crate::data_cache::DataCacheError;
use crate::object::ObjectId;
use crate::sync::Mutex;

use super::disk_data_cache_io::{
    BincodeDiskBlockFileReader, BincodeDiskBlockFileWriter, DiskDataReader, DiskDataWriter,
};
use super::{BlockIndex, ChecksummedBytes, DataCache, DataCacheResult};

/// Disk and file-layout versioning.
@@ -42,13 +46,17 @@ pub struct DiskDataCacheConfig {
    pub block_size: u64,
    /// How to limit the cache size.
    pub limit: CacheLimit,
    /// How to deserialize cached blocks read from disk.
    pub reader: Arc<dyn DiskDataReader + Send + Sync>,
    /// How to serialize cached blocks written to disk.
    pub writer: Arc<dyn DiskDataWriter + Send + Sync>,
}

impl Default for DiskDataCacheConfig {
    fn default() -> Self {
        Self {
            block_size: 1024 * 1024,                               // 1 MiB block size
            limit: CacheLimit::AvailableSpace { min_ratio: 0.05 }, // Preserve 5% available space
            reader: Arc::new(BincodeDiskBlockFileReader {}),
            writer: Arc::new(BincodeDiskBlockFileWriter {}),
        }
    }
}
@@ -159,7 +167,7 @@ impl DiskBlockHeader {

/// Represents a fixed-size chunk of data that can be serialized.
#[derive(Serialize, Deserialize, Debug)]
struct DiskBlock {
pub struct DiskBlock {
    /// Information describing the content of `data`, to be used to verify correctness
    header: DiskBlockHeader,
    /// Cached bytes
@@ -246,13 +254,8 @@ impl DiskDataCache {
            return Err(DataCacheError::InvalidBlockContent);
        }

        let block: DiskBlock = match bincode::deserialize_from(&file) {
            Ok(block) => block,
            Err(e) => {
                warn!("block could not be deserialized: {:?}", e);
                return Err(DataCacheError::InvalidBlockContent);
            }
        };
        let reader = &self.config.reader;
        let block: DiskBlock = reader.read_from_file(&file)?;
        let bytes = block
            .data(cache_key, block_idx, block_offset)
            .map_err(|err| match err {
@@ -287,14 +290,8 @@ impl DiskDataCache {
            .mode(0o600)
            .open(path.as_ref())?;
        file.write_all(CACHE_VERSION.as_bytes())?;
        let serialize_result = bincode::serialize_into(&mut file, &block);
        if let Err(err) = serialize_result {
            return match *err {
                bincode::ErrorKind::Io(io_err) => return Err(DataCacheError::from(io_err)),
                _ => Err(DataCacheError::InvalidBlockContent),
            };
        };
        Ok(file.stream_position()? as usize)
        let writer = &self.config.writer;
        writer.write_to_file(&file, &block)
    }

    fn is_limit_exceeded(&self, size: usize) -> bool {
@@ -547,6 +544,7 @@ mod tests {
            116, 97, 103, 11, 0, 0, 0, 0, 0, 0, 0, 104, 101, 108, 108, 111, 45, 119, 111, 114, 108, 100, 9, 85, 128,
            46, 29, 32, 6, 192, 3, 0, 0, 0, 0, 0, 0, 0, 70, 111, 111,
        ];
        // TODO: find a way to remove this reference to bincode
        let serialized_bytes = bincode::serialize(&block).unwrap();
        assert_eq!(
            expected_bytes, serialized_bytes,
@@ -572,6 +570,7 @@
            DiskDataCacheConfig {
                block_size: 1024,
                limit: CacheLimit::Unbounded,
                ..Default::default()
            },
        );

@@ -602,6 +601,7 @@
            DiskDataCacheConfig {
                block_size: 1024,
                limit: CacheLimit::Unbounded,
                ..Default::default()
            },
        );

@@ -637,6 +637,7 @@
            DiskDataCacheConfig {
                block_size,
                limit: CacheLimit::Unbounded,
                ..Default::default()
            },
        );
        let cache_key_1 = ObjectId::new("a".into(), ETag::for_tests());
@@ -713,6 +714,7 @@
            DiskDataCacheConfig {
                block_size: 8 * 1024 * 1024,
                limit: CacheLimit::Unbounded,
                ..Default::default()
            },
        );
        let cache_key = ObjectId::new("a".into(), ETag::for_tests());
@@ -789,6 +791,7 @@
            DiskDataCacheConfig {
                block_size: BLOCK_SIZE as u64,
                limit: CacheLimit::TotalSize { max_size: CACHE_LIMIT },
                ..Default::default()
            },
        );
