Skip to content

Commit

Permalink
disk index: store single slot list in index entry
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Mar 30, 2023
1 parent 1c2d0e9 commit 89354f4
Show file tree
Hide file tree
Showing 4 changed files with 373 additions and 171 deletions.
180 changes: 121 additions & 59 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use {
bucket_map::BucketMapError,
bucket_stats::BucketMapStats,
bucket_storage::{BucketOccupied, BucketStorage, DEFAULT_CAPACITY_POW2},
index_entry::{DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket},
index_entry::{
DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots,
SlotCountEnum,
},
MaxSearch, RefCount,
},
rand::{thread_rng, Rng},
Expand Down Expand Up @@ -77,8 +80,14 @@ impl<I: BucketOccupied, D: BucketOccupied> Reallocated<I, D> {
}
}

/// when updating the index, this keeps track of the previous data entry which will need to be freed
struct DataFileEntryToFree {
    /// index into `self.data` of the data bucket (file) holding the stale entry
    bucket_ix: usize,
    /// location of the stale entry within that data bucket, as passed to `free`
    location: u64,
}

// >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data
pub struct Bucket<T: 'static> {
pub struct Bucket<T: Copy + 'static> {
drives: Arc<Vec<PathBuf>>,
//index
pub index: BucketStorage<IndexBucket<T>>,
Expand Down Expand Up @@ -264,7 +273,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
pub fn try_write(
&mut self,
key: &Pubkey,
data: impl Iterator<Item = &'b T>,
mut data: impl Iterator<Item = &'b T>,
data_len: usize,
ref_count: RefCount,
) -> Result<(), BucketMapError> {
Expand All @@ -288,72 +297,125 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
};

elem.set_ref_count(&mut self.index, ref_count);
let bucket_ix = elem.data_bucket_ix(&self.index);
let num_slots = data_len as u64;
if best_fit_bucket == bucket_ix && elem.num_slots(&self.index) > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
// in place update
let elem_loc = elem.data_loc(&self.index, current_bucket);
assert!(!current_bucket.is_free(elem_loc));
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
elem.set_num_slots(&mut self.index, num_slots);

slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
let use_data_storage = num_slots > 1;

if !use_data_storage {
// new data stored should be stored in elem.`first_element`
// new data len is 0 or 1
let mut free_info = None;
if let SlotCountEnum::MultipleSlots(multiple_slots) =
elem.get_slot_count_enum(&self.index)
{
// free old data location
let bucket_ix =
IndexEntry::<T>::data_bucket_from_num_slots(multiple_slots.num_slots);
free_info = Some((
bucket_ix as usize,
IndexEntryPlaceInBucket::<T>::data_loc(
&self.data[bucket_ix as usize],
multiple_slots,
),
));
}
elem.set_slot_count_enum_value(
&mut self.index,
if let Some(single_element) = data.next() {
SlotCountEnum::OneSlotInIndex(single_element)
} else {
SlotCountEnum::ZeroSlots
},
);
if let Some((bucket_ix, elem_loc)) = free_info {
// free the entry in the data bucket the data was previously stored in
self.data[bucket_ix].free(elem_loc);
}
return Ok(());
}
// storing the slot list requires using the data file

let mut old_data_entry_to_free = None;
// see if old elements were in a data file
if let Some(multiple_slots) = elem.get_multiple_slots_mut(&mut self.index) {
let bucket_ix =
IndexEntry::<T>::data_bucket_from_num_slots(multiple_slots.num_slots) as usize;
let current_bucket = &mut self.data[bucket_ix];
let elem_loc = IndexEntryPlaceInBucket::<T>::data_loc(current_bucket, multiple_slots);

if best_fit_bucket == bucket_ix as u64 {
// in place update in same data file
assert!(!current_bucket.is_free(elem_loc));
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
multiple_slots.num_slots = num_slots;

slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
});
return Ok(());
}

old_data_entry_to_free = Some(DataFileEntryToFree {
bucket_ix,
location: elem_loc,
});
Ok(())
} else {
// need to move the allocation to a best fit spot
let best_bucket = &self.data[best_fit_bucket as usize];
let current_bucket = &self.data[bucket_ix as usize];
let cap_power = best_bucket.capacity_pow2;
let cap = best_bucket.capacity();
let pos = thread_rng().gen_range(0, cap);
// max search is increased here by a lot for this search. The idea is that we just have to find an empty bucket somewhere.
// We don't mind waiting on a new write (by searching longer). Writing is done in the background only.
// Wasting space by doubling the bucket size is worse behavior. We expect more
// updates and fewer inserts, so we optimize for more compact data.
// We can accomplish this by increasing how many locations we're willing to search for an empty data cell.
// For the index bucket, it is more like a hash table and we have to exhaustively search 'max_search' to prove an item does not exist.
// And we do have to support the 'does not exist' case with good performance. So, it makes sense to grow the index bucket when it is too large.
// For data buckets, the offset is stored in the index, so it is directly looked up. So, the only search is on INSERT or update to a new sized value.
for i in pos..pos + (max_search * 10).min(cap) {
let ix = i % cap;
if best_bucket.is_free(ix) {
let elem_loc = elem.data_loc(&self.index, current_bucket);
let old_slots = elem.num_slots(&self.index);
elem.set_storage_offset(&mut self.index, ix);
elem.set_storage_capacity_when_created_pow2(
&mut self.index,
best_bucket.capacity_pow2,
);
elem.set_num_slots(&mut self.index, num_slots);
if old_slots > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
current_bucket.free(elem_loc);
}
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if num_slots > 0 {
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.occupy(ix, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
});
}
return Ok(());
}

