Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

wip: disk index: store single slot list in index entry #30963

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 110 additions & 59 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
bucket_storage::{BucketOccupied, BucketStorage, DEFAULT_CAPACITY_POW2},
index_entry::{
DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots,
SlotCountEnum,
},
MaxSearch, RefCount,
},
Expand Down Expand Up @@ -78,8 +79,14 @@ impl<I: BucketOccupied, D: BucketOccupied> Reallocated<I, D> {
}
}

/// when updating the index, this keeps track of the previous data entry which will need to be freed
struct DataFileEntryToFree {
bucket_ix: usize,
location: u64,
}

// >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data
pub struct Bucket<T: 'static> {
pub struct Bucket<T: Copy + 'static> {
drives: Arc<Vec<PathBuf>>,
//index
pub index: BucketStorage<IndexBucket<T>>,
Expand Down Expand Up @@ -263,7 +270,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
pub fn try_write(
&mut self,
key: &Pubkey,
data: impl Iterator<Item = &'b T>,
mut data: impl Iterator<Item = &'b T>,
data_len: usize,
ref_count: RefCount,
) -> Result<(), BucketMapError> {
Expand All @@ -287,74 +294,118 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
};

elem.set_ref_count(&mut self.index, ref_count);
let current_multiple_slots = elem.get_multiple_slots(&self.index);
let bucket_ix = current_multiple_slots.data_bucket_ix();
let num_slots = data_len as u64;
if best_fit_bucket == bucket_ix && current_multiple_slots.num_slots() > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
// in place update
let elem_loc = elem.data_loc(&self.index, current_bucket);
assert!(!current_bucket.is_free(elem_loc));
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
let current_multiple_slots = elem.get_multiple_slots_mut(&mut self.index);
current_multiple_slots.set_num_slots(num_slots);

slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
if num_slots <= 1 {
// new data stored should be stored in IndexEntry and NOT in data file
// new data len is 0 or 1
if let SlotCountEnum::MultipleSlots(multiple_slots) =
elem.get_slot_count_enum(&self.index)
{
let bucket_ix = multiple_slots.data_bucket_ix() as usize;
// free the entry in the data bucket the data was previously stored in
let loc = multiple_slots.data_loc(&self.data[bucket_ix ]);
self.data[bucket_ix ].free(loc);
}
elem.set_slot_count_enum_value(
&mut self.index,
if let Some(single_element) = data.next() {
SlotCountEnum::OneSlotInIndex(single_element)
} else {
SlotCountEnum::ZeroSlots
},
);
return Ok(());
}

// storing the slot list requires using the data file
let mut old_data_entry_to_free = None;
// see if old elements were in a data file
if let Some(multiple_slots) = elem.get_multiple_slots_mut(&mut self.index) {
let bucket_ix = multiple_slots.data_bucket_ix() as usize;
let current_bucket = &mut self.data[bucket_ix];
let elem_loc = multiple_slots.data_loc(current_bucket);

if best_fit_bucket == bucket_ix as u64 {
// in place update in same data file
assert!(!current_bucket.is_free(elem_loc));
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
multiple_slots.set_num_slots(num_slots);

slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
});
return Ok(());
}

