disk index: store single slot list in index entry
jeffwashington committed Mar 31, 2023
1 parent a2797eb commit eef4496
Showing 3 changed files with 204 additions and 113 deletions.
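
The core of the change: a slot list of length 0 or 1 is now held directly inside the index entry (the OccupiedEnum::ZeroSlots and OneSlotInIndex variants used in the diff below), and a data-file allocation is only made when the list has multiple elements. A minimal sketch of that storage split, using hypothetical simplified types rather than the real IndexEntry/MultipleSlots layout:

// Hypothetical, simplified model of the idea in this commit: a slot list of
// length 0 or 1 lives inline in the index entry; longer lists live in a data
// file, and the index entry only records where to find them.
enum SlotListRef<T> {
    ZeroSlots,                 // key present, but the slot list is empty
    OneSlotInIndex(T),         // the single element is stored inline in the index
    MultipleSlots {
        data_bucket: usize,    // which data file (sized by list length)
        offset: u64,           // cell offset within that file
        num_slots: u64,        // number of elements stored there
    },
}

fn store<T: Copy>(data: &[T]) -> SlotListRef<T> {
    match data {
        [] => SlotListRef::ZeroSlots,
        [one] => SlotListRef::OneSlotInIndex(*one),
        many => SlotListRef::MultipleSlots {
            // the real code picks a best-fit data bucket and a free cell in it;
            // the location recorded here is a stand-in
            data_bucket: many.len().next_power_of_two().trailing_zeros() as usize,
            offset: 0,
            num_slots: many.len() as u64,
        },
    }
}

fn main() {
    assert!(matches!(store::<u64>(&[]), SlotListRef::ZeroSlots));
    assert!(matches!(store(&[7u64]), SlotListRef::OneSlotInIndex(7)));
    assert!(matches!(
        store(&[1u64, 2, 3]),
        SlotListRef::MultipleSlots { num_slots: 3, .. }
    ));
}

The try_write diff below follows the same split: the num_slots <= 1 branch writes into the index entry directly, and anything longer goes through the data-file path.
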
167 changes: 109 additions & 58 deletions bucket_map/src/bucket.rs
@@ -6,6 +6,7 @@ use {
bucket_storage::{BucketOccupied, BucketStorage, DEFAULT_CAPACITY_POW2},
index_entry::{
DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots,
OccupiedEnum,
},
MaxSearch, RefCount,
},
@@ -78,8 +79,14 @@ impl<I: BucketOccupied, D: BucketOccupied> Reallocated<I, D> {
}
}

/// when updating the index, this keeps track of the previous data entry which will need to be freed
struct DataFileEntryToFree {
bucket_ix: usize,
location: u64,
}

// >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data
pub struct Bucket<T: 'static> {
pub struct Bucket<T: Copy + 'static> {
drives: Arc<Vec<PathBuf>>,
//index
pub index: BucketStorage<IndexBucket<T>>,
@@ -263,7 +270,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
pub fn try_write(
&mut self,
key: &Pubkey,
data: impl Iterator<Item = &'b T>,
mut data: impl Iterator<Item = &'b T>,
data_len: usize,
ref_count: RefCount,
) -> Result<(), BucketMapError> {
@@ -287,71 +294,115 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
};

elem.set_ref_count(&mut self.index, ref_count);
let current_multiple_slots = elem.get_multiple_slots(&self.index);
let bucket_ix = current_multiple_slots.data_bucket_ix();
let num_slots = data_len as u64;
if best_fit_bucket == bucket_ix && current_multiple_slots.num_slots() > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
// in place update
let elem_loc = current_multiple_slots.data_loc(current_bucket);
assert!(!current_bucket.is_free(elem_loc));
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
let current_multiple_slots = elem.get_multiple_slots_mut(&mut self.index);
current_multiple_slots.set_num_slots(num_slots);

slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
if num_slots <= 1 {
// new data should be stored in the IndexEntry and NOT in a data file
// new data len is 0 or 1
if let OccupiedEnum::MultipleSlots(multiple_slots) =
elem.get_slot_count_enum(&self.index)
{
let bucket_ix = multiple_slots.data_bucket_ix() as usize;
// free the entry in the data bucket the data was previously stored in
let loc = multiple_slots.data_loc(&self.data[bucket_ix]);
self.data[bucket_ix].free(loc);
}
elem.set_slot_count_enum_value(
&mut self.index,
if let Some(single_element) = data.next() {
OccupiedEnum::OneSlotInIndex(single_element)
} else {
OccupiedEnum::ZeroSlots
},
);
return Ok(());
}

// storing the slot list requires using the data file
let mut old_data_entry_to_free = None;
// see if old elements were in a data file
if let Some(multiple_slots) = elem.get_multiple_slots_mut(&mut self.index) {
let bucket_ix = multiple_slots.data_bucket_ix() as usize;
let current_bucket = &mut self.data[bucket_ix];
let elem_loc = multiple_slots.data_loc(current_bucket);

if best_fit_bucket == bucket_ix as u64 {
// in place update in same data file
assert!(!current_bucket.is_free(elem_loc));
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
multiple_slots.set_num_slots(num_slots);

slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
});
return Ok(());
}

// not updating in place, so remember old entry to free
// Wait to free until we make sure we don't have to resize the best_fit_bucket
old_data_entry_to_free = Some(DataFileEntryToFree {
bucket_ix,
location: elem_loc,
});
Ok(())
} else {
// need to move the allocation to a best fit spot
let best_bucket = &self.data[best_fit_bucket as usize];
let current_bucket = &self.data[bucket_ix as usize];
let cap_power = best_bucket.capacity_pow2;
let cap = best_bucket.capacity();
let pos = thread_rng().gen_range(0, cap);
// max search is increased here by a lot for this search. The idea is that we just have to find an empty data cell somewhere.
// We don't mind waiting on a new write (by searching longer). Writing is done in the background only.
// Wasting space by doubling the bucket size is worse behavior. We expect more
// updates and fewer inserts, so we optimize for more compact data.
// We can accomplish this by increasing how many locations we're willing to search for an empty data cell.
// For the index bucket, it is more like a hash table and we have to exhaustively search 'max_search' to prove an item does not exist.
// And we do have to support the 'does not exist' case with good performance. So, it makes sense to grow the index bucket when it is too large.
// For data buckets, the offset is stored in the index, so it is directly looked up. So, the only search is on INSERT or update to a new sized value.
for i in pos..pos + (max_search * 10).min(cap) {
let ix = i % cap;
if best_bucket.is_free(ix) {
let elem_loc = current_multiple_slots.data_loc(current_bucket);
let old_slots = current_multiple_slots.num_slots();
let multiple_slots = elem.get_multiple_slots_mut(&mut self.index);
multiple_slots.set_storage_offset(ix);
multiple_slots
.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
multiple_slots.set_num_slots(num_slots);
if old_slots > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
current_bucket.free(elem_loc);
}
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if num_slots > 0 {
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.occupy(ix, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
});
}
return Ok(());
}

// need to move the allocation to a best fit spot
let best_bucket = &self.data[best_fit_bucket as usize];
let cap_power = best_bucket.capacity_pow2;
let cap = best_bucket.capacity();
let pos = thread_rng().gen_range(0, cap);
let mut success = false;
// max search is increased here by a lot for this search. The idea is that we just have to find an empty data cell somewhere.
// We don't mind waiting on a new write (by searching longer). Writing is done in the background only.
// Wasting space by doubling the bucket size is worse behavior. We expect more
// updates and fewer inserts, so we optimize for more compact data.
// We can accomplish this by increasing how many locations we're willing to search for an empty data cell.
// For the index bucket, it is more like a hash table and we have to exhaustively search 'max_search' to prove an item does not exist.
// And we do have to support the 'does not exist' case with good performance. So, it makes sense to grow the index bucket when it is too large.
// For data buckets, the offset is stored in the index, so it is directly looked up. So, the only search is on INSERT or update to a new sized value.
for i in pos..pos + (max_search * 10).min(cap) {
let ix = i % cap;
if best_bucket.is_free(ix) {
let mut multiple_slots = MultipleSlots::default();
multiple_slots.set_storage_offset(ix);
multiple_slots.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
multiple_slots.set_num_slots(num_slots);
elem.set_slot_count_enum_value(
&mut self.index,
OccupiedEnum::MultipleSlots(&multiple_slots),
);
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if num_slots > 0 {
// copy slotlist into the data bucket
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.occupy(ix, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
});
}
success = true;
break;
}
Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power)))
}
if !success {
return Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power)));
}
if let Some(DataFileEntryToFree {
bucket_ix,
location,
}) = old_data_entry_to_free
{
// free the entry in the data bucket the data was previously stored in
self.data[bucket_ix].free(location);
}
Ok(())
}

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_index_entry(key) {
let multiple_slots = elem.get_multiple_slots_mut(&mut self.index);
if multiple_slots.num_slots() > 0 {
if let OccupiedEnum::MultipleSlots(multiple_slots) =
elem.get_slot_count_enum(&self.index)
{
let ix = multiple_slots.data_bucket_ix() as usize;
let data_bucket = &self.data[ix];
let loc = multiple_slots.data_loc(data_bucket);
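
For reference, the long comment inside try_write above explains the relocation strategy: instead of growing a data bucket, the write path probes up to (max_search * 10).min(cap) cells, wrapping around a random start, looking for a free cell in the best-fit data bucket. A rough, self-contained sketch of that probe, with a hypothetical is_free closure standing in for BucketStorage::is_free (not the actual API):

// Hypothetical sketch of the wrap-around probe used when a slot list must be
// (re)located into the best-fit data bucket. `is_free(ix)` stands in for
// BucketStorage::is_free; `cap` is the bucket capacity, `pos` a random start.
fn find_free_cell(pos: u64, cap: u64, max_search: u64, is_free: impl Fn(u64) -> bool) -> Option<u64> {
    // Probe far more than `max_search` cells: a longer search on the
    // (background) write path is cheaper than doubling the data bucket.
    for i in pos..pos + (max_search * 10).min(cap) {
        let ix = i % cap; // wrap around the end of the bucket
        if is_free(ix) {
            return Some(ix);
        }
    }
    None // caller reports BucketMapError::DataNoSpace so the bucket can be grown
}

fn main() {
    // Only index 2 is free in a capacity-8 bucket; probing starts at 5 and wraps.
    let free = [false, false, true, false, false, false, false, false];
    assert_eq!(find_free_cell(5, 8, 4, |ix| free[ix as usize]), Some(2));
}
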
4 changes: 2 additions & 2 deletions bucket_map/src/bucket_storage.rs
@@ -190,13 +190,13 @@ impl<O: BucketOccupied> BucketStorage<O> {
unsafe { slice.get_unchecked_mut(0) }
}

pub(crate) fn get_mut_from_parts<T: Sized>(item_slice: &mut [u8]) -> &mut T {
pub(crate) fn get_mut_from_parts<T>(item_slice: &mut [u8]) -> &mut T {
debug_assert!(std::mem::size_of::<T>() <= item_slice.len());
let item = item_slice.as_mut_ptr() as *mut T;
unsafe { &mut *item }
}

pub(crate) fn get_from_parts<T: Sized>(item_slice: &[u8]) -> &T {
pub(crate) fn get_from_parts<T>(item_slice: &[u8]) -> &T {
debug_assert!(std::mem::size_of::<T>() <= item_slice.len());
let item = item_slice.as_ptr() as *const T;
unsafe { &*item }
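
The bucket_storage.rs change only drops the explicit Sized bound; T: Sized is already the implicit default for type parameters, so the two helpers behave exactly as before. For context, a simplified stand-in showing what get_from_parts does, reinterpreting the front of a byte slice as a reference to a plain-old-data value (not the real BucketStorage code):

// Simplified stand-in for get_from_parts: reinterpret the front of a byte
// slice as &T. Only sound for plain-old-data types when the slice is large
// enough and properly aligned, which is how the bucket storage uses it.
fn get_from_parts<T>(item_slice: &[u8]) -> &T {
    debug_assert!(std::mem::size_of::<T>() <= item_slice.len());
    let item = item_slice.as_ptr() as *const T;
    unsafe { &*item }
}

fn main() {
    let backing = [42u64]; // u64-aligned storage backing the byte view
    let bytes: &[u8] = unsafe {
        std::slice::from_raw_parts(backing.as_ptr() as *const u8, std::mem::size_of::<u64>())
    };
    let value: &u64 = get_from_parts(bytes);
    assert_eq!(*value, 42);
}
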
(The diff for the third changed file did not load in this view.)
