use bits from ref count to store an enum tag #30903

Closed · wants to merge 3 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions bucket_map/Cargo.toml
@@ -11,6 +11,7 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
bv = { workspace = true, features = ["serde"] }
log = { workspace = true }
memmap2 = { workspace = true }
modular-bitfield = { workspace = true }
258 changes: 162 additions & 96 deletions bucket_map/src/bucket.rs

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions bucket_map/src/bucket_map.rs
@@ -204,6 +204,7 @@ mod tests {
let config = BucketMapConfig::new(1 << 1);
let index = BucketMap::new(config);
let bucket = index.get_bucket(&key);
let value = [0];
if pass == 0 {
index.insert(&key, (&[0], 0));
} else {
@@ -220,7 +221,7 @@
let result = index.try_insert(&key, (&[0], 0));
assert!(result.is_ok());
}
assert_eq!(index.read_value(&key), Some((vec![0], 0)));
assert_eq!(index.read_value(&key), Some((value.to_vec(), 0)));
}
}

@@ -371,15 +372,15 @@
})
.collect::<Vec<_>>();
let hash_map = RwLock::new(HashMap::<Pubkey, (Vec<(usize, usize)>, RefCount)>::new());
let max_slot_list_len = 3;
let max_slot_list_len = 5;
let all_keys = Mutex::new(vec![]);

let gen_rand_value = || {
let count = thread_rng().gen_range(0, max_slot_list_len);
let v = (0..count)
.map(|x| (x as usize, x as usize /*thread_rng().gen::<usize>()*/))
.collect::<Vec<_>>();
let rc = thread_rng().gen::<RefCount>();
let rc = thread_rng().gen_range(0, RefCount::MAX >> 2);
(v, rc)
};

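For context on the new RefCount::MAX >> 2 bound: per the PR title, the top bits of the ref count now carry an enum tag, so the test generates ref counts that leave those bits clear. Below is a minimal sketch of that packing, assuming RefCount is a u64 (as the RefCount::MAX >> 2 bound suggests) and a 2-bit tag; EntryTag, TAG_SHIFT, and REF_MASK are illustrative names, not identifiers from this PR.

type RefCount = u64; // assumed width, matching the RefCount::MAX >> 2 bound above

// Hypothetical 2-bit tag stored in the top bits of the ref-count word.
#[derive(Debug, Clone, Copy, PartialEq)]
enum EntryTag {
    Free = 0,
    Occupied = 1,
}

const TAG_SHIFT: u32 = 62;                     // tag lives in bits 62..64
const REF_MASK: RefCount = RefCount::MAX >> 2; // low 62 bits hold the count

fn pack(tag: EntryTag, ref_count: RefCount) -> RefCount {
    debug_assert!(ref_count <= REF_MASK); // the count must fit in 62 bits
    ((tag as RefCount) << TAG_SHIFT) | ref_count
}

fn tag_of(word: RefCount) -> RefCount {
    word >> TAG_SHIFT
}

fn ref_count_of(word: RefCount) -> RefCount {
    word & REF_MASK
}

fn main() {
    let word = pack(EntryTag::Occupied, 5);
    assert_eq!(tag_of(word), EntryTag::Occupied as RefCount);
    assert_eq!(ref_count_of(word), 5);
}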
194 changes: 77 additions & 117 deletions bucket_map/src/bucket_storage.rs
@@ -34,74 +34,42 @@ use {
*/
pub const DEFAULT_CAPACITY_POW2: u8 = 5;

#[derive(Debug, PartialEq, Eq)]
enum IsAllocatedFlagLocation {
/// 'allocated' flag per entry is stored in a u64 header per entry
InHeader,
pub trait BucketOccupied {
/// set entry at `ix` as occupied (as opposed to free)
fn occupy(&mut self, cell: &mut [u8], ix: usize);
/// set entry at `ix` as free
fn free(&mut self, cell: &mut [u8], ix: usize);
/// return true if entry at `ix` is free
fn is_free(&self, cell: &[u8], ix: usize) -> bool;
/// # of bytes prior to first data held in the element.
/// This is the header size, if a header exists per element in the data.
fn offset_to_first_data() -> usize;
fn new(num_elements: usize) -> Self;
}
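To make the trait contract concrete, here is a minimal sketch of an implementer that keeps the flag in a u64 header at the start of each cell, mirroring the Header struct this PR removes. HeaderFlag is an illustrative name; the PR's real implementers, such as the IndexBucket used in the test at the bottom of this file, live in index_entry.rs. Note that the cell argument is the slice that begins at the entry's header, as the is_free and try_lock call sites below show.

struct HeaderFlag;

impl BucketOccupied for HeaderFlag {
    fn occupy(&mut self, cell: &mut [u8], _ix: usize) {
        // first 8 bytes of the cell are the header; nonzero = occupied
        cell[..8].copy_from_slice(&1u64.to_ne_bytes());
    }
    fn free(&mut self, cell: &mut [u8], _ix: usize) {
        cell[..8].copy_from_slice(&0u64.to_ne_bytes());
    }
    fn is_free(&self, cell: &[u8], _ix: usize) -> bool {
        u64::from_ne_bytes(cell[..8].try_into().unwrap()) == 0
    }
    fn offset_to_first_data() -> usize {
        std::mem::size_of::<u64>() // data begins after the 8-byte header
    }
    fn new(_num_elements: usize) -> Self {
        Self
    }
}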

const IS_ALLOCATED_FLAG_LOCATION: IsAllocatedFlagLocation = IsAllocatedFlagLocation::InHeader;

/// A Header UID of 0 indicates that the header is unlocked
const UID_UNLOCKED: Uid = 0;
/// uid in maps is 1 or 0, where 0 is empty, 1 is in-use
const UID_LOCKED: Uid = 1;

/// u64 for purposes of 8 byte alignment
/// We only need 1 bit of this.
type Uid = u64;

#[repr(C)]
struct Header {
lock: u64,
}

impl Header {
/// try to lock this entry with 'uid'
/// return true if it could be locked
fn try_lock(&mut self) -> bool {
if self.lock == UID_UNLOCKED {
self.lock = UID_LOCKED;
true
} else {
false
}
}

/// mark this entry as unlocked
fn unlock(&mut self) {
assert_eq!(UID_LOCKED, self.lock);
self.lock = UID_UNLOCKED;
}

/// true if this entry is unlocked
fn is_unlocked(&self) -> bool {
self.lock == UID_UNLOCKED
}
}

pub struct BucketStorage {
pub struct BucketStorage<T: BucketOccupied> {
path: PathBuf,
mmap: MmapMut,
pub cell_size: u64,
pub capacity_pow2: u8,
pub count: Arc<AtomicU64>,
pub stats: Arc<BucketStats>,
pub max_search: MaxSearch,
pub contents: T,
}

#[derive(Debug)]
pub enum BucketStorageError {
AlreadyAllocated,
AlreadyOccupied,
}

impl Drop for BucketStorage {
impl<O: BucketOccupied> Drop for BucketStorage<O> {
fn drop(&mut self) {
let _ = remove_file(&self.path);
_ = remove_file(&self.path);
}
}

impl BucketStorage {
impl<O: BucketOccupied> BucketStorage<O> {
pub fn new_with_capacity(
drives: Arc<Vec<PathBuf>>,
num_elems: u64,
@@ -111,7 +79,7 @@ impl BucketStorage {
stats: Arc<BucketStats>,
count: Arc<AtomicU64>,
) -> Self {
let cell_size = elem_size * num_elems + Self::header_size() as u64;
let cell_size = elem_size * num_elems + O::offset_to_first_data() as u64;
let (mmap, path) = Self::new_map(&drives, cell_size as usize, capacity_pow2, &stats);
Self {
path,
@@ -121,13 +89,7 @@
capacity_pow2,
stats,
max_search,
}
}

/// non-zero if there is a header allocated prior to each element to store the 'allocated' bit
fn header_size() -> usize {
match IS_ALLOCATED_FLAG_LOCATION {
IsAllocatedFlagLocation::InHeader => std::mem::size_of::<Header>(),
contents: O::new(1 << capacity_pow2),
}
}

@@ -154,46 +116,29 @@
)
}

/// return ref to header of item 'ix' in mmapped file
fn header_ptr(&self, ix: u64) -> &Header {
self.header_mut_ptr(ix)
}

/// return ref to header of item 'ix' in mmapped file
#[allow(clippy::mut_from_ref)]
fn header_mut_ptr(&self, ix: u64) -> &mut Header {
assert_eq!(
IS_ALLOCATED_FLAG_LOCATION,
IsAllocatedFlagLocation::InHeader
);
let ix = (ix * self.cell_size) as usize;
let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
unsafe {
let hdr = hdr_slice.as_ptr() as *mut Header;
hdr.as_mut().unwrap()
}
}

/// true if the entry at index 'ix' is free (as opposed to being allocated)
/// true if the entry at index 'ix' is free (as opposed to being occupied)
pub fn is_free(&self, ix: u64) -> bool {
// note that the terminology in the implementation is locked or unlocked.
// but our api is allocate/free
match IS_ALLOCATED_FLAG_LOCATION {
IsAllocatedFlagLocation::InHeader => self.header_ptr(ix).is_unlocked(),
}
let start = self.get_start_offset_with_header(ix);
let entry = &self.mmap[start..];
self.contents.is_free(entry, ix as usize)
}

fn try_lock(&mut self, ix: u64) -> bool {
match IS_ALLOCATED_FLAG_LOCATION {
IsAllocatedFlagLocation::InHeader => self.header_mut_ptr(ix).try_lock(),
let start = self.get_start_offset_with_header(ix);
let entry = &mut self.mmap[start..];
if self.contents.is_free(entry, ix as usize) {
self.contents.occupy(entry, ix as usize);
true
} else {
false
}
}

/// 'is_resizing' true if caller is resizing the index (so don't increment count)
/// 'is_resizing' false if caller is adding an item to the index (so increment count)
pub fn allocate(&mut self, ix: u64, is_resizing: bool) -> Result<(), BucketStorageError> {
assert!(ix < self.capacity(), "allocate: bad index size");
let mut e = Err(BucketStorageError::AlreadyAllocated);
pub fn occupy(&mut self, ix: u64, is_resizing: bool) -> Result<(), BucketStorageError> {
assert!(ix < self.capacity(), "occupy: bad index size");
let mut e = Err(BucketStorageError::AlreadyOccupied);
//debug!("ALLOC {} {}", ix, uid);
if self.try_lock(ix) {
e = Ok(());
Expand All @@ -206,16 +151,13 @@ impl BucketStorage {

pub fn free(&mut self, ix: u64) {
assert!(ix < self.capacity(), "bad index size");
match IS_ALLOCATED_FLAG_LOCATION {
IsAllocatedFlagLocation::InHeader => {
self.header_mut_ptr(ix).unlock();
}
}
let start = self.get_start_offset_with_header(ix);
self.contents.free(&mut self.mmap[start..], ix as usize);
self.count.fetch_sub(1, Ordering::Relaxed);
}

pub fn get<T: Sized>(&self, ix: u64) -> &T {
let start = self.get_start_offset(ix);
let start = self.get_start_offset_no_header(ix);
let end = start + std::mem::size_of::<T>();
let item_slice: &[u8] = &self.mmap[start..end];
unsafe {
Expand All @@ -224,18 +166,17 @@ impl BucketStorage {
}
}

pub fn get_empty_cell_slice<T: Sized + 'static>() -> &'static [T] {
&[]
fn get_start_offset_with_header(&self, ix: u64) -> usize {
assert!(ix < self.capacity(), "bad index size");
(self.cell_size * ix) as usize
}

fn get_start_offset(&self, ix: u64) -> usize {
assert!(ix < self.capacity(), "bad index size");
let ix = self.cell_size * ix;
ix as usize + Self::header_size()
fn get_start_offset_no_header(&self, ix: u64) -> usize {
self.get_start_offset_with_header(ix) + O::offset_to_first_data()
}
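A quick worked example of the two offsets, with illustrative sizes: one element per cell, an 8-byte header (as in the HeaderFlag sketch above), and a 32-byte element payload.

fn main() {
    let header = std::mem::size_of::<u64>(); // O::offset_to_first_data() = 8
    let elem_size = 32;                      // illustrative bytes per element
    let cell_size = header + elem_size;      // elem_size * num_elems + header, with num_elems = 1
    let ix = 3;
    let with_header = cell_size * ix;        // 120: where entry 3's header begins
    let no_header = with_header + header;    // 128: where entry 3's data begins
    assert_eq!((with_header, no_header), (120, 128));
}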

pub fn get_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &[T] {
let start = self.get_start_offset(ix);
let start = self.get_start_offset_no_header(ix);
let end = start + std::mem::size_of::<T>() * len as usize;
//debug!("GET slice {} {}", start, end);
let item_slice: &[u8] = &self.mmap[start..end];
Expand All @@ -246,19 +187,30 @@ impl BucketStorage {
}

#[allow(clippy::mut_from_ref)]
pub fn get_mut<T: Sized>(&self, ix: u64) -> &mut T {
let start = self.get_start_offset(ix);
let end = start + std::mem::size_of::<T>();
let item_slice: &[u8] = &self.mmap[start..end];
pub fn get_mut_from_parts<T: Sized>(item_slice: &mut [u8]) -> &mut T {
unsafe {
let item = item_slice.as_ptr() as *mut T;
&mut *item
}
}

pub(crate) fn get_from_parts<T: Sized>(item_slice: &[u8]) -> &T {
unsafe {
let item = item_slice.as_ptr() as *const T;
&*item
}
}

pub fn get_mut<T: Sized>(&mut self, ix: u64) -> &mut T {
let start = self.get_start_offset_no_header(ix);
let item_slice = &mut self.mmap[start..];
let item_slice = &mut item_slice[..std::mem::size_of::<T>()];
Self::get_mut_from_parts(item_slice)
}
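get and get_mut (and the cell-slice getters) all funnel through this reinterpret-the-bytes step. A self-contained sketch of the same cast, using a hypothetical #[repr(C)] element type; soundness relies on the slice being large enough and properly aligned for T, which BucketStorage arranges via cell_size and the mmap allocation.

#[repr(C)]
struct Counter {
    count: u64, // hypothetical element payload
}

// Same pattern as get_from_parts: reinterpret the leading bytes as &Counter.
fn counter_from_bytes(bytes: &[u8]) -> &Counter {
    assert!(bytes.len() >= std::mem::size_of::<Counter>());
    unsafe { &*(bytes.as_ptr() as *const Counter) }
}

fn main() {
    let backing = [7u64]; // u64-aligned backing storage
    let bytes = unsafe { std::slice::from_raw_parts(backing.as_ptr() as *const u8, 8) };
    assert_eq!(counter_from_bytes(bytes).count, 7);
}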

#[allow(clippy::mut_from_ref)]
pub fn get_mut_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &mut [T] {
let start = self.get_start_offset(ix);
let start = self.get_start_offset_no_header(ix);
let end = start + std::mem::size_of::<T>() * len as usize;
//debug!("GET mut slice {} {}", start, end);
let item_slice: &[u8] = &self.mmap[start..end];
@@ -333,10 +285,12 @@
let index_grow = 1 << increment;
(0..old_cap as usize).for_each(|i| {
if !old_bucket.is_free(i as u64) {
match IS_ALLOCATED_FLAG_LOCATION {
IsAllocatedFlagLocation::InHeader => {
// nothing to do when bit is in header
}
{
// copying from old to new. If 'occupied' bit is stored outside the data, then
// occupied has to be set on the new entry in the new bucket.
let start = self.get_start_offset_with_header((i * index_grow) as u64);
self.contents
.occupy(&mut self.mmap[start..], i * index_grow);
}
let old_ix = i * old_bucket.cell_size as usize;
let new_ix = old_ix * index_grow;
@@ -401,26 +355,32 @@

#[cfg(test)]
mod test {
use {super::*, tempfile::tempdir};
use {super::*, crate::index_entry::IndexBucket, tempfile::tempdir};

#[test]
fn test_bucket_storage() {
let tmpdir = tempdir().unwrap();
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
assert!(!paths.is_empty());

let mut storage =
BucketStorage::new(Arc::new(paths), 1, 1, 1, Arc::default(), Arc::default());
let mut storage = BucketStorage::<IndexBucket<u64>>::new(
Arc::new(paths),
1,
1,
1,
Arc::default(),
Arc::default(),
);
let ix = 0;
assert!(storage.is_free(ix));
assert!(storage.allocate(ix, false).is_ok());
assert!(storage.allocate(ix, false).is_err());
assert!(storage.occupy(ix, false).is_ok());
assert!(storage.occupy(ix, false).is_err());
assert!(!storage.is_free(ix));
storage.free(ix);
assert!(storage.is_free(ix));
assert!(storage.is_free(ix));
assert!(storage.allocate(ix, false).is_ok());
assert!(storage.allocate(ix, false).is_err());
assert!(storage.occupy(ix, false).is_ok());
assert!(storage.occupy(ix, false).is_err());
assert!(!storage.is_free(ix));
storage.free(ix);
assert!(storage.is_free(ix));