diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index 5c29756b6216eb..8e81d3e70a70f1 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -3,7 +3,10 @@ use { bucket_item::BucketItem, bucket_map::BucketMapError, bucket_stats::BucketMapStats, - bucket_storage::{BucketOccupied, BucketStorage, IncludeHeader, DEFAULT_CAPACITY_POW2}, + bucket_storage::{ + BucketCapacity, BucketOccupied, BucketStorage, Capacity, IncludeHeader, + DEFAULT_CAPACITY_POW2, + }, index_entry::{ DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots, OccupiedEnum, @@ -88,11 +91,11 @@ struct DataFileEntryToFree { // >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data pub struct Bucket { drives: Arc>, - //index + /// index pub index: BucketStorage>, - //random offset for the index + /// random offset for the index random: u64, - //storage buckets to store SlotSlice up to a power of 2 in len + /// storage buckets to store SlotSlice up to a power of 2 in len pub data: Vec>, stats: Arc, @@ -210,7 +213,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { .fetch_add(m.as_us(), Ordering::Relaxed); match first_free { Some(ii) => Ok((None, ii)), - None => Err(BucketMapError::IndexNoSpace(index.capacity_pow2)), + None => Err(BucketMapError::IndexNoSpace(index.capacity.capacity_pow2())), } } @@ -263,7 +266,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { .stats .find_index_entry_mut_us .fetch_add(m.as_us(), Ordering::Relaxed); - Err(BucketMapError::IndexNoSpace(index.capacity_pow2)) + Err(BucketMapError::IndexNoSpace(index.capacity.capacity_pow2())) } pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> { @@ -356,7 +359,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { // need to move the allocation to a best fit spot let best_bucket = &self.data[best_fit_bucket as usize]; - let cap_power = best_bucket.capacity_pow2; + let cap_power = best_bucket.capacity.capacity_pow2(); let cap = best_bucket.capacity(); let pos = thread_rng().gen_range(0, cap); let mut success = false; @@ -373,7 +376,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket { if best_bucket.is_free(ix) { let mut multiple_slots = MultipleSlots::default(); multiple_slots.set_storage_offset(ix); - multiple_slots.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2); + multiple_slots + .set_storage_capacity_when_created_pow2(best_bucket.capacity.capacity_pow2()); multiple_slots.set_num_slots(num_slots); elem.set_slot_count_enum_value( &mut self.index, @@ -430,8 +434,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket { } pub fn grow_index(&self, current_capacity_pow2: u8) { - if self.index.capacity_pow2 == current_capacity_pow2 { - let mut starting_size_pow2 = self.index.capacity_pow2; + if self.index.capacity.capacity_pow2() == current_capacity_pow2 { + let mut starting_size_pow2 = self.index.capacity.capacity_pow2(); if self.anticipated_size > 0 { // start the growth at the next pow2 larger than what would be required to hold `anticipated_size`. // This will prevent unnecessary repeated grows at startup. @@ -446,7 +450,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { 1, std::mem::size_of::>() as u64, // the subtle `+ i` here causes us to grow from the starting size by a power of 2 on each iteration of the for loop - starting_size_pow2 + i, + Capacity::Pow2(starting_size_pow2 + i), self.index.max_search, Arc::clone(&self.stats.index), Arc::clone(&self.index.count), @@ -540,7 +544,10 @@ impl<'b, T: Clone + Copy + 'static> Bucket { &self.drives, self.index.max_search, self.data.get(data_index as usize), - std::cmp::max(current_capacity_pow2 + 1, DEFAULT_CAPACITY_POW2), + Capacity::Pow2(std::cmp::max( + current_capacity_pow2 + 1, + DEFAULT_CAPACITY_POW2, + )), 1 << data_index, Self::elem_size(), &self.stats.data, @@ -564,7 +571,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { /// grow the appropriate piece. Note this takes an immutable ref. /// The actual grow is set into self.reallocated and applied later on a write lock - pub fn grow(&self, err: BucketMapError) { + pub(crate) fn grow(&self, err: BucketMapError) { match err { BucketMapError::DataNoSpace((data_index, current_capacity_pow2)) => { //debug!("GROWING SPACE {:?}", (data_index, current_capacity_pow2)); diff --git a/bucket_map/src/bucket_map.rs b/bucket_map/src/bucket_map.rs index 478f5500a80d43..f2b7f0ab89436e 100644 --- a/bucket_map/src/bucket_map.rs +++ b/bucket_map/src/bucket_map.rs @@ -47,11 +47,15 @@ impl std::fmt::Debug for BucketMap { } } +/// used to communicate resize necessary and current size. #[derive(Debug)] pub enum BucketMapError { /// (bucket_index, current_capacity_pow2) + /// Note that this is specific to data buckets DataNoSpace((u64, u8)), + /// current_capacity_pow2 + /// Note that this is specific to index buckets IndexNoSpace(u8), } diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs index c9820ae4328da8..af95c0e087597d 100644 --- a/bucket_map/src/bucket_storage.rs +++ b/bucket_map/src/bucket_storage.rs @@ -49,8 +49,8 @@ pub trait BucketOccupied { /// This must be a multiple of sizeof(u64) fn offset_to_first_data() -> usize; /// initialize this struct - /// `num_elements` is the number of elements allocated in the bucket - fn new(num_elements: usize) -> Self; + /// `capacity` is the number of elements allocated in the bucket + fn new(capacity: Capacity) -> Self; /// copying entry. Any in-memory (per-bucket) data structures may need to be copied for this `ix_old`. /// no-op by default fn copying_entry( @@ -64,11 +64,19 @@ pub trait BucketOccupied { } } +pub trait BucketCapacity { + fn capacity(&self) -> u64; + fn capacity_pow2(&self) -> u8 { + unimplemented!(); + } +} + pub struct BucketStorage { path: PathBuf, mmap: MmapMut, pub cell_size: u64, - pub capacity_pow2: u8, + /// number of cells this bucket can hold + pub capacity: Capacity, pub count: Arc, pub stats: Arc, pub max_search: MaxSearch, @@ -94,12 +102,40 @@ pub(crate) enum IncludeHeader { NoHeader, } +/// 2 common ways of specifying capacity +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum Capacity { + /// 1 << Pow2 produces # elements + Pow2(u8), + /// Actual # elements + Actual(u64), +} + +impl BucketCapacity for Capacity { + fn capacity(&self) -> u64 { + match self { + Capacity::Pow2(pow2) => 1 << *pow2, + Capacity::Actual(elements) => *elements, + } + } + fn capacity_pow2(&self) -> u8 { + match self { + Capacity::Pow2(pow2) => *pow2, + Capacity::Actual(_elements) => { + panic!("illegal to ask for pow2 from random capacity"); + } + } + } +} + impl BucketStorage { + /// allocate a bucket of at least `capacity` elements. + /// if capacity can be random, more may be allocated to fill the last page. pub fn new_with_capacity( drives: Arc>, num_elems: u64, elem_size: u64, - capacity_pow2: u8, + mut capacity: Capacity, max_search: MaxSearch, stats: Arc, count: Arc, @@ -112,18 +148,35 @@ impl BucketStorage { "header size must be a multiple of u64" ); let cell_size = elem_size * num_elems + offset as u64; - let bytes = (1u64 << capacity_pow2) * cell_size; + let bytes = Self::allocate_to_fill_page(&mut capacity, cell_size); let (mmap, path) = Self::new_map(&drives, bytes, &stats); Self { path, mmap, cell_size, count, - capacity_pow2, + capacity, stats, max_search, - contents: O::new(1 << capacity_pow2), + contents: O::new(capacity), + } + } + + fn allocate_to_fill_page(capacity: &mut Capacity, cell_size: u64) -> u64 { + let mut bytes = capacity.capacity() * cell_size; + if let Capacity::Actual(_) = capacity { + // maybe bump up allocation to fit a page size + const PAGE_SIZE: u64 = 4 * 1024; + let full_page_bytes = bytes / PAGE_SIZE * PAGE_SIZE / cell_size * cell_size; + if full_page_bytes < bytes { + let bytes_new = ((bytes / PAGE_SIZE) + 1) * PAGE_SIZE / cell_size * cell_size; + assert!(bytes_new >= bytes, "allocating less than requested, capacity: {}, bytes: {}, bytes_new: {}, full_page_bytes: {}", capacity.capacity(), bytes, bytes_new, full_page_bytes); + assert_eq!(bytes_new % cell_size, 0); + bytes = bytes_new; + *capacity = Capacity::Actual(bytes / cell_size); + } } + bytes } pub fn max_search(&self) -> u64 { @@ -142,7 +195,7 @@ impl BucketStorage { drives, num_elems, elem_size, - DEFAULT_CAPACITY_POW2, + Capacity::Pow2(DEFAULT_CAPACITY_POW2), max_search, stats, count, @@ -330,7 +383,7 @@ impl BucketStorage { let old_cap = old_bucket.capacity(); let old_map = &old_bucket.mmap; - let increment = self.capacity_pow2 - old_bucket.capacity_pow2; + let increment = self.capacity.capacity_pow2() - old_bucket.capacity.capacity_pow2(); let index_grow = 1 << increment; (0..old_cap as usize).for_each(|i| { if !old_bucket.is_free(i as u64) { @@ -370,7 +423,7 @@ impl BucketStorage { drives: &Arc>, max_search: MaxSearch, bucket: Option<&Self>, - capacity_pow_2: u8, + capacity: Capacity, num_elems: u64, elem_size: u64, stats: &Arc, @@ -379,7 +432,7 @@ impl BucketStorage { Arc::clone(drives), num_elems, elem_size, - capacity_pow_2, + capacity, max_search, Arc::clone(stats), bucket @@ -400,7 +453,7 @@ impl BucketStorage { /// Return the number of cells currently allocated pub fn capacity(&self) -> u64 { - 1 << self.capacity_pow2 + self.capacity.capacity() } } @@ -458,11 +511,17 @@ mod test { std::mem::size_of::() - 1 } /// initialize this struct - fn new(_num_elements: usize) -> Self { + fn new(_num_elements: Capacity) -> Self { Self {} } } + impl BucketCapacity for BucketBadHeader { + fn capacity(&self) -> u64 { + unimplemented!(); + } + } + #[test] #[should_panic(expected = "assertion failed: `(left == right)`")] fn test_header_size() { @@ -470,7 +529,7 @@ mod test { Arc::default(), 0, 0, - 0, + Capacity::Pow2(0), 0, Arc::default(), Arc::default(), diff --git a/bucket_map/src/index_entry.rs b/bucket_map/src/index_entry.rs index f613e7da173c19..fea14c2469d001 100644 --- a/bucket_map/src/index_entry.rs +++ b/bucket_map/src/index_entry.rs @@ -2,7 +2,7 @@ use { crate::{ - bucket_storage::{BucketOccupied, BucketStorage, IncludeHeader}, + bucket_storage::{BucketCapacity, BucketOccupied, BucketStorage, Capacity, IncludeHeader}, RefCount, }, bv::BitVec, @@ -33,9 +33,9 @@ impl BucketOccupied for BucketWithBitVec { // no header, nothing stored in data stream 0 } - fn new(num_elements: usize) -> Self { + fn new(capacity: Capacity) -> Self { Self { - occupied: BitVec::new_fill(false, num_elements as u64), + occupied: BitVec::new_fill(false, capacity.capacity()), } } } @@ -64,9 +64,10 @@ impl BucketOccupied for IndexBucketUsingRefCountBits { matches!(entry.get_slot_count_enum(), OccupiedEnum::Free) } fn offset_to_first_data() -> usize { + // no header, nothing stored in data stream 0 } - fn new(_num_elements: usize) -> Self { + fn new(_capacity: Capacity) -> Self { Self { _phantom: PhantomData, } @@ -170,7 +171,8 @@ impl MultipleSlots { /// This function maps the original data location into an index in the current bucket storage. /// This is coupled with how we resize bucket storages. pub(crate) fn data_loc(&self, storage: &BucketStorage) -> u64 { - self.storage_offset() << (storage.capacity_pow2 - self.storage_capacity_when_created_pow2()) + self.storage_offset() + << (storage.capacity.capacity_pow2() - self.storage_capacity_when_created_pow2()) } }