disk bucket stores single entry in index file
jeffwashington committed Mar 16, 2023
1 parent d66d1f7 commit 6131818
Showing 4 changed files with 169 additions and 68 deletions.
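The change makes the index entry generic over the stored value type so that a key with exactly one value can keep that value directly in the index file instead of allocating a slot in a data bucket. The `IndexEntry<T>` definition itself is in a file that is not expanded in this view; the sketch below is only a rough model of it, keeping the field names the diff actually touches (`key`, `ref_count`, `num_slots`, and the new `first_element`) and assuming the rest (the packed storage offset/capacity field, the repr, the exact bounds):

use std::fmt::Debug;
use solana_sdk::pubkey::Pubkey;

pub type RefCount = u64; // alias assumed to match the crate's RefCount

// Rough, hedged sketch of the generic index entry -- not the commit's actual layout.
#[repr(C)]
#[derive(Debug, Default, Copy, Clone)]
pub struct IndexEntry<T: Copy + Debug + Default + 'static> {
    pub key: Pubkey,             // pubkey this entry indexes
    pub ref_count: RefCount,     // refcount supplied by the caller on write
    storage_cap_and_offset: u64, // packed capacity_pow2 + offset into a data bucket (assumed packing)
    pub num_slots: u64,          // how many values are stored under this key
    pub first_element: T,        // new: when num_slots == 1, the single value lives here,
                                 // inside the index file, and no data bucket is used
}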
101 changes: 69 additions & 32 deletions bucket_map/src/bucket.rs
@@ -12,6 +12,7 @@ use {
solana_sdk::pubkey::Pubkey,
std::{
collections::hash_map::DefaultHasher,
fmt::Debug,
hash::{Hash, Hasher},
marker::PhantomData,
ops::RangeBounds,
@@ -76,7 +77,7 @@ pub struct Bucket<T> {
pub reallocated: Reallocated,
}

impl<'b, T: Clone + Copy + 'static> Bucket<T> {
impl<'b, T: Clone + Copy + Debug + Default + 'static> Bucket<T> {
pub fn new(
drives: Arc<Vec<PathBuf>>,
max_search: MaxSearch,
@@ -86,7 +87,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
let index = BucketStorage::new(
Arc::clone(&drives),
1,
std::mem::size_of::<IndexEntry>() as u64,
std::mem::size_of::<IndexEntry<T>>() as u64,
max_search,
Arc::clone(&stats.index),
count,
@@ -108,7 +109,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
if self.index.is_free(i) {
continue;
}
let ix: &IndexEntry = self.index.get(i);
let ix: &IndexEntry<T> = self.index.get(i);
rv.push(ix.key);
}
rv
@@ -124,7 +125,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
if self.index.is_free(ii) {
continue;
}
let ix: &IndexEntry = self.index.get(ii);
let ix: &IndexEntry<T> = self.index.get(ii);
let key = ix.key;
if range.map(|r| r.contains(&key)).unwrap_or(true) {
let val = ix.read_value(self);
@@ -138,14 +139,14 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
result
}

pub fn find_entry(&self, key: &Pubkey) -> Option<(&IndexEntry, u64)> {
pub fn find_entry(&self, key: &Pubkey) -> Option<(&IndexEntry<T>, u64)> {
Self::bucket_find_entry(&self.index, key, self.random)
}

fn find_entry_mut<'a>(
&'a self,
key: &Pubkey,
) -> Result<(bool, &'a mut IndexEntry, u64), BucketMapError> {
) -> Result<(bool, &'a mut IndexEntry<T>, u64), BucketMapError> {
let ix = Self::bucket_index_ix(&self.index, key, self.random);
let mut first_free = None;
let mut m = Measure::start("bucket_find_entry_mut");
@@ -157,7 +158,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
continue;
}
let elem: &mut IndexEntry = self.index.get_mut(ii);
let elem: &mut IndexEntry<T> = self.index.get_mut(ii);
if elem.key == *key {
m.stop();
self.stats
@@ -174,7 +175,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
.fetch_add(m.as_us(), Ordering::Relaxed);
match first_free {
Some(ii) => {
let elem: &mut IndexEntry = self.index.get_mut(ii);
let elem: &mut IndexEntry<T> = self.index.get_mut(ii);
Ok((false, elem, ii))
}
None => Err(self.index_no_space()),
@@ -185,14 +186,14 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
index: &'a BucketStorage,
key: &Pubkey,
random: u64,
) -> Option<(&'a IndexEntry, u64)> {
) -> Option<(&'a IndexEntry<T>, u64)> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if index.is_free(ii) {
continue;
}
let elem: &IndexEntry = index.get(ii);
let elem: &IndexEntry<T> = index.get(ii);
if elem.key == *key {
return Some((elem, ii));
}
@@ -215,7 +216,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
continue;
}
index.allocate(ii, elem_uid, is_resizing).unwrap();
let elem: &mut IndexEntry = index.get_mut(ii);
let elem: &mut IndexEntry<T> = index.get_mut(ii);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(key);
@@ -268,60 +269,95 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
pub fn try_write(
&mut self,
key: &Pubkey,
data: impl Iterator<Item = &'b T>,
mut data: impl Iterator<Item = &'b T>,
data_len: usize,
ref_count: RefCount,
) -> Result<(), BucketMapError> {
let best_fit_bucket = IndexEntry::data_bucket_from_num_slots(data_len as u64);
if self.data.get(best_fit_bucket as usize).is_none() {
let best_fit_bucket = IndexEntry::<T>::data_bucket_from_num_slots(data_len as u64);
// data_len=0 => nothing to store
// data_len=1 => store in index, so no data bucket
let use_data_storage = data_len > 1;
if use_data_storage && self.data.get(best_fit_bucket as usize).is_none() {
// fail early if the data bucket we need doesn't exist - we don't want the index entry partially allocated
return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0)));
}
let (found, elem, elem_ix) = self.find_entry_mut(key)?;
if !found {
let is_resizing = false;
let elem_uid = IndexEntry::key_uid(key);
let elem_uid = IndexEntry::<T>::key_uid(key);
self.index.allocate(elem_ix, elem_uid, is_resizing).unwrap();
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(key);
}
elem.ref_count = ref_count;
let old_slots = elem.num_slots;
let elem_uid = self.index.uid_unchecked(elem_ix);
let bucket_ix = elem.data_bucket_ix();
let current_bucket = &self.data[bucket_ix as usize];
let num_slots = data_len as u64;
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
// in place update
let bucket_ix = elem.data_bucket_ix();

if !use_data_storage {
// new data should be stored in elem.`first_element`
// new data len is 0 or 1
elem.num_slots = num_slots;
elem.first_element = if num_slots == 1 {
// replace
*data.next().unwrap()
} else {
// set to default for cleanliness
T::default()
};
if old_slots > 1 {
// free old data location
let elem_loc = elem.data_loc(&self.data[bucket_ix as usize]);
self.data[bucket_ix as usize].free(elem_loc, elem_uid);
}
} else if best_fit_bucket == bucket_ix {
// in place update in same data file
let current_bucket = &self.data[bucket_ix as usize];
let elem_loc = elem.data_loc(current_bucket);

let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
assert_eq!(current_bucket.uid(elem_loc), Some(elem_uid));
elem.num_slots = num_slots;

slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
});
Ok(())
} else {
// need to move the allocation to a best fit spot
// previous data could have been len=0, len=1, or in a different data file
let best_bucket = &self.data[best_fit_bucket as usize];
let cap_power = best_bucket.capacity_pow2;
let cap = best_bucket.capacity();
// start searching at a random spot in the data file
let pos = thread_rng().gen_range(0, cap);
// find free spot in data file
for i in pos..pos + self.index.max_search() {
let ix = i % cap;
if best_bucket.is_free(ix) {
let elem_loc = elem.data_loc(current_bucket);
let old_slots = elem.num_slots;
elem.set_storage_offset(ix);
elem.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
elem.num_slots = num_slots;
if old_slots > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
current_bucket.free(elem_loc, elem_uid);
match old_slots {
2.. => {
// free the entry in the data bucket the data was previously stored in
let elem_loc = elem.data_loc(&self.data[bucket_ix as usize]);
self.data[bucket_ix as usize].free(elem_loc, elem_uid);
}
1 => {
// nothing to free in a data bucket
// set `first_element` to default to avoid confusion
assert!(num_slots > 1);
elem.first_element = T::default();
}
0 => {
// nothing to free
}
}
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if num_slots > 0 {
assert!(num_slots > 1);
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.allocate(ix, elem_uid, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
@@ -332,14 +368,15 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
return Ok(());
}
}
Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power)))
return Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power)));
}
Ok(())
}

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_entry(key) {
let elem_uid = self.index.uid_unchecked(elem_ix);
if elem.num_slots > 0 {
if elem.num_slots > 1 {
let ix = elem.data_bucket_ix() as usize;
let data_bucket = &self.data[ix];
let loc = elem.data_loc(data_bucket);
@@ -364,7 +401,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
let mut index = BucketStorage::new_with_capacity(
Arc::clone(&self.drives),
1,
std::mem::size_of::<IndexEntry>() as u64,
std::mem::size_of::<IndexEntry<T>>() as u64,
// *2 causes rapid growth of index buckets
self.index.capacity_pow2 + i, // * 2,
self.index.max_search,
@@ -376,18 +413,18 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
for ix in 0..self.index.capacity() {
let uid = self.index.uid(ix);
if let Some(uid) = uid {
let elem: &IndexEntry = self.index.get(ix);
let elem: &IndexEntry<T> = self.index.get(ix);
let new_ix =
Self::bucket_create_key(&mut index, &elem.key, uid, random, true);
if new_ix.is_err() {
valid = false;
break;
}
let new_ix = new_ix.unwrap();
let new_elem: &mut IndexEntry = index.get_mut(new_ix);
let new_elem: &mut IndexEntry<T> = index.get_mut(new_ix);
*new_elem = *elem;
/*
let dbg_elem: IndexEntry = *new_elem;
let dbg_elem: IndexEntry<T> = *new_elem;
assert_eq!(
Self::bucket_find_entry(&index, &elem.key, random).unwrap(),
(&dbg_elem, new_ix)
@@ -458,7 +495,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}

fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 {
let uid = IndexEntry::key_uid(key);
let uid = IndexEntry::<T>::key_uid(key);
let mut s = DefaultHasher::new();
uid.hash(&mut s);
//the locally generated random will make it hard for an attacker
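Taken together, the `try_write` changes above give writes three paths keyed on how many values are being stored: zero or one value stays inline in the index entry (`first_element`), an update that already sits in the best-fit data bucket is rewritten in place, and anything else moves to a free slot in the best-fit data bucket, with the old location freed according to `old_slots`. A much-simplified, self-contained model of that dispatch follows; the `Entry`/`store` names and the Vec-backed "data bucket" are stand-ins for illustration, not the crate's mmapped `BucketStorage` API:

use std::fmt::Debug;

// Simplified stand-in for the index entry; see the hedged sketch near the top.
#[derive(Clone, Copy, Debug, Default)]
struct Entry<T: Copy + Debug + Default> {
    num_slots: u64,
    first_element: T,   // single value stored inline in the index entry
    data_offset: usize, // stand-in for the packed storage offset/capacity
}

// Stand-in for the write dispatch: inline for 0 or 1 values, data bucket otherwise.
fn store<T: Copy + Debug + Default>(entry: &mut Entry<T>, data: &[T], data_bucket: &mut Vec<T>) {
    let num_slots = data.len() as u64;
    if num_slots <= 1 {
        // data_len 0 or 1: keep the value in the index entry, no data-bucket allocation
        entry.first_element = data.first().copied().unwrap_or_default();
    } else {
        // data_len > 1: values spill to a data bucket, as before this commit
        entry.first_element = T::default(); // mirror the commit: reset the inline value
        entry.data_offset = data_bucket.len();
        data_bucket.extend_from_slice(data);
    }
    entry.num_slots = num_slots;
}

fn main() {
    let mut data_bucket: Vec<u64> = Vec::new();
    let mut entry = Entry::<u64>::default();

    store(&mut entry, &[42], &mut data_bucket); // single value: stays in the index entry
    assert_eq!(entry.first_element, 42);
    assert!(data_bucket.is_empty());

    store(&mut entry, &[1, 2, 3], &mut data_bucket); // multiple values: data bucket
    assert_eq!(entry.num_slots, 3);
    assert_eq!(data_bucket, vec![1, 2, 3]);
}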
3 changes: 2 additions & 1 deletion bucket_map/src/bucket_api.rs
@@ -5,6 +5,7 @@ use {
},
solana_sdk::pubkey::Pubkey,
std::{
fmt::Debug,
ops::RangeBounds,
path::PathBuf,
sync::{
@@ -25,7 +26,7 @@ pub struct BucketApi<T: Clone + Copy + 'static> {
count: Arc<AtomicU64>,
}

impl<T: Clone + Copy> BucketApi<T> {
impl<T: Clone + Copy + Debug + Default> BucketApi<T> {
pub fn new(
drives: Arc<Vec<PathBuf>>,
max_search: MaxSearch,
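`BucketApi` picks up the same `Debug + Default` bounds that were added to the `Bucket` impl above. `Default` is what lets the bucket blank out `first_element` with `T::default()` when an entry holds no value or its data moves back out to a data bucket; `Debug` appears to be there for diagnostics. As a minimal illustration (the type name and fields are hypothetical, not part of this commit), any plain-old-data value like this satisfies the new bounds:

// Hypothetical element type; anything Clone + Copy + Debug + Default + 'static works.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct SlotValue {
    slot: u64,
    offset: u64,
}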
(diffs for the remaining two changed files are not expanded in this view)
