remove uid concept from disk buckets #30836

Merged: 2 commits, Mar 22, 2023
29 changes: 11 additions & 18 deletions bucket_map/src/bucket.rs
@@ -3,7 +3,7 @@ use {
bucket_item::BucketItem,
bucket_map::BucketMapError,
bucket_stats::BucketMapStats,
bucket_storage::{BucketStorage, Uid, DEFAULT_CAPACITY_POW2},
bucket_storage::{BucketStorage, DEFAULT_CAPACITY_POW2},
index_entry::IndexEntry,
MaxSearch, RefCount,
},
@@ -205,7 +205,6 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
fn bucket_create_key(
index: &mut BucketStorage,
key: &Pubkey,
elem_uid: Uid,
random: u64,
is_resizing: bool,
) -> Result<u64, BucketMapError> {
@@ -216,7 +215,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
if !index.is_free(ii) {
continue;
}
index.allocate(ii, elem_uid, is_resizing).unwrap();
index.allocate(ii, is_resizing).unwrap();
let elem: &mut IndexEntry = index.get_mut(ii);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
@@ -282,22 +281,20 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
let (found, elem, elem_ix) = self.find_entry_mut(key)?;
if !found {
let is_resizing = false;
let elem_uid = IndexEntry::key_uid(key);
self.index.allocate(elem_ix, elem_uid, is_resizing).unwrap();
self.index.allocate(elem_ix, is_resizing).unwrap();
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(key);
}
elem.ref_count = ref_count;
let elem_uid = self.index.uid_unchecked(elem_ix);
let bucket_ix = elem.data_bucket_ix();
let current_bucket = &self.data[bucket_ix as usize];
let num_slots = data_len as u64;
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
// in place update
let elem_loc = elem.data_loc(current_bucket);
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
assert_eq!(current_bucket.uid(elem_loc), Some(elem_uid));
assert!(!current_bucket.is_free(elem_loc));
elem.num_slots = num_slots;

slice.iter_mut().zip(data).for_each(|(dest, src)| {
@@ -328,12 +325,12 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
elem.num_slots = num_slots;
if old_slots > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
current_bucket.free(elem_loc, elem_uid);
current_bucket.free(elem_loc);
}
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if num_slots > 0 {
let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.allocate(ix, elem_uid, false).unwrap();
best_bucket.allocate(ix, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
@@ -348,17 +345,16 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_entry(key) {
let elem_uid = self.index.uid_unchecked(elem_ix);
if elem.num_slots > 0 {
let ix = elem.data_bucket_ix() as usize;
let data_bucket = &self.data[ix];
let loc = elem.data_loc(data_bucket);
let data_bucket = &mut self.data[ix];
//debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid );
data_bucket.free(loc, elem_uid);
data_bucket.free(loc);
}
//debug!("INDEX FREE {:?} {}", key, elem_uid);
self.index.free(elem_ix, elem_uid);
self.index.free(elem_ix);
}
}

@@ -384,11 +380,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
let random = thread_rng().gen();
let mut valid = true;
for ix in 0..self.index.capacity() {
let uid = self.index.uid(ix);
if let Some(uid) = uid {
if !self.index.is_free(ix) {
let elem: &IndexEntry = self.index.get(ix);
let new_ix =
Self::bucket_create_key(&mut index, &elem.key, uid, random, true);
let new_ix = Self::bucket_create_key(&mut index, &elem.key, random, true);
if new_ix.is_err() {
valid = false;
break;
@@ -482,9 +476,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}

fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 {
let uid = IndexEntry::key_uid(key);
let mut s = DefaultHasher::new();
uid.hash(&mut s);
key.hash(&mut s);
//the locally generated random will make it hard for an attacker
//to deterministically cause all the pubkeys to land in the same
//location in any bucket on all validators
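The last hunk above is the core of the addressing change: bucket_index_ix previously hashed IndexEntry::key_uid(key), which was itself already a hash of the key, and now hashes the Pubkey directly. Below is a minimal, self-contained sketch of the new addressing. It is an illustration, not the project's code: a [u8; 32] stands in for solana_sdk::pubkey::Pubkey, the capacity parameter stands in for index.capacity(), and the elided tail of the real function is assumed to still mix in random and reduce the hash modulo the capacity.

use std::{
    collections::hash_map::DefaultHasher,
    hash::{Hash, Hasher},
};

/// Sketch: hash the pubkey bytes directly (no intermediate uid), then mix in
/// the per-bucket random value so an attacker cannot deterministically force
/// many keys into the same slot on every validator.
fn bucket_index_ix_sketch(key: &[u8; 32], random: u64, capacity: u64) -> u64 {
    let mut s = DefaultHasher::new();
    key.hash(&mut s);
    random.hash(&mut s);
    s.finish() % capacity
}

fn main() {
    let key = [7u8; 32];
    println!("slot = {}", bucket_index_ix_sketch(&key, 0x1234_5678, 1 << 5));
}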
66 changes: 22 additions & 44 deletions bucket_map/src/bucket_storage.rs
@@ -36,8 +36,12 @@ pub const DEFAULT_CAPACITY_POW2: u8 = 5;

/// A Header UID of 0 indicates that the header is unlocked
const UID_UNLOCKED: Uid = 0;
/// uid in maps is 1 or 0, where 0 is empty, 1 is in-use
const UID_LOCKED: Uid = 1;

pub(crate) type Uid = u64;
/// u64 for purposes of 8 byte alignment
/// We only need 1 bit of this.
type Uid = u64;

#[repr(C)]
struct Header {
@@ -47,27 +51,21 @@ struct Header {
impl Header {
/// try to lock this entry with 'uid'
/// return true if it could be locked
fn try_lock(&mut self, uid: Uid) -> bool {
fn try_lock(&mut self) -> bool {
if self.lock == UID_UNLOCKED {
self.lock = uid;
self.lock = UID_LOCKED;
true
} else {
false
}
}

/// mark this entry as unlocked
fn unlock(&mut self, expected: Uid) {
assert_eq!(expected, self.lock);
fn unlock(&mut self) {
assert_eq!(UID_LOCKED, self.lock);
self.lock = UID_UNLOCKED;
}

Review comment from @brooksprumo (Contributor), Mar 21, 2023, on the unlock() change:
Removing the Uid param here is nice. Do you plan to also do that for try_lock()?
Reply from the PR author:
yes, I should have. done now.
/// uid that has locked this entry or None if unlocked
fn uid(&self) -> Option<Uid> {
if self.lock == UID_UNLOCKED {
None
} else {
Some(self.lock)
}
}

/// true if this entry is unlocked
fn is_unlocked(&self) -> bool {
self.lock == UID_UNLOCKED
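Taken together, the Header changes replace the per-key uid with a single locked/unlocked flag; the field stays a u64 only for 8-byte alignment. The following is a self-contained sketch of the resulting state machine, assembled from the hunks above (names follow the diff; the surrounding mmap and BucketStorage plumbing is omitted):

type Uid = u64; // u64 only for 8-byte alignment; one bit of state is used
const UID_UNLOCKED: Uid = 0;
const UID_LOCKED: Uid = 1;

#[repr(C)]
struct Header {
    lock: Uid,
}

impl Header {
    /// try to mark this entry as in-use; true if it was free
    fn try_lock(&mut self) -> bool {
        if self.lock == UID_UNLOCKED {
            self.lock = UID_LOCKED;
            true
        } else {
            false
        }
    }

    /// mark this entry as free again; panics if it was not in-use
    fn unlock(&mut self) {
        assert_eq!(UID_LOCKED, self.lock);
        self.lock = UID_UNLOCKED;
    }

    fn is_unlocked(&self) -> bool {
        self.lock == UID_UNLOCKED
    }
}

fn main() {
    let mut h = Header { lock: UID_UNLOCKED };
    assert!(h.try_lock());  // first allocation succeeds
    assert!(!h.try_lock()); // a second allocation of the same slot fails
    h.unlock();
    assert!(h.is_unlocked());
}

Since only one bit of state survives, the allocate/free API in the hunks below no longer needs a uid argument at all.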
@@ -162,32 +160,20 @@ impl BucketStorage {
}
}

/// return uid allocated at index 'ix' or None if vacant
pub fn uid(&self, ix: u64) -> Option<Uid> {
assert!(ix < self.capacity(), "bad index size");
self.header_ptr(ix).uid()
}

/// true if the entry at index 'ix' is free (as opposed to being allocated)
pub fn is_free(&self, ix: u64) -> bool {
// note that the terminology in the implementation is locked or unlocked.
// but our api is allocate/free
self.header_ptr(ix).is_unlocked()
}

/// caller knows id is not empty
pub fn uid_unchecked(&self, ix: u64) -> Uid {
self.uid(ix).unwrap()
}

/// 'is_resizing' true if caller is resizing the index (so don't increment count)
/// 'is_resizing' false if caller is adding an item to the index (so increment count)
pub fn allocate(&self, ix: u64, uid: Uid, is_resizing: bool) -> Result<(), BucketStorageError> {
pub fn allocate(&self, ix: u64, is_resizing: bool) -> Result<(), BucketStorageError> {
assert!(ix < self.capacity(), "allocate: bad index size");
assert!(UID_UNLOCKED != uid, "allocate: bad uid");
let mut e = Err(BucketStorageError::AlreadyAllocated);
//debug!("ALLOC {} {}", ix, uid);
if self.header_mut_ptr(ix).try_lock(uid) {
if self.header_mut_ptr(ix).try_lock() {
e = Ok(());
if !is_resizing {
self.count.fetch_add(1, Ordering::Relaxed);
Expand All @@ -196,10 +182,9 @@ impl BucketStorage {
e
}

pub fn free(&mut self, ix: u64, uid: Uid) {
pub fn free(&mut self, ix: u64) {
assert!(ix < self.capacity(), "bad index size");
assert!(UID_UNLOCKED != uid, "free: bad uid");
self.header_mut_ptr(ix).unlock(uid);
self.header_mut_ptr(ix).unlock();
self.count.fetch_sub(1, Ordering::Relaxed);
}
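With the uid argument gone, allocate() and free() reduce to flipping the header flag and maintaining the occupancy count, where is_resizing = true means entries are being copied into a grown storage so the logical count must not change. Below is a toy, Vec-backed analogue of that API shape, illustrative only; the real type is backed by an mmapped file and keeps the count in an atomic.

#[derive(Debug)]
enum ToyBucketStorageError {
    AlreadyAllocated,
}

/// Toy stand-in for BucketStorage: one allocated/free flag per slot plus a count.
struct ToyStorage {
    slots: Vec<bool>, // true == allocated (header "locked")
    count: usize,
}

impl ToyStorage {
    fn new(capacity: usize) -> Self {
        Self { slots: vec![false; capacity], count: 0 }
    }

    fn is_free(&self, ix: usize) -> bool {
        !self.slots[ix]
    }

    fn allocate(&mut self, ix: usize, is_resizing: bool) -> Result<(), ToyBucketStorageError> {
        if self.slots[ix] {
            return Err(ToyBucketStorageError::AlreadyAllocated);
        }
        self.slots[ix] = true;
        if !is_resizing {
            self.count += 1; // only brand-new items change the logical count
        }
        Ok(())
    }

    fn free(&mut self, ix: usize) {
        assert!(self.slots[ix], "free of an unallocated slot");
        self.slots[ix] = false;
        self.count -= 1;
    }
}

fn main() {
    let mut storage = ToyStorage::new(8);
    assert!(storage.is_free(0));
    assert!(storage.allocate(0, false).is_ok());
    assert!(storage.allocate(0, false).is_err()); // double-allocate is rejected
    storage.free(0);
    assert!(storage.is_free(0) && storage.count == 0);
}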

@@ -312,6 +297,7 @@ impl BucketStorage {
}

/// copy contents from 'old_bucket' to 'self'
/// This is used by data buckets
fn copy_contents(&mut self, old_bucket: &Self) {
let mut m = Measure::start("grow");
let old_cap = old_bucket.capacity();
@@ -393,25 +379,17 @@ mod test {
let mut storage =
BucketStorage::new(Arc::new(paths), 1, 1, 1, Arc::default(), Arc::default());
let ix = 0;
let uid = Uid::MAX;
assert!(storage.is_free(ix));
assert!(storage.allocate(ix, uid, false).is_ok());
assert!(storage.allocate(ix, uid, false).is_err());
assert!(storage.allocate(ix, false).is_ok());
assert!(storage.allocate(ix, false).is_err());
assert!(!storage.is_free(ix));
assert_eq!(storage.uid(ix), Some(uid));
assert_eq!(storage.uid_unchecked(ix), uid);
storage.free(ix, uid);
storage.free(ix);
assert!(storage.is_free(ix));
assert_eq!(storage.uid(ix), None);
let uid = 1;
assert!(storage.is_free(ix));
assert!(storage.allocate(ix, uid, false).is_ok());
assert!(storage.allocate(ix, uid, false).is_err());
assert!(storage.allocate(ix, false).is_ok());
assert!(storage.allocate(ix, false).is_err());
assert!(!storage.is_free(ix));
assert_eq!(storage.uid(ix), Some(uid));
assert_eq!(storage.uid_unchecked(ix), uid);
storage.free(ix, uid);
storage.free(ix);
assert!(storage.is_free(ix));
assert_eq!(storage.uid(ix), None);
}
}
21 changes: 3 additions & 18 deletions bucket_map/src/index_entry.rs
@@ -1,18 +1,10 @@
#![allow(dead_code)]

use {
crate::{
bucket::Bucket,
bucket_storage::{BucketStorage, Uid},
RefCount,
},
crate::{bucket::Bucket, bucket_storage::BucketStorage, RefCount},
modular_bitfield::prelude::*,
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
collections::hash_map::DefaultHasher,
fmt::Debug,
hash::{Hash, Hasher},
},
std::fmt::Debug,
};

#[repr(C)]
@@ -99,21 +91,14 @@ impl IndexEntry {
let data_bucket_ix = self.data_bucket_ix();
let data_bucket = &bucket.data[data_bucket_ix as usize];
let loc = self.data_loc(data_bucket);
let uid = Self::key_uid(&self.key);
assert_eq!(Some(uid), data_bucket.uid(loc));
assert!(!data_bucket.is_free(loc));
data_bucket.get_cell_slice(loc, self.num_slots)
} else {
// num_slots is 0. This means we don't have an actual allocation.
BucketStorage::get_empty_cell_slice()
};
Some((slice, self.ref_count))
}

pub fn key_uid(key: &Pubkey) -> Uid {
let mut s = DefaultHasher::new();
key.hash(&mut s);
s.finish().max(1u64)
}
}

#[cfg(test)]