Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disk index: introduce IndexEntryPlaceInBucket #30944

Merged
merged 4 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 39 additions & 37 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
bucket_map::BucketMapError,
bucket_stats::BucketMapStats,
bucket_storage::{BucketOccupied, BucketStorage, DEFAULT_CAPACITY_POW2},
index_entry::{DataBucket, IndexBucket, IndexEntry},
index_entry::{DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket},
MaxSearch, RefCount,
},
rand::{thread_rng, Rng},
Expand Down Expand Up @@ -142,33 +142,33 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
if self.index.is_free(ii) {
continue;
}
let ix: &IndexEntry = self.index.get(ii);
let key = ix.key;
if range.map(|r| r.contains(&key)).unwrap_or(true) {
let val = ix.read_value(self);
let ix = IndexEntryPlaceInBucket::new(ii);
let key = ix.key(&self.index);
if range.map(|r| r.contains(key)).unwrap_or(true) {
let val = ix.read_value(&self.index, &self.data);
result.push(BucketItem {
pubkey: key,
ref_count: ix.ref_count(),
pubkey: *key,
ref_count: ix.ref_count(&self.index),
slot_list: val.map(|(v, _ref_count)| v.to_vec()).unwrap_or_default(),
});
}
}
result
}

pub fn find_index_entry(&self, key: &Pubkey) -> Option<(&IndexEntry, u64)> {
pub fn find_index_entry(&self, key: &Pubkey) -> Option<(IndexEntryPlaceInBucket, u64)> {
Self::bucket_find_index_entry(&self.index, key, self.random)
}

/// Find the index entry for `key`.
/// If an entry exists, return it along with the index of that existing entry.
/// If no entry exists, return only the index of an empty entry appropriate for this key.
/// Returns (existing entry, if any; index of the found or empty entry).
fn find_index_entry_mut<'a>(
index: &'a mut BucketStorage<IndexBucket>,
fn find_index_entry_mut(
index: &mut BucketStorage<IndexBucket>,
key: &Pubkey,
random: u64,
) -> Result<(Option<&'a mut IndexEntry>, u64), BucketMapError> {
) -> Result<(Option<IndexEntryPlaceInBucket>, u64), BucketMapError> {
let ix = Self::bucket_index_ix(index, key, random);
let mut first_free = None;
let mut m = Measure::start("bucket_find_index_entry_mut");
Expand All @@ -181,15 +181,15 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
continue;
}
let elem: &IndexEntry = index.get(ii);
if elem.key == *key {
let elem = IndexEntryPlaceInBucket::new(ii);
if elem.key(index) == key {
m.stop();

index
.stats
.find_index_entry_mut_us
.fetch_add(m.as_us(), Ordering::Relaxed);
return Ok((Some(index.get_mut(ii)), ii));
return Ok((Some(elem), ii));
}
}
m.stop();
Expand All @@ -203,19 +203,19 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
}

fn bucket_find_index_entry<'a>(
index: &'a BucketStorage<IndexBucket>,
fn bucket_find_index_entry(
index: &BucketStorage<IndexBucket>,
key: &Pubkey,
random: u64,
) -> Option<(&'a IndexEntry, u64)> {
) -> Option<(IndexEntryPlaceInBucket, u64)> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if index.is_free(ii) {
continue;
}
let elem: &IndexEntry = index.get(ii);
if elem.key == *key {
let elem = IndexEntryPlaceInBucket::new(ii);
if elem.key(index) == key {
return Some((elem, ii));
}
}
Expand All @@ -236,10 +236,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
continue;
}
index.occupy(ii, is_resizing).unwrap();
let elem: &mut IndexEntry = index.get_mut(ii);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(key);
IndexEntryPlaceInBucket::new(ii).init(index, key);
//debug!( "INDEX ALLOC {:?} {} {} {}", key, ii, index.capacity, elem_uid );
m.stop();
index
Expand All @@ -259,7 +258,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
//debug!("READ_VALUE: {:?}", key);
let (elem, _) = self.find_index_entry(key)?;
elem.read_value(self)
elem.read_value(&self.index, &self.data)
}

pub fn try_write(
Expand All @@ -281,23 +280,23 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
} else {
let is_resizing = false;
self.index.occupy(elem_ix, is_resizing).unwrap();
let elem_allocate = IndexEntryPlaceInBucket::new(elem_ix);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
let elem_allocate: &mut IndexEntry = self.index.get_mut(elem_ix);
elem_allocate.init(key);
elem_allocate.init(&mut self.index, key);
elem_allocate
};

elem.ref_count = ref_count;
let bucket_ix = elem.data_bucket_ix();
elem.set_ref_count(&mut self.index, ref_count);
let bucket_ix = elem.data_bucket_ix(&self.index);
let current_bucket = &self.data[bucket_ix as usize];
let num_slots = data_len as u64;
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
if best_fit_bucket == bucket_ix && elem.num_slots(&self.index) > 0 {
// in place update
let elem_loc = elem.data_loc(current_bucket);
let elem_loc = elem.data_loc(&self.index, current_bucket);
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
assert!(!current_bucket.is_free(elem_loc));
elem.num_slots = num_slots;
elem.set_num_slots(&mut self.index, num_slots);

slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
Expand All @@ -320,11 +319,14 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
for i in pos..pos + (max_search * 10).min(cap) {
let ix = i % cap;
if best_bucket.is_free(ix) {
let elem_loc = elem.data_loc(current_bucket);
let old_slots = elem.num_slots;
elem.set_storage_offset(ix);
elem.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
elem.num_slots = num_slots;
let elem_loc = elem.data_loc(&self.index, current_bucket);
let old_slots = elem.num_slots(&self.index);
elem.set_storage_offset(&mut self.index, ix);
elem.set_storage_capacity_when_created_pow2(
&mut self.index,
best_bucket.capacity_pow2,
);
elem.set_num_slots(&mut self.index, num_slots);
if old_slots > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
current_bucket.free(elem_loc);
Expand All @@ -347,10 +349,10 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_index_entry(key) {
if elem.num_slots > 0 {
let ix = elem.data_bucket_ix() as usize;
if elem.num_slots(&self.index) > 0 {
let ix = elem.data_bucket_ix(&self.index) as usize;
let data_bucket = &self.data[ix];
let loc = elem.data_loc(data_bucket);
let loc = elem.data_loc(&self.index, data_bucket);
let data_bucket = &mut self.data[ix];
//debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid );
data_bucket.free(loc);
Expand Down
4 changes: 2 additions & 2 deletions bucket_map/src/bucket_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ impl<O: BucketOccupied> BucketStorage<O> {
}
}

fn get_start_offset_with_header(&self, ix: u64) -> usize {
pub(crate) fn get_start_offset_with_header(&self, ix: u64) -> usize {
assert!(ix < self.capacity(), "bad index size");
(self.cell_size * ix) as usize
}

fn get_start_offset_no_header(&self, ix: u64) -> usize {
pub(crate) fn get_start_offset_no_header(&self, ix: u64) -> usize {
self.get_start_offset_with_header(ix) + O::offset_to_first_data()
}

Expand Down
Loading