diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index 9a4c649db1d82b..8728c3764f2a77 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -6,6 +6,7 @@ use { bucket_storage::{BucketOccupied, BucketStorage, DEFAULT_CAPACITY_POW2}, index_entry::{ DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots, + SlotCountEnum, }, MaxSearch, RefCount, }, @@ -78,8 +79,14 @@ impl Reallocated { } } +/// when updating the index, this keeps track of the previous data entry which will need to be freed +struct DataFileEntryToFree { + bucket_ix: usize, + location: u64, +} + // >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data -pub struct Bucket { +pub struct Bucket { drives: Arc>, //index pub index: BucketStorage>, @@ -263,7 +270,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { pub fn try_write( &mut self, key: &Pubkey, - data: impl Iterator, + mut data: impl Iterator, data_len: usize, ref_count: RefCount, ) -> Result<(), BucketMapError> { @@ -287,71 +294,115 @@ impl<'b, T: Clone + Copy + 'static> Bucket { }; elem.set_ref_count(&mut self.index, ref_count); - let current_multiple_slots = elem.get_multiple_slots(&self.index); - let bucket_ix = current_multiple_slots.data_bucket_ix(); let num_slots = data_len as u64; - if best_fit_bucket == bucket_ix && current_multiple_slots.num_slots() > 0 { - let current_bucket = &mut self.data[bucket_ix as usize]; - // in place update - let elem_loc = current_multiple_slots.data_loc(current_bucket); - assert!(!current_bucket.is_free(elem_loc)); - let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64); - let current_multiple_slots = elem.get_multiple_slots_mut(&mut self.index); - current_multiple_slots.set_num_slots(num_slots); - - slice.iter_mut().zip(data).for_each(|(dest, src)| { - *dest = *src; + if num_slots <= 1 { + // new data stored should be stored in IndexEntry and NOT in data file + // new data len is 0 or 1 + if let SlotCountEnum::MultipleSlots(multiple_slots) = + elem.get_slot_count_enum(&self.index) + { + let bucket_ix = multiple_slots.data_bucket_ix() as usize; + // free the entry in the data bucket the data was previously stored in + let loc = multiple_slots.data_loc(&self.data[bucket_ix ]); + self.data[bucket_ix ].free(loc); + } + elem.set_slot_count_enum_value( + &mut self.index, + if let Some(single_element) = data.next() { + SlotCountEnum::OneSlotInIndex(single_element) + } else { + SlotCountEnum::ZeroSlots + }, + ); + return Ok(()); + } + + // storing the slot list requires using the data file + let mut old_data_entry_to_free = None; + // see if old elements were in a data file + if let Some(multiple_slots) = elem.get_multiple_slots_mut(&mut self.index) { + let bucket_ix = multiple_slots.data_bucket_ix() as usize; + let current_bucket = &mut self.data[bucket_ix]; + let elem_loc = multiple_slots.data_loc(current_bucket); + + if best_fit_bucket == bucket_ix as u64 { + // in place update in same data file + assert!(!current_bucket.is_free(elem_loc)); + let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64); + multiple_slots.set_num_slots(num_slots); + + slice.iter_mut().zip(data).for_each(|(dest, src)| { + *dest = *src; + }); + return Ok(()); + } + + // not updating in place, so remember old entry to free + // Wait to free until we make sure we don't have to resize the best_fit_bucket + old_data_entry_to_free = Some(DataFileEntryToFree { + bucket_ix, + location: elem_loc, }); - Ok(()) - } else { - // need to move the allocation to a best fit spot - let best_bucket = &self.data[best_fit_bucket as usize]; - let current_bucket = &self.data[bucket_ix as usize]; - let cap_power = best_bucket.capacity_pow2; - let cap = best_bucket.capacity(); - let pos = thread_rng().gen_range(0, cap); - // max search is increased here by a lot for this search. The idea is that we just have to find an empty bucket somewhere. - // We don't mind waiting on a new write (by searching longer). Writing is done in the background only. - // Wasting space by doubling the bucket size is worse behavior. We expect more - // updates and fewer inserts, so we optimize for more compact data. - // We can accomplish this by increasing how many locations we're willing to search for an empty data cell. - // For the index bucket, it is more like a hash table and we have to exhaustively search 'max_search' to prove an item does not exist. - // And we do have to support the 'does not exist' case with good performance. So, it makes sense to grow the index bucket when it is too large. - // For data buckets, the offset is stored in the index, so it is directly looked up. So, the only search is on INSERT or update to a new sized value. - for i in pos..pos + (max_search * 10).min(cap) { - let ix = i % cap; - if best_bucket.is_free(ix) { - let elem_loc = current_multiple_slots.data_loc(current_bucket); - let old_slots = current_multiple_slots.num_slots(); - let multiple_slots = elem.get_multiple_slots_mut(&mut self.index); - multiple_slots.set_storage_offset(ix); - multiple_slots - .set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2); - multiple_slots.set_num_slots(num_slots); - if old_slots > 0 { - let current_bucket = &mut self.data[bucket_ix as usize]; - current_bucket.free(elem_loc); - } - //debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid ); - if num_slots > 0 { - let best_bucket = &mut self.data[best_fit_bucket as usize]; - best_bucket.occupy(ix, false).unwrap(); - let slice = best_bucket.get_mut_cell_slice(ix, num_slots); - slice.iter_mut().zip(data).for_each(|(dest, src)| { - *dest = *src; - }); - } - return Ok(()); + } + + // need to move the allocation to a best fit spot + let best_bucket = &self.data[best_fit_bucket as usize]; + let cap_power = best_bucket.capacity_pow2; + let cap = best_bucket.capacity(); + let pos = thread_rng().gen_range(0, cap); + let mut success = false; + // max search is increased here by a lot for this search. The idea is that we just have to find an empty bucket somewhere. + // We don't mind waiting on a new write (by searching longer). Writing is done in the background only. + // Wasting space by doubling the bucket size is worse behavior. We expect more + // updates and fewer inserts, so we optimize for more compact data. + // We can accomplish this by increasing how many locations we're willing to search for an empty data cell. + // For the index bucket, it is more like a hash table and we have to exhaustively search 'max_search' to prove an item does not exist. + // And we do have to support the 'does not exist' case with good performance. So, it makes sense to grow the index bucket when it is too large. + // For data buckets, the offset is stored in the index, so it is directly looked up. So, the only search is on INSERT or update to a new sized value. + for i in pos..pos + (max_search * 10).min(cap) { + let ix = i % cap; + if best_bucket.is_free(ix) { + let mut multiple_slots = MultipleSlots::default(); + multiple_slots.set_storage_offset(ix); + multiple_slots.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2); + multiple_slots.set_num_slots(num_slots); + elem.set_slot_count_enum_value( + &mut self.index, + SlotCountEnum::MultipleSlots(&multiple_slots), + ); + //debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid ); + if num_slots > 0 { + // copy slotlist into the data bucket + let best_bucket = &mut self.data[best_fit_bucket as usize]; + best_bucket.occupy(ix, false).unwrap(); + let slice = best_bucket.get_mut_cell_slice(ix, num_slots); + slice.iter_mut().zip(data).for_each(|(dest, src)| { + *dest = *src; + }); } + success = true; + break; } - Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power))) } + if !success { + return Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power))); + } + if let Some(DataFileEntryToFree { + bucket_ix, + location, + }) = old_data_entry_to_free + { + // free the entry in the data bucket the data was previously stored in + self.data[bucket_ix].free(location); + } + Ok(()) } pub fn delete_key(&mut self, key: &Pubkey) { if let Some((elem, elem_ix)) = self.find_index_entry(key) { - let multiple_slots = elem.get_multiple_slots_mut(&mut self.index); - if multiple_slots.num_slots() > 0 { + if let SlotCountEnum::MultipleSlots(multiple_slots) = + elem.get_slot_count_enum(&self.index) + { let ix = multiple_slots.data_bucket_ix() as usize; let data_bucket = &self.data[ix]; let loc = multiple_slots.data_loc(data_bucket); diff --git a/bucket_map/src/bucket_map.rs b/bucket_map/src/bucket_map.rs index 0193642c408022..309eabf37d8494 100644 --- a/bucket_map/src/bucket_map.rs +++ b/bucket_map/src/bucket_map.rs @@ -365,7 +365,7 @@ mod tests { let v = (0..count) .map(|x| (x as usize, x as usize /*thread_rng().gen::()*/)) .collect::>(); - let rc = thread_rng().gen::(); + let rc = thread_rng().gen_range(0, RefCount::MAX >> 2); (v, rc) }; diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs index bbba6fdcafd45b..d063e1024c82a8 100644 --- a/bucket_map/src/bucket_storage.rs +++ b/bucket_map/src/bucket_storage.rs @@ -372,7 +372,7 @@ mod test { let mut storage = BucketStorage::>::new( Arc::new(paths), 1, - 1, + std::mem::size_of::>() as u64, 1, Arc::default(), Arc::default(), diff --git a/bucket_map/src/index_entry.rs b/bucket_map/src/index_entry.rs index 80dc68967fdfcf..3ae3867ca4bd7c 100644 --- a/bucket_map/src/index_entry.rs +++ b/bucket_map/src/index_entry.rs @@ -68,21 +68,31 @@ pub struct IndexEntryPlaceInBucket { #[derive(Copy, Clone)] // one instance of this per item in the index // stored in the index bucket -pub struct IndexEntry { - pub key: Pubkey, // can this be smaller if we have reduced the keys into buckets already? - ref_count: RefCount, // can this be smaller? Do we ever need more than 4B refcounts? - multiple_slots: MultipleSlots, - _phantom: PhantomData<&'static T>, +pub struct IndexEntry { + pub(crate) key: Pubkey, // can this be smaller if we have reduced the keys into buckets already? + packed_ref_count: PackedRefCount, + /// depends on the contents of ref_count.slot_count_enum + pub(crate) contents: SingleElementOrMultipleSlots, +} + +#[bitfield(bits = 64)] +#[repr(C)] +#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] +pub(crate) struct PackedRefCount { + /// tag for `SlotCountEnum` + pub(crate) slot_count_enum: B2, + /// ref_count of this entry. We don't need any where near 62 bits for this value + pub(crate) ref_count: B62, } /// required fields when an index element references the data file #[repr(C)] -#[derive(Debug, Default, Copy, Clone)] +#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] pub(crate) struct MultipleSlots { // if the bucket doubled, the index can be recomputed using storage_cap_and_offset.create_bucket_capacity_pow2 storage_cap_and_offset: PackedStorage, /// num elements in the slot list - num_slots: Slot, // can this be smaller? epoch size should ~ be the max len. this is the num elements in the slot list + num_slots: Slot, } impl MultipleSlots { @@ -141,6 +151,69 @@ impl MultipleSlots { } } +#[repr(C)] +#[derive(Copy, Clone)] +pub(crate) union SingleElementOrMultipleSlots { + /// the slot list contains a single element. No need for an entry in the data file. + /// The element itself is stored in place in the index entry + pub(crate) single_element: T, + /// the slot list ocntains more than one element. This contains the reference to the data file. + pub(crate) multiple_slots: MultipleSlots, +} + +#[repr(u8)] +#[derive(Debug, Eq, PartialEq)] +pub(crate) enum SlotCountEnum<'a, T> { + /// this spot is not allocated + Free = 0, + /// zero slots in the slot list + ZeroSlots = 1, + /// one slot in the slot list, it is stored in the index + OneSlotInIndex(&'a T) = 2, + /// > 1 slots, slots are stored in data file + MultipleSlots(&'a MultipleSlots) = 3, +} + +impl IndexEntry { + pub(crate) fn get_slot_count_enum(&self) -> SlotCountEnum<'_, T> { + unsafe { + match self.packed_ref_count.slot_count_enum() { + 0 => SlotCountEnum::Free, + 1 => SlotCountEnum::ZeroSlots, + 2 => SlotCountEnum::OneSlotInIndex(&self.contents.single_element), + 3 => SlotCountEnum::MultipleSlots(&self.contents.multiple_slots), + _ => { + panic!("unexpected value"); + } + } + } + } + + pub(crate) fn get_multiple_slots_mut(&mut self) -> Option<&mut MultipleSlots> { + unsafe { + match self.packed_ref_count.slot_count_enum() { + 3 => Some(&mut self.contents.multiple_slots), + _ => None, + } + } + } + + pub(crate) fn set_slot_count_enum_value<'a>(&'a mut self, value: SlotCountEnum<'a, T>) { + self.packed_ref_count.set_slot_count_enum(match value { + SlotCountEnum::Free => 0, + SlotCountEnum::ZeroSlots => 1, + SlotCountEnum::OneSlotInIndex(single_element) => { + self.contents.single_element = *single_element; + 2 + } + SlotCountEnum::MultipleSlots(multiple_slots) => { + self.contents.multiple_slots = *multiple_slots; + 3 + } + }); + } +} + /// Pack the storage offset and capacity-when-crated-pow2 fields into a single u64 #[bitfield(bits = 64)] #[repr(C)] @@ -150,71 +223,72 @@ struct PackedStorage { offset: B56, } -impl IndexEntryPlaceInBucket { - pub fn init(&self, index_bucket: &mut BucketStorage>, pubkey: &Pubkey) { - let index_entry = index_bucket.get_mut::>(self.ix); - index_entry.key = *pubkey; - index_entry.ref_count = 0; - index_entry.multiple_slots = MultipleSlots::default(); - } - - pub fn set_storage_capacity_when_created_pow2( +impl IndexEntryPlaceInBucket { + pub(crate) fn get_slot_count_enum<'a>( &self, - index_bucket: &mut BucketStorage>, - storage_capacity_when_created_pow2: u8, - ) { - self.get_multiple_slots_mut(index_bucket) - .set_storage_capacity_when_created_pow2(storage_capacity_when_created_pow2); + index_bucket: &'a BucketStorage>, + ) -> SlotCountEnum<'a, T> { + let index_entry = index_bucket.get::>(self.ix); + index_entry.get_slot_count_enum() } - pub fn set_storage_offset( + pub(crate) fn get_multiple_slots_mut<'a>( &self, - index_bucket: &mut BucketStorage>, - storage_offset: u64, - ) { - self.get_multiple_slots_mut(index_bucket) - .set_storage_offset(storage_offset); + index_bucket: &'a mut BucketStorage>, + ) -> Option<&'a mut MultipleSlots> { + let index_entry = index_bucket.get_mut::>(self.ix); + index_entry.get_multiple_slots_mut() } - pub(crate) fn get_multiple_slots<'a>( + pub(crate) fn set_slot_count_enum_value<'a>( &self, - index_bucket: &'a BucketStorage>, - ) -> &'a MultipleSlots { - &index_bucket.get::>(self.ix).multiple_slots + index_bucket: &'a mut BucketStorage>, + value: SlotCountEnum<'a, T>, + ) { + let index_entry = index_bucket.get_mut::>(self.ix); + index_entry.set_slot_count_enum_value(value); } - pub(crate) fn get_multiple_slots_mut<'a>( - &self, - index_bucket: &'a mut BucketStorage>, - ) -> &'a mut MultipleSlots { - &mut index_bucket - .get_mut::>(self.ix) - .multiple_slots + pub fn init(&self, index_bucket: &mut BucketStorage>, pubkey: &Pubkey) { + self.set_slot_count_enum_value(index_bucket, SlotCountEnum::ZeroSlots); + let index_entry = index_bucket.get_mut::>(self.ix); + index_entry.key = *pubkey; + index_entry.packed_ref_count.set_ref_count(0); } pub fn ref_count(&self, index_bucket: &BucketStorage>) -> RefCount { let index_entry = index_bucket.get::>(self.ix); - index_entry.ref_count + index_entry.packed_ref_count.ref_count() } pub fn read_value<'a>( &self, - index_bucket: &BucketStorage>, + index_bucket: &'a BucketStorage>, data_buckets: &'a [BucketStorage], ) -> Option<(&'a [T], RefCount)> { - let multiple_slots = self.get_multiple_slots(index_bucket); - let num_slots = multiple_slots.num_slots(); - let slice = if num_slots > 0 { - let data_bucket_ix = multiple_slots.data_bucket_ix(); - let data_bucket = &data_buckets[data_bucket_ix as usize]; - let loc = multiple_slots.data_loc(data_bucket); - assert!(!data_bucket.is_free(loc)); - data_bucket.get_cell_slice(loc, num_slots) - } else { - // num_slots is 0. This means we don't have an actual allocation. - &[] - }; - Some((slice, self.ref_count(index_bucket))) + Some(( + match self.get_slot_count_enum(index_bucket) { + SlotCountEnum::ZeroSlots => { + // num_slots is 0. This means we don't have an actual allocation. + &[] + } + SlotCountEnum::OneSlotInIndex(single_element) => { + // only element is stored in the index entry + std::slice::from_ref(single_element) + } + SlotCountEnum::MultipleSlots(multiple_slots) => { + let data_bucket_ix = multiple_slots.data_bucket_ix(); + let data_bucket = &data_buckets[data_bucket_ix as usize]; + let loc = multiple_slots.data_loc(data_bucket); + assert!(!data_bucket.is_free(loc)); + data_bucket.get_cell_slice::(loc, multiple_slots.num_slots) + } + _ => { + unimplemented!(); + } + }, + self.ref_count(index_bucket), + )) } pub fn new(ix: u64) -> Self { @@ -235,7 +309,10 @@ impl IndexEntryPlaceInBucket { ref_count: RefCount, ) { let index_entry = index_bucket.get_mut::>(self.ix); - index_entry.ref_count = ref_count; + index_entry + .packed_ref_count + .set_ref_count_checked(ref_count) + .expect("ref count must fit into 62 bits!") } } @@ -247,13 +324,14 @@ mod tests { tempfile::tempdir, }; - impl IndexEntry { + impl IndexEntry { pub fn new(key: Pubkey) -> Self { IndexEntry { key, - ref_count: 0, - multiple_slots: MultipleSlots::default(), - _phantom: PhantomData, + packed_ref_count: PackedRefCount::default(), + contents: SingleElementOrMultipleSlots { + multiple_slots: MultipleSlots::default(), + }, } } } @@ -281,7 +359,10 @@ mod tests { #[test] fn test_size() { assert_eq!(std::mem::size_of::(), 1 + 7); - assert_eq!(std::mem::size_of::>(), 32 + 8 + 8 + 8); + assert_eq!( + std::mem::size_of::>(), + 32 + 8 + (8 + 8).max(std::mem::size_of::()) + ); } fn index_bucket_for_testing() -> BucketStorage> {