introduce bucket map with single data entry
jeffwashington committed Feb 27, 2023
1 parent 33e1799 commit 3fd040f
Showing 4 changed files with 643 additions and 5 deletions.
351 changes: 350 additions & 1 deletion bucket_map/src/bucket.rs
@@ -4,7 +4,7 @@ use {
bucket_map::BucketMapError,
bucket_stats::BucketMapStats,
bucket_storage::{BucketStorage, Uid, DEFAULT_CAPACITY_POW2},
index_entry::IndexEntry,
index_entry::{IndexEntry, IndexEntrySingle},
MaxSearch, RefCount,
},
rand::{thread_rng, Rng},
@@ -76,6 +76,355 @@ pub struct Bucket<T> {
pub reallocated: Reallocated,
}

pub struct BucketSingle<T> {
drives: Arc<Vec<PathBuf>>,
//index storage
pub index: BucketStorage,
//random salt mixed into the index hash (see bucket_index_ix)
random: u64,
_phantom: PhantomData<T>,
stats: Arc<BucketMapStats>,

pub reallocated: Reallocated,
}

impl<T: Clone + Copy> BucketSingle<T> {
pub fn new(
drives: Arc<Vec<PathBuf>>,
max_search: MaxSearch,
stats: Arc<BucketMapStats>,
count: Arc<AtomicU64>,
) -> Self {
let index = BucketStorage::new(
Arc::clone(&drives),
1,
std::mem::size_of::<IndexEntrySingle<T>>() as u64,
max_search,
Arc::clone(&stats.index),
count,
);
Self {
random: thread_rng().gen(),
drives,
index,
_phantom: PhantomData::default(),
stats,
reallocated: Reallocated::default(),
}
}

pub fn keys(&self) -> Vec<Pubkey> {
let mut rv = vec![];
for i in 0..self.index.capacity() {
if self.index.is_free(i) {
continue;
}
let ix: &IndexEntrySingle<T> = self.index.get(i);
rv.push(ix.key);
}
rv
}

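/// Return one `BucketItem` for each occupied slot whose key falls within
/// `range` (or for every occupied slot when `range` is `None`).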
pub fn items_in_range<R>(&self, range: &Option<&R>) -> Vec<BucketItem<T>>
where
R: RangeBounds<Pubkey>,
{
let mut result = Vec::with_capacity(self.index.count.load(Ordering::Relaxed) as usize);
for i in 0..self.index.capacity() {
let ii = i % self.index.capacity();
if self.index.is_free(ii) {
continue;
}
let ix: &IndexEntrySingle<T> = self.index.get(ii);
let key = ix.key;
if range.map(|r| r.contains(&key)).unwrap_or(true) {
result.push(BucketItem {
pubkey: key,
ref_count: 1,
slot_list: vec![], // todo val.map(|(v, _ref_count)| v.to_vec()).unwrap_or_default(),
});
}
}
result
}

pub fn find_entry(&self, key: &Pubkey) -> Option<(&IndexEntrySingle<T>, u64)> {
Self::bucket_find_entry(&self.index, key, self.random)
}

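/// Linear-probe up to `max_search` slots starting at `key`'s hashed index.
/// Returns (true, entry, ix) when `key` is found, (false, entry, ix) for the
/// first free slot seen during the probe, or `IndexNoSpace` if every slot in
/// the probe window holds some other key.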
fn find_entry_mut<'a>(
&'a self,
key: &Pubkey,
) -> Result<(bool, &'a mut IndexEntrySingle<T>, u64), BucketMapError> {
let ix = Self::bucket_index_ix(&self.index, key, self.random);
let mut first_free = None;
let mut m = Measure::start("bucket_find_entry_mut");
for i in ix..ix + self.index.max_search() {
let ii = i % self.index.capacity();
if self.index.is_free(ii) {
if first_free.is_none() {
first_free = Some(ii);
}
continue;
}
let elem: &mut IndexEntrySingle<T> = self.index.get_mut(ii);
if elem.key == *key {
m.stop();
self.stats
.index
.find_entry_mut_us
.fetch_add(m.as_us(), Ordering::Relaxed);
return Ok((true, elem, ii));
}
}
m.stop();
self.stats
.index
.find_entry_mut_us
.fetch_add(m.as_us(), Ordering::Relaxed);
match first_free {
Some(ii) => {
let elem: &mut IndexEntrySingle<T> = self.index.get_mut(ii);
Ok((false, elem, ii))
}
None => Err(self.index_no_space()),
}
}

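/// Read-only linear probe: return the entry and its slot index if `key` is
/// present within `max_search` slots of its hashed position.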
fn bucket_find_entry<'a>(
index: &'a BucketStorage,
key: &Pubkey,
random: u64,
) -> Option<(&'a IndexEntrySingle<T>, u64)> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if index.is_free(ii) {
continue;
}
let elem: &IndexEntrySingle<T> = index.get(ii);
if elem.key == *key {
return Some((elem, ii));
}
}
None
}

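/// Used while re-hashing in `grow_index`: claim the first free slot in
/// `entry.key`'s probe window of the new `index` and initialize it from `entry`.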
fn bucket_create_key(
index: &mut BucketStorage,
entry: &IndexEntrySingle<T>,
elem_uid: Uid,
random: u64,
is_resizing: bool,
) -> Result<u64, BucketMapError> {
let mut m = Measure::start("bucket_create_key");
let ix = Self::bucket_index_ix(index, &entry.key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if !index.is_free(ii) {
continue;
}
index.allocate(ii, elem_uid, is_resizing).unwrap();
let elem: &mut IndexEntrySingle<T> = index.get_mut(ii);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(&entry.key, entry.info);
//debug!( "INDEX ALLOC {:?} {} {} {}", key, ii, index.capacity, elem_uid );
m.stop();
index
.stats
.find_entry_mut_us
.fetch_add(m.as_us(), Ordering::Relaxed);
return Ok(ii);
}
m.stop();
index
.stats
.find_entry_mut_us
.fetch_add(m.as_us(), Ordering::Relaxed);
Err(BucketMapError::IndexNoSpace(index.capacity_pow2))
}

pub fn addref(&mut self, _key: &Pubkey) -> Option<RefCount> {
// TODO: remove; BucketSingle does not track ref counts
None
}

pub fn unref(&mut self, _key: &Pubkey) -> Option<RefCount> {
// TODO: remove; BucketSingle does not track ref counts
None
}

pub fn read_value(&self, key: &Pubkey) -> Option<&T> {
//debug!("READ_VALUE: {:?}", key);
let (elem, _) = self.find_entry(key)?;
Some(&elem.info)
}

fn index_no_space(&self) -> BucketMapError {
BucketMapError::IndexNoSpace(self.index.capacity_pow2)
}

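/// Write `data` for `key`. The first write to a key allocates the free slot
/// that `find_entry_mut` located; a write to an existing key overwrites the
/// value in place. Returns `IndexNoSpace` when the probe window is full, in
/// which case callers `grow` and retry (see `insert`).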
pub fn try_write(&mut self, key: &Pubkey, data: T) -> Result<(), BucketMapError> {
let (found, elem, elem_ix) = self.find_entry_mut(key)?;
if !found {
let is_resizing = false;
let elem_uid = IndexEntry::key_uid(key);
self.index.allocate(elem_ix, elem_uid, is_resizing).unwrap();
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(key, data);
} else {
elem.info = data;
}
Ok(())
}

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((_elem, elem_ix)) = self.find_entry(key) {
let elem_uid = self.index.uid_unchecked(elem_ix);
//debug!("INDEX FREE {:?} {}", key, elem_uid);
self.index.free(elem_ix, elem_uid);
}
}

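/// Re-hash the index into a new, larger file, retrying with progressively
/// larger capacities until every occupied entry re-inserts without
/// overflowing its probe window. Runs under a read lock: the new storage is
/// staged in `self.reallocated` and swapped in later by `handle_delayed_grows`.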
pub fn grow_index(&self, current_capacity_pow2: u8) {
if self.index.capacity_pow2 == current_capacity_pow2 {
let mut m = Measure::start("grow_index");
//debug!("GROW_INDEX: {}", current_capacity_pow2);
let increment = 1;
for i in increment.. {
// increasing the capacity by ^4 reduces the likelihood of a re-index
// collision of 2^(max_search)^2 to 1 in 2^32
let mut index = BucketStorage::new_with_capacity(
Arc::clone(&self.drives),
1,
std::mem::size_of::<IndexEntrySingle<T>>() as u64,
// *2 causes rapid growth of index buckets
self.index.capacity_pow2 + i, // * 2,
self.index.max_search,
Arc::clone(&self.stats.index),
Arc::clone(&self.index.count),
);
let random = thread_rng().gen();
let mut valid = true;
for ix in 0..self.index.capacity() {
let uid = self.index.uid(ix);
if let Some(uid) = uid {
let elem: &IndexEntrySingle<T> = self.index.get(ix);
let new_ix = Self::bucket_create_key(&mut index, elem, uid, random, true);
if new_ix.is_err() {
valid = false;
break;
}
let new_ix = new_ix.unwrap();
let new_elem: &mut IndexEntrySingle<T> = index.get_mut(new_ix);
*new_elem = *elem;
/*
let dbg_elem: IndexEntrySingle = *new_elem;
assert_eq!(
Self::bucket_find_entry(&index, &elem.key, random).unwrap(),
(&dbg_elem, new_ix)
);
*/
}
}
if valid {
self.stats.index.update_max_size(index.capacity());
let mut items = self.reallocated.items.lock().unwrap();
items.index = Some((random, index));
self.reallocated.add_reallocation();
break;
}
}
m.stop();
self.stats.index.resizes.fetch_add(1, Ordering::Relaxed);
self.stats
.index
.resize_us
.fetch_add(m.as_us(), Ordering::Relaxed);
}
}

pub fn apply_grow_index(&mut self, random: u64, index: BucketStorage) {
self.random = random;
self.index = index;
}

fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 {
let uid = IndexEntry::key_uid(key);
let mut s = DefaultHasher::new();
uid.hash(&mut s);
//the locally generated random will make it hard for an attacker
//to deterministically cause all the pubkeys to land in the same
//location in any bucket on all validators
random.hash(&mut s);
let ix = s.finish();
ix % index.capacity()
//debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.capacity() );
}

/// grow the appropriate piece. Note this takes an immutable ref.
/// The actual grow is set into self.reallocated and applied later on a write lock
pub fn grow(&self, err: BucketMapError) {
match err {
BucketMapError::DataNoSpace((_data_index, _current_capacity_pow2)) => {
//debug!("GROWING SPACE {:?}", (data_index, current_capacity_pow2));
unimplemented!();
}
BucketMapError::IndexNoSpace(current_capacity_pow2) => {
//debug!("GROWING INDEX {}", sz);
self.grow_index(current_capacity_pow2);
}
}
}

/// if a bucket was resized previously with a read lock, then apply that resize now
pub fn handle_delayed_grows(&mut self) {
if self.reallocated.get_reallocated() {
// swap out the bucket that was resized previously with a read lock
let mut items = std::mem::take(&mut *self.reallocated.items.lock().unwrap());

if let Some((random, bucket)) = items.index.take() {
self.apply_grow_index(random, bucket);
} else {
// data bucket
let (_i, _new_bucket) = items.data.take().unwrap();
unimplemented!();
}
}
}

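/// Insert or overwrite `key -> value`, growing the index and retrying until
/// the write succeeds.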
pub fn insert(&mut self, key: &Pubkey, value: T) {
let new = value;
loop {
let rv = self.try_write(key, new);
match rv {
Ok(_) => return,
Err(err) => {
self.grow(err);
self.handle_delayed_grows();
}
}
}
}

pub fn update<F>(&mut self, key: &Pubkey, mut updatefn: F)
where
F: FnMut(Option<&T>) -> Option<T>,
{
let current = self.read_value(key);
let new = updatefn(current);
if new.is_none() {
self.delete_key(key);
return;
}
let new = new.unwrap();
self.insert(key, new);
}
}

impl<'b, T: Clone + Copy + 'b> Bucket<T> {
pub fn new(
drives: Arc<Vec<PathBuf>>,
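
A minimal usage sketch of the `BucketSingle` API introduced by this commit, based only on the methods visible in the diff above. The import paths, the drive directory, the `max_search` value of 2, and the `Default`-based stats/count wiring are assumptions made for illustration, not necessarily how in-tree callers construct a bucket.

use {
    solana_bucket_map::{bucket::BucketSingle, bucket_stats::BucketMapStats}, // assumed paths
    solana_sdk::pubkey::Pubkey,
    std::{
        path::PathBuf,
        sync::{atomic::AtomicU64, Arc},
    },
};

fn main() {
    // Assumed setup: one scratch directory (assumed to already exist) for the
    // mmapped index file.
    let drives = Arc::new(vec![PathBuf::from("/tmp/bucket_single_demo")]);
    let stats = Arc::new(BucketMapStats::default()); // assumes BucketMapStats: Default
    let count = Arc::new(AtomicU64::new(0));

    // Each key maps to exactly one Copy value; no slot list, no ref counts.
    let mut bucket: BucketSingle<u64> = BucketSingle::new(drives, 2, stats, count);

    let key = Pubkey::new_unique();
    // `insert` loops internally: on IndexNoSpace it grows the index and retries.
    bucket.insert(&key, 42);
    assert_eq!(bucket.read_value(&key), Some(&42));

    // `update` reads the current value and writes the closure's result
    // (deleting the key when the closure returns None).
    bucket.update(&key, |cur| cur.map(|v| v + 1));
    assert_eq!(bucket.read_value(&key), Some(&43));

    bucket.delete_key(&key);
    assert!(bucket.read_value(&key).is_none());
}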