Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disk index: store ref_count in data file #30974

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
elem_allocate.init(&mut self.index, key);
elem_allocate
};

elem.set_ref_count(&mut self.index, ref_count);
let num_slots = data_len as u64;
if num_slots <= 1 {
if num_slots <= 1 && ref_count == 1 {
// new data stored should be stored in IndexEntry and NOT in data file
// new data len is 0 or 1
if let OccupiedEnum::MultipleSlots(multiple_slots) =
Expand Down Expand Up @@ -335,6 +333,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

if best_fit_bucket == bucket_ix as u64 {
// in place update in same data file
MultipleSlots::set_ref_count(current_bucket, elem_loc, ref_count);

// write data
assert!(!current_bucket.is_free(elem_loc));
let slice: &mut [T] = current_bucket.get_mut_cell_slice(
elem_loc,
Expand All @@ -358,7 +359,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}

// need to move the allocation to a best fit spot
let best_bucket = &self.data[best_fit_bucket as usize];
let best_bucket = &mut self.data[best_fit_bucket as usize];
let cap_power = best_bucket.contents.capacity_pow2();
let cap = best_bucket.capacity();
let pos = thread_rng().gen_range(0, cap);
Expand All @@ -379,15 +380,17 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
multiple_slots
.set_storage_capacity_when_created_pow2(best_bucket.contents.capacity_pow2());
multiple_slots.set_num_slots(num_slots);
MultipleSlots::set_ref_count(best_bucket, ix, ref_count);

elem.set_slot_count_enum_value(
&mut self.index,
OccupiedEnum::MultipleSlots(&multiple_slots),
);
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.occupy(ix, false).unwrap();
if num_slots > 0 {
// copy slotlist into the data bucket
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.occupy(ix, false).unwrap();
let slice =
best_bucket.get_mut_cell_slice(ix, num_slots, IncludeHeader::NoHeader);
slice.iter_mut().zip(data).for_each(|(dest, src)| {
Expand Down
14 changes: 12 additions & 2 deletions bucket_map/src/bucket_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,18 @@ impl<O: BucketOccupied> BucketStorage<O> {
}
}

pub(crate) fn get_header<T>(&self, ix: u64) -> &T {
let slice = self.get_cell_slice::<T>(ix, 1, IncludeHeader::Header);
// SAFETY: `get_cell_slice` ensures there's at least one element in the slice
unsafe { slice.get_unchecked(0) }
}

pub(crate) fn get_header_mut<T>(&mut self, ix: u64) -> &mut T {
let slice = self.get_mut_cell_slice::<T>(ix, 1, IncludeHeader::Header);
// SAFETY: `get_mut_cell_slice` ensures there's at least one element in the slice
unsafe { slice.get_unchecked_mut(0) }
}

pub(crate) fn get<T>(&self, ix: u64) -> &T {
let slice = self.get_cell_slice::<T>(ix, 1, IncludeHeader::NoHeader);
// SAFETY: `get_cell_slice` ensures there's at least one element in the slice
Expand All @@ -276,14 +288,12 @@ impl<O: BucketOccupied> BucketStorage<O> {
unsafe { slice.get_unchecked_mut(0) }
}

#[allow(dead_code)]
pub(crate) fn get_mut_from_parts<T>(item_slice: &mut [u8]) -> &mut T {
debug_assert!(std::mem::size_of::<T>() <= item_slice.len());
let item = item_slice.as_mut_ptr() as *mut T;
unsafe { &mut *item }
}

#[allow(dead_code)]
pub(crate) fn get_from_parts<T>(item_slice: &[u8]) -> &T {
debug_assert!(std::mem::size_of::<T>() <= item_slice.len());
let item = item_slice.as_ptr() as *const T;
Expand Down
153 changes: 88 additions & 65 deletions bucket_map/src/index_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ use {
std::{fmt::Debug, marker::PhantomData},
};

/// allocated in `contents` in a BucketStorage
pub struct BucketWithBitVec {
occupied: BitVec,
capacity_pow2: Capacity,
}
/// in use/occupied
const OCCUPIED_OCCUPIED: u8 = 1;
/// free, ie. not occupied
const OCCUPIED_FREE: u8 = 0;

impl BucketCapacity for BucketWithBitVec {
impl BucketCapacity for BucketWithHeader {
fn capacity(&self) -> u64 {
self.capacity_pow2.capacity()
}
Expand All @@ -27,26 +26,47 @@ impl BucketCapacity for BucketWithBitVec {
}
}

impl BucketOccupied for BucketWithBitVec {
/// header for elements in a bucket
/// needs to be multiple of size_of::<u64>()
#[derive(Copy, Clone)]
#[repr(C)]
struct DataBucketRefCountOccupiedHeader {
/// stores `ref_count` and
/// occupied = OCCUPIED_OCCUPIED or OCCUPIED_FREE
packed_ref_count: PackedRefCount,
}

/// allocated in `contents` in a BucketStorage
#[derive(Copy, Clone)]
#[repr(C)]
pub struct BucketWithHeader {
capacity_pow2: Capacity,
}

impl BucketOccupied for BucketWithHeader {
fn occupy(&mut self, element: &mut [u8], ix: usize) {
assert!(self.is_free(element, ix));
self.occupied.set(ix as u64, true);
let entry: &mut DataBucketRefCountOccupiedHeader =
BucketStorage::<BucketWithHeader>::get_mut_from_parts(element);
entry.packed_ref_count.set_occupied(OCCUPIED_OCCUPIED);
}
fn free(&mut self, element: &mut [u8], ix: usize) {
assert!(!self.is_free(element, ix));
self.occupied.set(ix as u64, false);
let entry: &mut DataBucketRefCountOccupiedHeader =
BucketStorage::<BucketWithHeader>::get_mut_from_parts(element);
entry.packed_ref_count.set_occupied(OCCUPIED_FREE);
}
fn is_free(&self, _element: &[u8], ix: usize) -> bool {
!self.occupied.get(ix as u64)
fn is_free(&self, element: &[u8], _ix: usize) -> bool {
let entry: &DataBucketRefCountOccupiedHeader =
BucketStorage::<BucketWithHeader>::get_from_parts(element);
entry.packed_ref_count.occupied() == OCCUPIED_FREE
}
fn offset_to_first_data() -> usize {
// no header, nothing stored in data stream
0
std::mem::size_of::<DataBucketRefCountOccupiedHeader>()
}
fn new(capacity: Capacity) -> Self {
assert!(matches!(capacity, Capacity::Pow2(_)));
Self {
occupied: BitVec::new_fill(false, capacity.capacity()),
capacity_pow2: capacity,
}
}
Expand Down Expand Up @@ -131,7 +151,7 @@ impl<T> BucketCapacity for IndexBucketUsingBitVecBits<T> {
}
}

pub type DataBucket = BucketWithBitVec;
pub type DataBucket = BucketWithHeader;
pub type IndexBucket<T> = IndexBucketUsingBitVecBits<T>;

/// contains the index of an entry in the index bucket.
Expand All @@ -147,7 +167,6 @@ pub struct IndexEntryPlaceInBucket<T: 'static> {
/// stored in the index bucket
pub struct IndexEntry<T: Clone + Copy> {
pub(crate) key: Pubkey, // can this be smaller if we have reduced the keys into buckets already?
packed_ref_count: PackedRefCount,
/// depends on the contents of ref_count.slot_count_enum
contents: SingleElementOrMultipleSlots<T>,
}
Expand All @@ -160,8 +179,8 @@ pub(crate) const MAX_LEGAL_REFCOUNT: RefCount = RefCount::MAX >> 1;
#[repr(C)]
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
pub(crate) struct PackedRefCount {
/// currently unused
pub(crate) unused: B1,
/// whether this entry in the data file is occupied or not
pub(crate) occupied: B1,
/// ref_count of this entry. We don't need any where near 63 bits for this value
pub(crate) ref_count: B63,
}
Expand Down Expand Up @@ -231,6 +250,26 @@ impl MultipleSlots {
self.storage_offset()
<< (storage.contents.capacity_pow2() - self.storage_capacity_when_created_pow2())
}

/// ref_count is stored in the header per cell, in `packed_ref_count`
pub fn set_ref_count(
data_bucket: &mut BucketStorage<DataBucket>,
data_ix: u64,
ref_count: RefCount,
) {
data_bucket
.get_header_mut::<DataBucketRefCountOccupiedHeader>(data_ix)
.packed_ref_count
.set_ref_count(ref_count);
}

/// ref_count is stored in the header per cell, in `packed_ref_count`
pub fn ref_count(data_bucket: &BucketStorage<DataBucket>, data_ix: u64) -> RefCount {
data_bucket
.get_header::<DataBucketRefCountOccupiedHeader>(data_ix)
.packed_ref_count
.ref_count()
}
}

#[repr(C)]
Expand Down Expand Up @@ -343,45 +382,41 @@ impl<T: Copy + 'static> IndexEntryPlaceInBucket<T> {
index_entry.key = *pubkey;
}

fn ref_count(&self, index_bucket: &BucketStorage<IndexBucket<T>>) -> RefCount {
let index_entry = index_bucket.get::<IndexEntry<T>>(self.ix);
index_entry.packed_ref_count.ref_count()
}

pub(crate) fn read_value<'a>(
&self,
index_bucket: &'a BucketStorage<IndexBucket<T>>,
data_buckets: &'a [BucketStorage<DataBucket>],
) -> (&'a [T], RefCount) {
(
match self.get_slot_count_enum(index_bucket) {
OccupiedEnum::ZeroSlots => {
// num_slots is 0. This means we don't have an actual allocation.
&[]
}
OccupiedEnum::OneSlotInIndex(single_element) => {
// only element is stored in the index entry
// Note that the lifetime comes from `index_bucket` here.
std::slice::from_ref(single_element)
}
OccupiedEnum::MultipleSlots(multiple_slots) => {
// data is in data file, so return a ref to that data
let data_bucket_ix = multiple_slots.data_bucket_ix();
let data_bucket = &data_buckets[data_bucket_ix as usize];
let loc = multiple_slots.data_loc(data_bucket);
assert!(!data_bucket.is_free(loc));
data_bucket.get_cell_slice::<T>(
loc,
multiple_slots.num_slots,
IncludeHeader::NoHeader,
)
}
_ => {
panic!("trying to read data from a free entry");
}
},
self.ref_count(index_bucket),
)
let mut ref_count = 1;
let slot_list = match self.get_slot_count_enum(index_bucket) {
OccupiedEnum::ZeroSlots => {
// num_slots is 0. This means empty slot list and ref_count=1
&[]
}
OccupiedEnum::OneSlotInIndex(single_element) => {
// only element is stored in the index entry
std::slice::from_ref(single_element)
}
OccupiedEnum::MultipleSlots(multiple_slots) => {
// slot list and ref_count are in data file
let data_bucket_ix =
MultipleSlots::data_bucket_from_num_slots(multiple_slots.num_slots);
let data_bucket = &data_buckets[data_bucket_ix as usize];
let loc = multiple_slots.data_loc(data_bucket);
assert!(!data_bucket.is_free(loc));

ref_count = MultipleSlots::ref_count(data_bucket, loc);
data_bucket.get_cell_slice::<T>(
loc,
multiple_slots.num_slots,
IncludeHeader::NoHeader,
)
}
_ => {
panic!("trying to read data from a free entry");
}
};
(slot_list, ref_count)
}

pub fn new(ix: u64) -> Self {
Expand All @@ -395,18 +430,6 @@ impl<T: Copy + 'static> IndexEntryPlaceInBucket<T> {
let entry: &IndexEntry<T> = index_bucket.get(self.ix);
&entry.key
}

pub fn set_ref_count(
&self,
index_bucket: &mut BucketStorage<IndexBucket<T>>,
ref_count: RefCount,
) {
let index_entry = index_bucket.get_mut::<IndexEntry<T>>(self.ix);
index_entry
.packed_ref_count
.set_ref_count_checked(ref_count)
.expect("ref count must fit into 62 bits!");
}
}

#[cfg(test)]
Expand Down Expand Up @@ -436,7 +459,7 @@ mod tests {
#[test]
fn test_size() {
assert_eq!(std::mem::size_of::<PackedStorage>(), 1 + 7);
assert_eq!(std::mem::size_of::<IndexEntry<u64>>(), 32 + 8 + 8 + 8);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

look at the magic disappearance of that 8!

assert_eq!(std::mem::size_of::<IndexEntry<u64>>(), 32 + 8 + 8);
}

#[test]
Expand Down