Skip to content

Commit

Permalink
disk index: introduce IndexEntryPlaceInBucket
Browse files: browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Mar 29, 2023
1 parent 1b552a7 commit 9f7daf8
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 82 deletions.
76 changes: 39 additions & 37 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
bucket_map::BucketMapError,
bucket_stats::BucketMapStats,
bucket_storage::{BucketOccupied, BucketStorage, DEFAULT_CAPACITY_POW2},
index_entry::{DataBucket, IndexBucket, IndexEntry},
index_entry::{DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket},
MaxSearch, RefCount,
},
rand::{thread_rng, Rng},
Expand Down Expand Up @@ -142,33 +142,33 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
if self.index.is_free(ii) {
continue;
}
let ix: &IndexEntry = self.index.get(ii);
let key = ix.key;
if range.map(|r| r.contains(&key)).unwrap_or(true) {
let val = ix.read_value(self);
let ix = IndexEntryPlaceInBucket::new(ii);
let key = ix.key(&self.index);
if range.map(|r| r.contains(key)).unwrap_or(true) {
let val = ix.read_value(&self.index, &self.data);
result.push(BucketItem {
pubkey: key,
ref_count: ix.ref_count(),
pubkey: *key,
ref_count: ix.ref_count(&self.index),
slot_list: val.map(|(v, _ref_count)| v.to_vec()).unwrap_or_default(),
});
}
}
result
}

pub fn find_index_entry(&self, key: &Pubkey) -> Option<(&IndexEntry, u64)> {
pub fn find_index_entry(&self, key: &Pubkey) -> Option<(IndexEntryPlaceInBucket<T>, u64)> {
Self::bucket_find_index_entry(&self.index, key, self.random)
}

/// Find the index entry for `key`.
/// If the entry exists, return the entry along with the index of the existing entry.
/// If the entry does not exist, return no entry, only the index of an empty entry appropriate for this key.
/// On success, returns (existing entry, if any, index of the found or empty entry).
fn find_index_entry_mut<'a>(
index: &'a mut BucketStorage<IndexBucket>,
fn find_index_entry_mut(
index: &mut BucketStorage<IndexBucket>,
key: &Pubkey,
random: u64,
) -> Result<(Option<&'a mut IndexEntry>, u64), BucketMapError> {
) -> Result<(Option<IndexEntryPlaceInBucket<T>>, u64), BucketMapError> {
let ix = Self::bucket_index_ix(index, key, random);
let mut first_free = None;
let mut m = Measure::start("bucket_find_index_entry_mut");
Expand All @@ -181,15 +181,15 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
continue;
}
let elem: &IndexEntry = index.get(ii);
if elem.key == *key {
let elem = IndexEntryPlaceInBucket::new(ii);
if elem.key(index) == key {
m.stop();

index
.stats
.find_index_entry_mut_us
.fetch_add(m.as_us(), Ordering::Relaxed);
return Ok((Some(index.get_mut(ii)), ii));
return Ok((Some(elem), ii));
}
}
m.stop();
Expand All @@ -203,19 +203,19 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
}

fn bucket_find_index_entry<'a>(
index: &'a BucketStorage<IndexBucket>,
fn bucket_find_index_entry(
index: &BucketStorage<IndexBucket>,
key: &Pubkey,
random: u64,
) -> Option<(&'a IndexEntry, u64)> {
) -> Option<(IndexEntryPlaceInBucket<T>, u64)> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if index.is_free(ii) {
continue;
}
let elem: &IndexEntry = index.get(ii);
if elem.key == *key {
let elem = IndexEntryPlaceInBucket::new(ii);
if elem.key(index) == key {
return Some((elem, ii));
}
}
Expand All @@ -236,10 +236,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
continue;
}
index.occupy(ii, is_resizing).unwrap();
let elem: &mut IndexEntry = index.get_mut(ii);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(key);
IndexEntryPlaceInBucket::<T>::new(ii).init(index, key);
//debug!( "INDEX ALLOC {:?} {} {} {}", key, ii, index.capacity, elem_uid );
m.stop();
index
Expand All @@ -259,7 +258,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
//debug!("READ_VALUE: {:?}", key);
let (elem, _) = self.find_index_entry(key)?;
elem.read_value(self)
elem.read_value(&self.index, &self.data)
}

pub fn try_write(
Expand All @@ -281,23 +280,23 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
} else {
let is_resizing = false;
self.index.occupy(elem_ix, is_resizing).unwrap();
let elem_allocate = IndexEntryPlaceInBucket::new(elem_ix);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
let elem_allocate: &mut IndexEntry = self.index.get_mut(elem_ix);
elem_allocate.init(key);
elem_allocate.init(&mut self.index, key);
elem_allocate
};

elem.ref_count = ref_count;
let bucket_ix = elem.data_bucket_ix();
elem.set_ref_count(&mut self.index, ref_count);
let bucket_ix = elem.data_bucket_ix(&self.index);
let current_bucket = &self.data[bucket_ix as usize];
let num_slots = data_len as u64;
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
if best_fit_bucket == bucket_ix && elem.num_slots(&self.index) > 0 {
// in place update
let elem_loc = elem.data_loc(current_bucket);
let elem_loc = elem.data_loc(&self.index, current_bucket);
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
assert!(!current_bucket.is_free(elem_loc));
elem.num_slots = num_slots;
elem.set_num_slots(&mut self.index, num_slots);

slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
Expand All @@ -320,11 +319,14 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
for i in pos..pos + (max_search * 10).min(cap) {
let ix = i % cap;
if best_bucket.is_free(ix) {
let elem_loc = elem.data_loc(current_bucket);
let old_slots = elem.num_slots;
elem.set_storage_offset(ix);
elem.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
elem.num_slots = num_slots;
let elem_loc = elem.data_loc(&self.index, current_bucket);
let old_slots = elem.num_slots(&self.index);
elem.set_storage_offset(&mut self.index, ix);
elem.set_storage_capacity_when_created_pow2(
&mut self.index,
best_bucket.capacity_pow2,
);
elem.set_num_slots(&mut self.index, num_slots);
if old_slots > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
current_bucket.free(elem_loc);
Expand All @@ -347,10 +349,10 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_index_entry(key) {
if elem.num_slots > 0 {
let ix = elem.data_bucket_ix() as usize;
if elem.num_slots(&self.index) > 0 {
let ix = elem.data_bucket_ix(&self.index) as usize;
let data_bucket = &self.data[ix];
let loc = elem.data_loc(data_bucket);
let loc = elem.data_loc(&self.index, data_bucket);
let data_bucket = &mut self.data[ix];
//debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid );
data_bucket.free(loc);
Expand Down
4 changes: 2 additions & 2 deletions bucket_map/src/bucket_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ impl<O: BucketOccupied> BucketStorage<O> {
}
}

fn get_start_offset_with_header(&self, ix: u64) -> usize {
pub(crate) fn get_start_offset_with_header(&self, ix: u64) -> usize {
assert!(ix < self.capacity(), "bad index size");
(self.cell_size * ix) as usize
}

fn get_start_offset_no_header(&self, ix: u64) -> usize {
pub(crate) fn get_start_offset_no_header(&self, ix: u64) -> usize {
self.get_start_offset_with_header(ix) + O::offset_to_first_data()
}

Expand Down
Loading

0 comments on commit 9f7daf8

Please sign in to comment.