// not updating in place, so remember old entry to free
// Wait to free until we make sure we don't have to resize the best_fit_bucket
old_data_entry_to_free = Some(DataFileEntryToFree {
bucket_ix,
location: elem_loc,
});
Ok(())
} else {
// need to move the allocation to a best fit spot
let best_bucket = &self.data[best_fit_bucket as usize];
let current_bucket = &self.data[bucket_ix as usize];
let cap_power = best_bucket.capacity_pow2;
let cap = best_bucket.capacity();
let pos = thread_rng().gen_range(0, cap);
// max search is increased here by a lot for this search. The idea is that we just have to find an empty bucket somewhere.
// We don't mind waiting on a new write (by searching longer). Writing is done in the background only.
// Wasting space by doubling the bucket size is worse behavior. We expect more
// updates and fewer inserts, so we optimize for more compact data.
// We can accomplish this by increasing how many locations we're willing to search for an empty data cell.
// For the index bucket, it is more like a hash table and we have to exhaustively search 'max_search' to prove an item does not exist.
// And we do have to support the 'does not exist' case with good performance. So, it makes sense to grow the index bucket when it is too large.
// For data buckets, the offset is stored in the index, so it is directly looked up. So, the only search is on INSERT or update to a new sized value.
for i in pos..pos + (max_search * 10).min(cap) {
let ix = i % cap;
if best_bucket.is_free(ix) {
let elem_loc = elem.data_loc(&self.index, current_bucket);
let old_slots = current_multiple_slots.num_slots();
let multiple_slots = elem.get_multiple_slots_mut(&mut self.index);
multiple_slots.set_storage_offset(ix);
multiple_slots
.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
multiple_slots.set_num_slots(num_slots);
if old_slots > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
current_bucket.free(elem_loc);
}
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if num_slots > 0 {
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.occupy(ix, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
});
}
return Ok(());
}

// need to move the allocation to a best fit spot
let best_bucket = &self.data[best_fit_bucket as usize];
let cap_power = best_bucket.capacity_pow2;
let cap = best_bucket.capacity();
let pos = thread_rng().gen_range(0, cap);
let mut success = false;
// max search is increased here by a lot for this search. The idea is that we just have to find an empty bucket somewhere.
// We don't mind waiting on a new write (by searching longer). Writing is done in the background only.
// Wasting space by doubling the bucket size is worse behavior. We expect more
// updates and fewer inserts, so we optimize for more compact data.
// We can accomplish this by increasing how many locations we're willing to search for an empty data cell.
// For the index bucket, it is more like a hash table and we have to exhaustively search 'max_search' to prove an item does not exist.
// And we do have to support the 'does not exist' case with good performance. So, it makes sense to grow the index bucket when it is too large.
// For data buckets, the offset is stored in the index, so it is directly looked up. So, the only search is on INSERT or update to a new sized value.
for i in pos..pos + (max_search * 10).min(cap) {
let ix = i % cap;
if best_bucket.is_free(ix) {
let mut multiple_slots = MultipleSlots::default();
multiple_slots.set_storage_offset(ix);
multiple_slots.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
multiple_slots.set_num_slots(num_slots);
elem.set_slot_count_enum_value(
&mut self.index,
SlotCountEnum::MultipleSlots(&multiple_slots),
);
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if num_slots > 0 {
// copy slotlist into the data bucket
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.occupy(ix, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
});
}
success = true;
break;
}
Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power)))
}
if !success {
return Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power)));
}
if let Some(DataFileEntryToFree {
bucket_ix,
location,
}) = old_data_entry_to_free
{
// free the entry in the data bucket the data was previously stored in
self.data[bucket_ix].free(location);
}
Ok(())
}

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_index_entry(key) {
let multiple_slots = elem.get_multiple_slots_mut(&mut self.index);
if multiple_slots.num_slots() > 0 {
if let SlotCountEnum::MultipleSlots(multiple_slots) =
elem.get_slot_count_enum(&self.index)
{
let ix = multiple_slots.data_bucket_ix() as usize;
let data_bucket = &self.data[ix];
let loc = elem.data_loc(&self.index, data_bucket);
let loc = multiple_slots.data_loc(data_bucket);
let data_bucket = &mut self.data[ix];
//debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid );
data_bucket.free(loc);
Expand Down
2 changes: 1 addition & 1 deletion bucket_map/src/bucket_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ mod tests {
let v = (0..count)
.map(|x| (x as usize, x as usize /*thread_rng().gen::<usize>()*/))
.collect::<Vec<_>>();
let rc = thread_rng().gen::<RefCount>();
let rc = thread_rng().gen_range(0, RefCount::MAX >> 2);
(v, rc)
};

Expand Down
2 changes: 1 addition & 1 deletion bucket_map/src/bucket_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ mod test {
let mut storage = BucketStorage::<IndexBucket<u64>>::new(
Arc::new(paths),
1,
1,
std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64,
1,
Arc::default(),
Arc::default(),
Expand Down
Loading