// need to move the allocation to a best fit spot
let best_bucket = &self.data[best_fit_bucket as usize];
let cap_power = best_bucket.capacity_pow2;
let cap = best_bucket.capacity();
let pos = thread_rng().gen_range(0, cap);
// max search is increased here by a lot for this search. The idea is that we just have to find an empty bucket somewhere.
// We don't mind waiting on a new write (by searching longer). Writing is done in the background only.
// Wasting space by doubling the bucket size is worse behavior. We expect more
// updates and fewer inserts, so we optimize for more compact data.
// We can accomplish this by increasing how many locations we're willing to search for an empty data cell.
// For the index bucket, it is more like a hash table and we have to exhaustively search 'max_search' to prove an item does not exist.
// And we do have to support the 'does not exist' case with good performance. So, it makes sense to grow the index bucket when it is too large.
// For data buckets, the offset is stored in the index, so it is directly looked up. So, the only search is on INSERT or update to a new sized value.
for i in pos..pos + (max_search * 10).min(cap) {
let ix = i % cap;
if best_bucket.is_free(ix) {
let mut multiple_slots = MultipleSlots::default();
multiple_slots.set_storage_offset(ix);
multiple_slots.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
multiple_slots.num_slots = num_slots;
elem.set_slot_count_enum_value(
&mut self.index,
SlotCountEnum::MultipleSlots(&multiple_slots),
);
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if num_slots > 0 {
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.occupy(ix, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
});
}
if let Some(DataFileEntryToFree {
bucket_ix,
location,
}) = old_data_entry_to_free
{
// free the entry in the data bucket the data was previously stored in
self.data[bucket_ix].free(location);
}
return Ok(());
}
Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power)))
}
Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power)))
}

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_index_entry(key) {
if elem.num_slots(&self.index) > 0 {
let ix = elem.data_bucket_ix(&self.index) as usize;
if let SlotCountEnum::MultipleSlots(multiple_slots) =
elem.get_slot_count_enum(&self.index)
{
let ix =
IndexEntry::<T>::data_bucket_from_num_slots(multiple_slots.num_slots) as usize;
let data_bucket = &self.data[ix];
let loc = elem.data_loc(&self.index, data_bucket);
let loc = IndexEntryPlaceInBucket::<T>::data_loc(data_bucket, multiple_slots);
let data_bucket = &mut self.data[ix];
//debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid );
data_bucket.free(loc);
Expand Down
2 changes: 1 addition & 1 deletion bucket_map/src/bucket_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ mod tests {
let v = (0..count)
.map(|x| (x as usize, x as usize /*thread_rng().gen::<usize>()*/))
.collect::<Vec<_>>();
let rc = thread_rng().gen::<RefCount>();
let rc = thread_rng().gen_range(0, RefCount::MAX >> 2);
(v, rc)
};

Expand Down
9 changes: 8 additions & 1 deletion bucket_map/src/bucket_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ impl<O: BucketOccupied> BucketStorage<O> {
unsafe { &mut *item }
}

/// Reinterpret the leading bytes of `item_slice` as a reference to a `T`.
///
/// The caller must guarantee that `item_slice` is at least
/// `size_of::<T>()` bytes long, is properly aligned for `T`, and contains
/// a valid bit pattern for `T`. The length and alignment invariants are
/// checked in debug builds; violating them in release builds is UB.
pub(crate) fn get_from_parts<T: Sized>(item_slice: &[u8]) -> &T {
    // Debug-only guards against the two cheap-to-check UB sources:
    // a slice too short for T, or a pointer not aligned for T.
    debug_assert!(item_slice.len() >= std::mem::size_of::<T>());
    debug_assert_eq!(
        item_slice
            .as_ptr()
            .align_offset(std::mem::align_of::<T>()),
        0
    );
    // SAFETY: per the caller's contract (debug-asserted above), the pointer
    // is in-bounds for size_of::<T>() bytes, aligned for T, and the bytes
    // form a valid T. The returned reference borrows `item_slice`, so it
    // cannot outlive the backing buffer.
    unsafe { &*(item_slice.as_ptr() as *const T) }
}

pub fn get_mut<T: Sized>(&mut self, ix: u64) -> &mut T {
let start = self.get_start_offset_no_header(ix);
let item_slice = &mut self.mmap[start..];
Expand Down Expand Up @@ -373,7 +380,7 @@ mod test {
let mut storage = BucketStorage::<IndexBucket<u64>>::new(
Arc::new(paths),
1,
1,
std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64,
1,
Arc::default(),
Arc::default(),
Expand Down
Loading

0 comments on commit 89354f4

Please sign in to comment.