return file name as u128 for disk buckets (#33044)
jeffwashington authored Aug 29, 2023
1 parent 9bc09c9 commit 551317d
Showing 2 changed files with 43 additions and 30 deletions.
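
In short: BucketStorage::new, new_with_capacity, and new_resized now also return the random u128 that names the bucket's backing file on disk (generated in new_map), so callers destructure a tuple instead of receiving a bare BucketStorage. A minimal sketch of the new call shape, assuming drives, max_search, stats, and count are already in scope; the argument values are placeholders, not a call site copied from this diff:

    // Sketch only: BucketStorage::new now returns (BucketStorage, u128), where
    // the u128 is the random value that new_map used as the on-disk file name.
    let (index, file_name) = BucketStorage::new(
        Arc::clone(&drives),                         // shared list of candidate drive paths
        1,                                           // num_elems
        std::mem::size_of::<IndexEntry<T>>() as u64, // elem_size
        max_search,                                  // MaxSearch
        Arc::clone(&stats),                          // Arc<BucketStats>
        Arc::clone(&count),                          // Arc<AtomicU64>
    );

Call sites that do not need the name simply take the first tuple element with .0, as several of the changed call sites below do.
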
26 changes: 15 additions & 11 deletions bucket_map/src/bucket.rs
@@ -116,7 +116,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
stats: Arc<BucketMapStats>,
count: Arc<AtomicU64>,
) -> Self {
- let index = BucketStorage::new(
+ let (index, _file_name) = BucketStorage::new(
Arc::clone(&drives),
1,
std::mem::size_of::<IndexEntry<T>>() as u64,
@@ -569,7 +569,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
count += 1;
// grow relative to the current capacity
let new_capacity = (current_capacity * 110 / 100).max(anticipated_size);
- let mut index = BucketStorage::new_with_capacity(
+ let (mut index, _file_name) = BucketStorage::new_with_capacity(
Arc::clone(&self.drives),
1,
std::mem::size_of::<IndexEntry<T>>() as u64,
@@ -649,14 +649,17 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
if self.data.get(ix).is_none() {
for i in self.data.len()..ix {
// insert empty data buckets
- self.add_data_bucket(BucketStorage::new(
- Arc::clone(&self.drives),
- 1 << i,
- Self::elem_size(),
- self.index.max_search,
- Arc::clone(&self.stats.data),
- Arc::default(),
- ));
+ self.add_data_bucket(
+ BucketStorage::new(
+ Arc::clone(&self.drives),
+ 1 << i,
+ Self::elem_size(),
+ self.index.max_search,
+ Arc::clone(&self.stats.data),
+ Arc::default(),
+ )
+ .0,
+ );
}
self.add_data_bucket(bucket);
} else {
@@ -671,7 +674,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
/// grow a data bucket
/// The application of the new bucket is deferred until the next write lock.
pub fn grow_data(&self, data_index: u64, current_capacity_pow2: u8) {
- let new_bucket = BucketStorage::new_resized(
+ let (new_bucket, _file_name) = BucketStorage::new_resized(
&self.drives,
self.index.max_search,
self.data.get(data_index as usize),
@@ -798,6 +801,7 @@ mod tests {
Arc::default(),
Arc::default(),
)
+ .0
}

#[test]
47 changes: 28 additions & 19 deletions bucket_map/src/bucket_storage.rs
@@ -137,7 +137,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
max_search: MaxSearch,
stats: Arc<BucketStats>,
count: Arc<AtomicU64>,
- ) -> Self {
+ ) -> (Self, u128) {
let offset = O::offset_to_first_data();
let size_of_u64 = std::mem::size_of::<u64>();
assert_eq!(
@@ -147,16 +147,19 @@ impl<O: BucketOccupied> BucketStorage<O> {
);
let cell_size = elem_size * num_elems + offset as u64;
let bytes = Self::allocate_to_fill_page(&mut capacity, cell_size);
- let (mmap, path) = Self::new_map(&drives, bytes, &stats);
- Self {
- path,
- mmap,
- cell_size,
- count,
- stats,
- max_search,
- contents: O::new(capacity),
- }
+ let (mmap, path, file_name) = Self::new_map(&drives, bytes, &stats);
+ (
+ Self {
+ path,
+ mmap,
+ cell_size,
+ count,
+ stats,
+ max_search,
+ contents: O::new(capacity),
+ },
+ file_name,
+ )
}

fn allocate_to_fill_page(capacity: &mut Capacity, cell_size: u64) -> u64 {
@@ -187,7 +190,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
max_search: MaxSearch,
stats: Arc<BucketStats>,
count: Arc<AtomicU64>,
- ) -> Self {
+ ) -> (Self, u128) {
Self::new_with_capacity(
drives,
num_elems,
@@ -335,11 +338,12 @@ impl<O: BucketOccupied> BucketStorage<O> {
}

/// allocate a new memory mapped file of size `bytes` on one of `drives`
- fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf) {
+ fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf, u128) {
let mut measure_new_file = Measure::start("measure_new_file");
let r = thread_rng().gen_range(0..drives.len());
let drive = &drives[r];
- let pos = format!("{}", thread_rng().gen_range(0..u128::MAX),);
+ let file_random = thread_rng().gen_range(0..u128::MAX);
+ let pos = format!("{}", file_random,);
let file = drive.join(pos);
let mut data = OpenOptions::new()
.read(true)
@@ -368,7 +372,11 @@ impl<O: BucketOccupied> BucketStorage<O> {
data.flush().unwrap(); // can we skip this?
measure_flush.stop();
let mut measure_mmap = Measure::start("measure_mmap");
- let res = (unsafe { MmapMut::map_mut(&data).unwrap() }, file);
+ let res = (
+ unsafe { MmapMut::map_mut(&data).unwrap() },
+ file,
+ file_random,
+ );
measure_mmap.stop();
stats
.new_file_us
@@ -433,8 +441,8 @@ impl<O: BucketOccupied> BucketStorage<O> {
num_elems: u64,
elem_size: u64,
stats: &Arc<BucketStats>,
- ) -> Self {
- let mut new_bucket = Self::new_with_capacity(
+ ) -> (Self, u128) {
+ let (mut new_bucket, file_name) = Self::new_with_capacity(
Arc::clone(drives),
num_elems,
elem_size,
@@ -449,7 +457,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
new_bucket.copy_contents(bucket);
}
new_bucket.update_max_size();
- new_bucket
+ (new_bucket, file_name)
}

/// Return the number of bytes currently allocated
@@ -484,7 +492,8 @@ mod test {
1,
Arc::default(),
Arc::default(),
- );
+ )
+ .0;
let ix = 0;
assert!(storage.is_free(ix));
assert!(storage.occupy(ix, false).is_ok());