From 9fb22bc0be2747dd6825b570225ba5aec6233c1f Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Tue, 4 Apr 2023 21:17:48 -0500 Subject: [PATCH] disk index: store ref_count in data file (#30974) --- bucket_map/src/bucket.rs | 15 +-- bucket_map/src/bucket_storage.rs | 14 ++- bucket_map/src/index_entry.rs | 153 ++++++++++++++++++------------- 3 files changed, 109 insertions(+), 73 deletions(-) diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index 5ae398be4c1656..55da7817b79539 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -300,10 +300,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket { elem_allocate.init(&mut self.index, key); elem_allocate }; - - elem.set_ref_count(&mut self.index, ref_count); let num_slots = data_len as u64; - if num_slots <= 1 { + if num_slots <= 1 && ref_count == 1 { // new data stored should be stored in IndexEntry and NOT in data file // new data len is 0 or 1 if let OccupiedEnum::MultipleSlots(multiple_slots) = @@ -335,6 +333,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket { if best_fit_bucket == bucket_ix as u64 { // in place update in same data file + MultipleSlots::set_ref_count(current_bucket, elem_loc, ref_count); + + // write data assert!(!current_bucket.is_free(elem_loc)); let slice: &mut [T] = current_bucket.get_mut_cell_slice( elem_loc, @@ -358,7 +359,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { } // need to move the allocation to a best fit spot - let best_bucket = &self.data[best_fit_bucket as usize]; + let best_bucket = &mut self.data[best_fit_bucket as usize]; let cap_power = best_bucket.contents.capacity_pow2(); let cap = best_bucket.capacity(); let pos = thread_rng().gen_range(0, cap); @@ -379,15 +380,17 @@ impl<'b, T: Clone + Copy + 'static> Bucket { multiple_slots .set_storage_capacity_when_created_pow2(best_bucket.contents.capacity_pow2()); multiple_slots.set_num_slots(num_slots); + MultipleSlots::set_ref_count(best_bucket, ix, ref_count); + elem.set_slot_count_enum_value( &mut self.index, OccupiedEnum::MultipleSlots(&multiple_slots), ); //debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid ); + let best_bucket = &mut self.data[best_fit_bucket as usize]; + best_bucket.occupy(ix, false).unwrap(); if num_slots > 0 { // copy slotlist into the data bucket - let best_bucket = &mut self.data[best_fit_bucket as usize]; - best_bucket.occupy(ix, false).unwrap(); let slice = best_bucket.get_mut_cell_slice(ix, num_slots, IncludeHeader::NoHeader); slice.iter_mut().zip(data).for_each(|(dest, src)| { diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs index efac32c7d53ddf..72b68319d205da 100644 --- a/bucket_map/src/bucket_storage.rs +++ b/bucket_map/src/bucket_storage.rs @@ -264,6 +264,18 @@ impl BucketStorage { } } + pub(crate) fn get_header(&self, ix: u64) -> &T { + let slice = self.get_cell_slice::(ix, 1, IncludeHeader::Header); + // SAFETY: `get_cell_slice` ensures there's at least one element in the slice + unsafe { slice.get_unchecked(0) } + } + + pub(crate) fn get_header_mut(&mut self, ix: u64) -> &mut T { + let slice = self.get_mut_cell_slice::(ix, 1, IncludeHeader::Header); + // SAFETY: `get_mut_cell_slice` ensures there's at least one element in the slice + unsafe { slice.get_unchecked_mut(0) } + } + pub(crate) fn get(&self, ix: u64) -> &T { let slice = self.get_cell_slice::(ix, 1, IncludeHeader::NoHeader); // SAFETY: `get_cell_slice` ensures there's at least one element in the slice @@ -276,14 +288,12 @@ impl BucketStorage { unsafe { slice.get_unchecked_mut(0) } } - #[allow(dead_code)] pub(crate) fn get_mut_from_parts(item_slice: &mut [u8]) -> &mut T { debug_assert!(std::mem::size_of::() <= item_slice.len()); let item = item_slice.as_mut_ptr() as *mut T; unsafe { &mut *item } } - #[allow(dead_code)] pub(crate) fn get_from_parts(item_slice: &[u8]) -> &T { debug_assert!(std::mem::size_of::() <= item_slice.len()); let item = item_slice.as_ptr() as *const T; diff --git a/bucket_map/src/index_entry.rs b/bucket_map/src/index_entry.rs index a24ba2f7875a28..a82ec6a1a8828b 100644 --- a/bucket_map/src/index_entry.rs +++ b/bucket_map/src/index_entry.rs @@ -12,13 +12,12 @@ use { std::{fmt::Debug, marker::PhantomData}, }; -/// allocated in `contents` in a BucketStorage -pub struct BucketWithBitVec { - occupied: BitVec, - capacity_pow2: Capacity, -} +/// in use/occupied +const OCCUPIED_OCCUPIED: u8 = 1; +/// free, ie. not occupied +const OCCUPIED_FREE: u8 = 0; -impl BucketCapacity for BucketWithBitVec { +impl BucketCapacity for BucketWithHeader { fn capacity(&self) -> u64 { self.capacity_pow2.capacity() } @@ -27,26 +26,47 @@ impl BucketCapacity for BucketWithBitVec { } } -impl BucketOccupied for BucketWithBitVec { +/// header for elements in a bucket +/// needs to be multiple of size_of::() +#[derive(Copy, Clone)] +#[repr(C)] +struct DataBucketRefCountOccupiedHeader { + /// stores `ref_count` and + /// occupied = OCCUPIED_OCCUPIED or OCCUPIED_FREE + packed_ref_count: PackedRefCount, +} + +/// allocated in `contents` in a BucketStorage +#[derive(Copy, Clone)] +#[repr(C)] +pub struct BucketWithHeader { + capacity_pow2: Capacity, +} + +impl BucketOccupied for BucketWithHeader { fn occupy(&mut self, element: &mut [u8], ix: usize) { assert!(self.is_free(element, ix)); - self.occupied.set(ix as u64, true); + let entry: &mut DataBucketRefCountOccupiedHeader = + BucketStorage::::get_mut_from_parts(element); + entry.packed_ref_count.set_occupied(OCCUPIED_OCCUPIED); } fn free(&mut self, element: &mut [u8], ix: usize) { assert!(!self.is_free(element, ix)); - self.occupied.set(ix as u64, false); + let entry: &mut DataBucketRefCountOccupiedHeader = + BucketStorage::::get_mut_from_parts(element); + entry.packed_ref_count.set_occupied(OCCUPIED_FREE); } - fn is_free(&self, _element: &[u8], ix: usize) -> bool { - !self.occupied.get(ix as u64) + fn is_free(&self, element: &[u8], _ix: usize) -> bool { + let entry: &DataBucketRefCountOccupiedHeader = + BucketStorage::::get_from_parts(element); + entry.packed_ref_count.occupied() == OCCUPIED_FREE } fn offset_to_first_data() -> usize { - // no header, nothing stored in data stream - 0 + std::mem::size_of::() } fn new(capacity: Capacity) -> Self { assert!(matches!(capacity, Capacity::Pow2(_))); Self { - occupied: BitVec::new_fill(false, capacity.capacity()), capacity_pow2: capacity, } } @@ -131,7 +151,7 @@ impl BucketCapacity for IndexBucketUsingBitVecBits { } } -pub type DataBucket = BucketWithBitVec; +pub type DataBucket = BucketWithHeader; pub type IndexBucket = IndexBucketUsingBitVecBits; /// contains the index of an entry in the index bucket. @@ -147,7 +167,6 @@ pub struct IndexEntryPlaceInBucket { /// stored in the index bucket pub struct IndexEntry { pub(crate) key: Pubkey, // can this be smaller if we have reduced the keys into buckets already? - packed_ref_count: PackedRefCount, /// depends on the contents of ref_count.slot_count_enum contents: SingleElementOrMultipleSlots, } @@ -160,8 +179,8 @@ pub(crate) const MAX_LEGAL_REFCOUNT: RefCount = RefCount::MAX >> 1; #[repr(C)] #[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] pub(crate) struct PackedRefCount { - /// currently unused - pub(crate) unused: B1, + /// whether this entry in the data file is occupied or not + pub(crate) occupied: B1, /// ref_count of this entry. We don't need any where near 63 bits for this value pub(crate) ref_count: B63, } @@ -231,6 +250,26 @@ impl MultipleSlots { self.storage_offset() << (storage.contents.capacity_pow2() - self.storage_capacity_when_created_pow2()) } + + /// ref_count is stored in the header per cell, in `packed_ref_count` + pub fn set_ref_count( + data_bucket: &mut BucketStorage, + data_ix: u64, + ref_count: RefCount, + ) { + data_bucket + .get_header_mut::(data_ix) + .packed_ref_count + .set_ref_count(ref_count); + } + + /// ref_count is stored in the header per cell, in `packed_ref_count` + pub fn ref_count(data_bucket: &BucketStorage, data_ix: u64) -> RefCount { + data_bucket + .get_header::(data_ix) + .packed_ref_count + .ref_count() + } } #[repr(C)] @@ -343,45 +382,41 @@ impl IndexEntryPlaceInBucket { index_entry.key = *pubkey; } - fn ref_count(&self, index_bucket: &BucketStorage>) -> RefCount { - let index_entry = index_bucket.get::>(self.ix); - index_entry.packed_ref_count.ref_count() - } - pub(crate) fn read_value<'a>( &self, index_bucket: &'a BucketStorage>, data_buckets: &'a [BucketStorage], ) -> (&'a [T], RefCount) { - ( - match self.get_slot_count_enum(index_bucket) { - OccupiedEnum::ZeroSlots => { - // num_slots is 0. This means we don't have an actual allocation. - &[] - } - OccupiedEnum::OneSlotInIndex(single_element) => { - // only element is stored in the index entry - // Note that the lifetime comes from `index_bucket` here. - std::slice::from_ref(single_element) - } - OccupiedEnum::MultipleSlots(multiple_slots) => { - // data is in data file, so return a ref to that data - let data_bucket_ix = multiple_slots.data_bucket_ix(); - let data_bucket = &data_buckets[data_bucket_ix as usize]; - let loc = multiple_slots.data_loc(data_bucket); - assert!(!data_bucket.is_free(loc)); - data_bucket.get_cell_slice::( - loc, - multiple_slots.num_slots, - IncludeHeader::NoHeader, - ) - } - _ => { - panic!("trying to read data from a free entry"); - } - }, - self.ref_count(index_bucket), - ) + let mut ref_count = 1; + let slot_list = match self.get_slot_count_enum(index_bucket) { + OccupiedEnum::ZeroSlots => { + // num_slots is 0. This means empty slot list and ref_count=1 + &[] + } + OccupiedEnum::OneSlotInIndex(single_element) => { + // only element is stored in the index entry + std::slice::from_ref(single_element) + } + OccupiedEnum::MultipleSlots(multiple_slots) => { + // slot list and ref_count are in data file + let data_bucket_ix = + MultipleSlots::data_bucket_from_num_slots(multiple_slots.num_slots); + let data_bucket = &data_buckets[data_bucket_ix as usize]; + let loc = multiple_slots.data_loc(data_bucket); + assert!(!data_bucket.is_free(loc)); + + ref_count = MultipleSlots::ref_count(data_bucket, loc); + data_bucket.get_cell_slice::( + loc, + multiple_slots.num_slots, + IncludeHeader::NoHeader, + ) + } + _ => { + panic!("trying to read data from a free entry"); + } + }; + (slot_list, ref_count) } pub fn new(ix: u64) -> Self { @@ -395,18 +430,6 @@ impl IndexEntryPlaceInBucket { let entry: &IndexEntry = index_bucket.get(self.ix); &entry.key } - - pub fn set_ref_count( - &self, - index_bucket: &mut BucketStorage>, - ref_count: RefCount, - ) { - let index_entry = index_bucket.get_mut::>(self.ix); - index_entry - .packed_ref_count - .set_ref_count_checked(ref_count) - .expect("ref count must fit into 62 bits!"); - } } #[cfg(test)] @@ -436,7 +459,7 @@ mod tests { #[test] fn test_size() { assert_eq!(std::mem::size_of::(), 1 + 7); - assert_eq!(std::mem::size_of::>(), 32 + 8 + 8 + 8); + assert_eq!(std::mem::size_of::>(), 32 + 8 + 8); } #[test]