Skip to content

Commit

Permalink
bucket map stores uid in use field in memory bitvec
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Mar 17, 2023
1 parent 6dd5a22 commit 7d0a6e0
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bucket_map/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
bv = { workspace = true, features = ["serde"] }
log = { workspace = true }
memmap2 = { workspace = true }
modular-bitfield = { workspace = true }
Expand Down
35 changes: 22 additions & 13 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
max_search: MaxSearch,
stats: Arc<BucketMapStats>,
count: Arc<AtomicU64>,
use_bit_field: bool,
) -> Self {
let index = BucketStorage::new(
Arc::clone(&drives),
Expand All @@ -90,6 +91,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
max_search,
Arc::clone(&stats.index),
count,
use_bit_field,
);
Self {
random: thread_rng().gen(),
Expand Down Expand Up @@ -142,10 +144,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
Self::bucket_find_entry(&self.index, key, self.random)
}

fn find_entry_mut<'a>(
&'a self,
key: &Pubkey,
) -> Result<(bool, &'a mut IndexEntry, u64), BucketMapError> {
fn find_entry_mut<'a>(&mut self, key: &Pubkey) -> Result<(bool, u64), BucketMapError> {
let ix = Self::bucket_index_ix(&self.index, key, self.random);
let mut first_free = None;
let mut m = Measure::start("bucket_find_entry_mut");
Expand All @@ -164,7 +163,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
.index
.find_entry_mut_us
.fetch_add(m.as_us(), Ordering::Relaxed);
return Ok((true, elem, ii));
return Ok((true, ii));
}
}
m.stop();
Expand All @@ -175,7 +174,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
match first_free {
Some(ii) => {
let elem: &mut IndexEntry = self.index.get_mut(ii);
Ok((false, elem, ii))
Ok((false, ii))
}
None => Err(self.index_no_space()),
}
Expand All @@ -189,6 +188,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
assert!(ii < index.capacity(), "{}, {}", ii, index.capacity());
if index.is_free(ii) {
continue;
}
Expand Down Expand Up @@ -236,8 +236,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}

pub fn addref(&mut self, key: &Pubkey) -> Option<RefCount> {
if let Ok((found, elem, _)) = self.find_entry_mut(key) {
if let Ok((found, elem_ix)) = self.find_entry_mut(key) {
if found {
let elem: &mut IndexEntry = self.index.get_mut(elem_ix);
elem.ref_count += 1;
return Some(elem.ref_count);
}
Expand All @@ -246,8 +247,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}

pub fn unref(&mut self, key: &Pubkey) -> Option<RefCount> {
if let Ok((found, elem, _)) = self.find_entry_mut(key) {
if let Ok((found, elem_ix)) = self.find_entry_mut(key) {
if found {
let elem: &mut IndexEntry = self.index.get_mut(elem_ix);
elem.ref_count -= 1;
return Some(elem.ref_count);
}
Expand Down Expand Up @@ -277,14 +279,18 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
// fail early if the data bucket we need doesn't exist - we don't want the index entry partially allocated
return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0)));
}
let (found, elem, elem_ix) = self.find_entry_mut(key)?;
let (found, elem_ix) = self.find_entry_mut(key)?;
let elem: &mut IndexEntry;
if !found {
let is_resizing = false;
let elem_uid = IndexEntry::key_uid(key);
let elem_uid = IndexEntry::key_uid2(key);
self.index.allocate(elem_ix, elem_uid, is_resizing).unwrap();
elem = self.index.get_mut(elem_ix);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(key);
} else {
elem = self.index.get_mut(elem_ix);
}
elem.ref_count = ref_count;
let elem_uid = self.index.uid_unchecked(elem_ix);
Expand Down Expand Up @@ -378,6 +384,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
self.index.max_search,
Arc::clone(&self.stats.index),
Arc::clone(&self.index.count),
self.index.bit_field.is_some(),
);
let random = thread_rng().gen();
let mut valid = true;
Expand Down Expand Up @@ -440,6 +447,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
self.index.max_search,
Arc::clone(&self.stats.data),
Arc::default(),
false,
))
}
self.data.push(bucket);
Expand All @@ -450,7 +458,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

/// grow a data bucket
/// The application of the new bucket is deferred until the next write lock.
pub fn grow_data(&self, data_index: u64, current_capacity_pow2: u8) {
pub fn grow_data(&self, data_index: u64, current_capacity_pow2: u8, use_bit_field: bool) {
let new_bucket = BucketStorage::new_resized(
&self.drives,
self.index.max_search,
Expand All @@ -459,14 +467,15 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
1 << data_index,
Self::elem_size(),
&self.stats.data,
use_bit_field,
);
self.reallocated.add_reallocation();
let mut items = self.reallocated.items.lock().unwrap();
items.data = Some((data_index, new_bucket));
}

fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 {
let uid = IndexEntry::key_uid(key);
let uid = IndexEntry::full_key_uid(key);
let mut s = DefaultHasher::new();
uid.hash(&mut s);
//the locally generated random will make it hard for an attacker
Expand All @@ -484,7 +493,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
match err {
BucketMapError::DataNoSpace((data_index, current_capacity_pow2)) => {
//debug!("GROWING SPACE {:?}", (data_index, current_capacity_pow2));
self.grow_data(data_index, current_capacity_pow2);
self.grow_data(data_index, current_capacity_pow2, false);
}
BucketMapError::IndexNoSpace(current_capacity_pow2) => {
//debug!("GROWING INDEX {}", sz);
Expand Down
2 changes: 2 additions & 0 deletions bucket_map/src/bucket_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ impl<T: Clone + Copy> BucketApi<T> {
}

fn get_write_bucket(&self) -> RwLockWriteGuard<Option<Bucket<T>>> {
let use_bit_field = true;
let mut bucket = self.bucket.write().unwrap();
if bucket.is_none() {
*bucket = Some(Bucket::new(
Arc::clone(&self.drives),
self.max_search,
Arc::clone(&self.stats),
Arc::clone(&self.count),
use_bit_field,
));
} else {
let write = bucket.as_mut().unwrap();
Expand Down
Loading

0 comments on commit 7d0a6e0

Please sign in to comment.