Skip to content

Commit

Permalink
remove uid concept from disk buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Mar 21, 2023
1 parent 4aac7ea commit 82bcdfa
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 71 deletions.
29 changes: 11 additions & 18 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
bucket_item::BucketItem,
bucket_map::BucketMapError,
bucket_stats::BucketMapStats,
bucket_storage::{BucketStorage, Uid, DEFAULT_CAPACITY_POW2},
bucket_storage::{BucketStorage, DEFAULT_CAPACITY_POW2},
index_entry::IndexEntry,
MaxSearch, RefCount,
},
Expand Down Expand Up @@ -205,7 +205,6 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
fn bucket_create_key(
index: &mut BucketStorage,
key: &Pubkey,
elem_uid: Uid,
random: u64,
is_resizing: bool,
) -> Result<u64, BucketMapError> {
Expand All @@ -216,7 +215,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
if !index.is_free(ii) {
continue;
}
index.allocate(ii, elem_uid, is_resizing).unwrap();
index.allocate(ii, is_resizing).unwrap();
let elem: &mut IndexEntry = index.get_mut(ii);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
Expand Down Expand Up @@ -282,22 +281,20 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
let (found, elem, elem_ix) = self.find_entry_mut(key)?;
if !found {
let is_resizing = false;
let elem_uid = IndexEntry::key_uid(key);
self.index.allocate(elem_ix, elem_uid, is_resizing).unwrap();
self.index.allocate(elem_ix, is_resizing).unwrap();
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(key);
}
elem.ref_count = ref_count;
let elem_uid = self.index.uid_unchecked(elem_ix);
let bucket_ix = elem.data_bucket_ix();
let current_bucket = &self.data[bucket_ix as usize];
let num_slots = data_len as u64;
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
// in place update
let elem_loc = elem.data_loc(current_bucket);
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
assert_eq!(current_bucket.uid(elem_loc), Some(elem_uid));
assert!(!current_bucket.is_free(elem_loc));
elem.num_slots = num_slots;

slice.iter_mut().zip(data).for_each(|(dest, src)| {
Expand Down Expand Up @@ -328,12 +325,12 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
elem.num_slots = num_slots;
if old_slots > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
current_bucket.free(elem_loc, elem_uid);
current_bucket.free(elem_loc);
}
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if num_slots > 0 {
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.allocate(ix, elem_uid, false).unwrap();
best_bucket.allocate(ix, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
Expand All @@ -348,17 +345,16 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_entry(key) {
let elem_uid = self.index.uid_unchecked(elem_ix);
if elem.num_slots > 0 {
let ix = elem.data_bucket_ix() as usize;
let data_bucket = &self.data[ix];
let loc = elem.data_loc(data_bucket);
let data_bucket = &mut self.data[ix];
//debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid );
data_bucket.free(loc, elem_uid);
data_bucket.free(loc);
}
//debug!("INDEX FREE {:?} {}", key, elem_uid);
self.index.free(elem_ix, elem_uid);
self.index.free(elem_ix);
}
}

Expand All @@ -384,11 +380,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
let random = thread_rng().gen();
let mut valid = true;
for ix in 0..self.index.capacity() {
let uid = self.index.uid(ix);
if let Some(uid) = uid {
if !self.index.is_free(ix) {
let elem: &IndexEntry = self.index.get(ix);
let new_ix =
Self::bucket_create_key(&mut index, &elem.key, uid, random, true);
let new_ix = Self::bucket_create_key(&mut index, &elem.key, random, true);
if new_ix.is_err() {
valid = false;
break;
Expand Down Expand Up @@ -482,9 +476,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}

fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 {
let uid = IndexEntry::key_uid(key);
let mut s = DefaultHasher::new();
uid.hash(&mut s);
key.hash(&mut s);
//the locally generated random will make it hard for an attacker
//to deterministically cause all the pubkeys to land in the same
//location in any bucket on all validators
Expand Down
66 changes: 24 additions & 42 deletions bucket_map/src/bucket_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ pub const DEFAULT_CAPACITY_POW2: u8 = 5;

/// A Header UID of 0 indicates that the header is unlocked
const UID_UNLOCKED: Uid = 0;
/// uid in maps is 1 or 0, where 0 is empty, 1 is in-use
const UID_LOCKED: Uid = 1;

pub(crate) type Uid = u64;
/// u64 for purposes of 8 byte alignment
/// We only need 1 bit of this.
type Uid = u64;

#[repr(C)]
struct Header {
Expand All @@ -55,19 +59,13 @@ impl Header {
false
}
}

/// mark this entry as unlocked
fn unlock(&mut self, expected: Uid) {
assert_eq!(expected, self.lock);
fn unlock(&mut self) {
assert_eq!(UID_LOCKED, self.lock);
self.lock = UID_UNLOCKED;
}
/// uid that has locked this entry or None if unlocked
fn uid(&self) -> Option<Uid> {
if self.lock == UID_UNLOCKED {
None
} else {
Some(self.lock)
}
}

/// true if this entry is unlocked
fn is_unlocked(&self) -> bool {
self.lock == UID_UNLOCKED
Expand Down Expand Up @@ -162,32 +160,24 @@ impl BucketStorage {
}
}

/// return uid allocated at index 'ix' or None if vacant
pub fn uid(&self, ix: u64) -> Option<Uid> {
assert!(ix < self.capacity(), "bad index size");
self.header_ptr(ix).uid()
}

/// true if the entry at index 'ix' is free (as opposed to being allocated)
pub fn is_free(&self, ix: u64) -> bool {
// note that the terminology in the implementation is locked or unlocked.
// but our api is allocate/free
self.header_ptr(ix).is_unlocked()
}

/// caller knows id is not empty
pub fn uid_unchecked(&self, ix: u64) -> Uid {
self.uid(ix).unwrap()
}

/// 'is_resizing' true if caller is resizing the index (so don't increment count)
/// 'is_resizing' false if caller is adding an item to the index (so increment count)
pub fn allocate(&self, ix: u64, uid: Uid, is_resizing: bool) -> Result<(), BucketStorageError> {
pub fn allocate(
&mut self,
ix: u64,
is_resizing: bool,
) -> Result<(), BucketStorageError> {
assert!(ix < self.capacity(), "allocate: bad index size");
assert!(UID_UNLOCKED != uid, "allocate: bad uid");
let mut e = Err(BucketStorageError::AlreadyAllocated);
//debug!("ALLOC {} {}", ix, uid);
if self.header_mut_ptr(ix).try_lock(uid) {
if self.header_mut_ptr(ix).try_lock(UID_LOCKED) {
e = Ok(());
if !is_resizing {
self.count.fetch_add(1, Ordering::Relaxed);
Expand All @@ -196,10 +186,9 @@ impl BucketStorage {
e
}

pub fn free(&mut self, ix: u64, uid: Uid) {
pub fn free(&mut self, ix: u64) {
assert!(ix < self.capacity(), "bad index size");
assert!(UID_UNLOCKED != uid, "free: bad uid");
self.header_mut_ptr(ix).unlock(uid);
self.header_mut_ptr(ix).unlock();
self.count.fetch_sub(1, Ordering::Relaxed);
}

Expand Down Expand Up @@ -312,6 +301,7 @@ impl BucketStorage {
}

/// copy contents from 'old_bucket' to 'self'
/// This is used by data buckets
fn copy_contents(&mut self, old_bucket: &Self) {
let mut m = Measure::start("grow");
let old_cap = old_bucket.capacity();
Expand Down Expand Up @@ -393,25 +383,17 @@ mod test {
let mut storage =
BucketStorage::new(Arc::new(paths), 1, 1, 1, Arc::default(), Arc::default());
let ix = 0;
let uid = Uid::MAX;
assert!(storage.is_free(ix));
assert!(storage.allocate(ix, uid, false).is_ok());
assert!(storage.allocate(ix, uid, false).is_err());
assert!(storage.allocate(ix, false).is_ok());
assert!(storage.allocate(ix, false).is_err());
assert!(!storage.is_free(ix));
assert_eq!(storage.uid(ix), Some(uid));
assert_eq!(storage.uid_unchecked(ix), uid);
storage.free(ix, uid);
storage.free(ix);
assert!(storage.is_free(ix));
assert_eq!(storage.uid(ix), None);
let uid = 1;
assert!(storage.is_free(ix));
assert!(storage.allocate(ix, uid, false).is_ok());
assert!(storage.allocate(ix, uid, false).is_err());
assert!(storage.allocate(ix, false).is_ok());
assert!(storage.allocate(ix, false).is_err());
assert!(!storage.is_free(ix));
assert_eq!(storage.uid(ix), Some(uid));
assert_eq!(storage.uid_unchecked(ix), uid);
storage.free(ix, uid);
storage.free(ix);
assert!(storage.is_free(ix));
assert_eq!(storage.uid(ix), None);
}
}
13 changes: 2 additions & 11 deletions bucket_map/src/index_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
use {
crate::{
bucket::Bucket,
bucket_storage::{BucketStorage, Uid},
bucket_storage::{BucketStorage},
RefCount,
},
modular_bitfield::prelude::*,
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
collections::hash_map::DefaultHasher,
fmt::Debug,
hash::{Hash, Hasher},
},
};

Expand Down Expand Up @@ -99,21 +97,14 @@ impl IndexEntry {
let data_bucket_ix = self.data_bucket_ix();
let data_bucket = &bucket.data[data_bucket_ix as usize];
let loc = self.data_loc(data_bucket);
let uid = Self::key_uid(&self.key);
assert_eq!(Some(uid), data_bucket.uid(loc));
assert!(!data_bucket.is_free(loc));
data_bucket.get_cell_slice(loc, self.num_slots)
} else {
// num_slots is 0. This means we don't have an actual allocation.
BucketStorage::get_empty_cell_slice()
};
Some((slice, self.ref_count))
}

pub fn key_uid(key: &Pubkey) -> Uid {
let mut s = DefaultHasher::new();
key.hash(&mut s);
s.finish().max(1u64)
}
}

#[cfg(test)]
Expand Down

0 comments on commit 82bcdfa

Please sign in to comment.