disk bucket allocated bit not in header
jeffwashington committed Mar 22, 2023
1 parent 4285cb2 commit 64be54c
Showing 1 changed file with 79 additions and 19 deletions.
bucket_map/src/bucket_storage.rs
@@ -34,6 +34,10 @@ use {
*/
pub const DEFAULT_CAPACITY_POW2: u8 = 5;

+/// true if the 'allocated' flag per entry should be represented by a u64 header per entry
+/// false if the 'allocated' flag per entry is stored elsewhere and there is no header
+const ALLOCATED_BIT_IN_HEADER: bool = true;
+
/// A Header UID of 0 indicates that the header is unlocked
const UID_UNLOCKED: Uid = 0;
/// uid in maps is 1 or 0, where 0 is empty, 1 is in-use
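For orientation, here is a minimal standalone sketch (invented names, not this crate's code) of the layout consequence of ALLOCATED_BIT_IN_HEADER: with the flag set, every cell carries a u64 header before its payload, and every offset computation in the hunks below has to skip it.

// Hypothetical illustration of the cell layout controlled by
// ALLOCATED_BIT_IN_HEADER; names here are invented for the sketch.
const HEADER_SIZE: u64 = std::mem::size_of::<u64>() as u64; // Header is one u64

/// Bytes occupied by one cell: an optional u64 header, then the payload.
fn cell_size(elem_size: u64, num_elems: u64, bit_in_header: bool) -> u64 {
    let header = if bit_in_header { HEADER_SIZE } else { 0 };
    header + elem_size * num_elems
}

/// Byte offset of the payload of cell `ix` inside the mmapped file.
fn payload_offset(ix: u64, cell: u64, bit_in_header: bool) -> u64 {
    ix * cell + if bit_in_header { HEADER_SIZE } else { 0 }
}

fn main() {
    // With the header, every cell pays 8 extra bytes up front.
    assert_eq!(cell_size(24, 1, true), 32);
    assert_eq!(payload_offset(2, 32, true), 72); // 2 * 32 + 8
    // Without it, the payload starts at the cell boundary itself.
    assert_eq!(cell_size(24, 1, false), 24);
    assert_eq!(payload_offset(2, 24, false), 48);
}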
@@ -103,7 +107,13 @@ impl BucketStorage {
stats: Arc<BucketStats>,
count: Arc<AtomicU64>,
) -> Self {
-        let cell_size = elem_size * num_elems + std::mem::size_of::<Header>() as u64;
+        let use_bit_field = !ALLOCATED_BIT_IN_HEADER;
+        let cell_size = elem_size * num_elems
+            + if use_bit_field {
+                0
+            } else {
+                std::mem::size_of::<Header>() as u64
+            };
let (mmap, path) = Self::new_map(&drives, cell_size as usize, capacity_pow2, &stats);
Self {
path,
@@ -141,6 +151,7 @@ impl BucketStorage {

/// return ref to header of item 'ix' in mmapped file
fn header_ptr(&self, ix: u64) -> &Header {
+        assert!(ALLOCATED_BIT_IN_HEADER);
let ix = (ix * self.cell_size) as usize;
let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
unsafe {
@@ -152,6 +163,7 @@
/// return mut ref to header of item 'ix' in mmapped file
#[allow(clippy::mut_from_ref)]
fn header_mut_ptr(&self, ix: u64) -> &mut Header {
+        assert!(ALLOCATED_BIT_IN_HEADER);
let ix = (ix * self.cell_size) as usize;
let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
unsafe {
@@ -164,16 +176,30 @@
pub fn is_free(&self, ix: u64) -> bool {
// note that the terminology in the implementation is locked or unlocked.
// but our api is allocate/free
-        self.header_ptr(ix).is_unlocked()
+        if ALLOCATED_BIT_IN_HEADER {
+            self.header_ptr(ix).is_unlocked()
+        }
+        else {
+            unimplemented!();
+        }
}

+    fn try_lock(&mut self, ix: u64) -> bool {
+        if ALLOCATED_BIT_IN_HEADER {
+            self.header_mut_ptr(ix).try_lock()
+        }
+        else {
+            unimplemented!();
+        }
+    }
+
/// 'is_resizing' true if caller is resizing the index (so don't increment count)
/// 'is_resizing' false if caller is adding an item to the index (so increment count)
-    pub fn allocate(&self, ix: u64, is_resizing: bool) -> Result<(), BucketStorageError> {
+    pub fn allocate(&mut self, ix: u64, is_resizing: bool) -> Result<(), BucketStorageError> {
assert!(ix < self.capacity(), "allocate: bad index size");
let mut e = Err(BucketStorageError::AlreadyAllocated);
//debug!("ALLOC {} {}", ix, uid);
-        if self.header_mut_ptr(ix).try_lock() {
+        if self.try_lock(ix) {
e = Ok(());
if !is_resizing {
self.count.fetch_add(1, Ordering::Relaxed);
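The header-based branch above leans on the Header lock word, where 0 (UID_UNLOCKED) means free and nonzero means allocated. A hedged sketch of those semantics, assuming a plain u64 lock word (the crate's actual Header may differ):

/// Sketch of the per-entry header semantics assumed above: a single u64
/// whose value 0 (UID_UNLOCKED) marks the slot free. Invented names.
const UID_UNLOCKED: u64 = 0;
const UID_LOCKED: u64 = 1;

struct Header {
    lock: u64,
}

impl Header {
    fn is_unlocked(&self) -> bool {
        self.lock == UID_UNLOCKED
    }

    /// Claim the slot; returns false if it was already taken.
    fn try_lock(&mut self) -> bool {
        if self.lock == UID_UNLOCKED {
            self.lock = UID_LOCKED;
            true
        } else {
            false
        }
    }

    fn unlock(&mut self) {
        self.lock = UID_UNLOCKED;
    }
}

fn main() {
    let mut h = Header { lock: UID_UNLOCKED };
    assert!(h.try_lock());  // first allocation succeeds
    assert!(!h.try_lock()); // second attempt would report AlreadyAllocated
    h.unlock();             // the free() path
    assert!(h.is_unlocked());
}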
@@ -182,15 +208,28 @@
e
}

+    fn unlock(&mut self, ix: u64) {
+        if ALLOCATED_BIT_IN_HEADER {
+            self.header_mut_ptr(ix).unlock();
+        } else {
+            unimplemented!();
+        }
+    }
+
pub fn free(&mut self, ix: u64) {
assert!(ix < self.capacity(), "bad index size");
-        self.header_mut_ptr(ix).unlock();
+        self.unlock(ix);
self.count.fetch_sub(1, Ordering::Relaxed);
}

pub fn get<T: Sized>(&self, ix: u64) -> &T {
assert!(ix < self.capacity(), "bad index size");
-        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
+        let start = (ix * self.cell_size) as usize
+            + if ALLOCATED_BIT_IN_HEADER {
+                std::mem::size_of::<Header>()
+            } else {
+                0
+            };
let end = start + std::mem::size_of::<T>();
let item_slice: &[u8] = &self.mmap[start..end];
unsafe {
@@ -206,7 +245,12 @@ impl BucketStorage {
pub fn get_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &[T] {
assert!(ix < self.capacity(), "bad index size");
let ix = self.cell_size * ix;
-        let start = ix as usize + std::mem::size_of::<Header>();
+        let start = ix as usize
+            + if ALLOCATED_BIT_IN_HEADER {
+                std::mem::size_of::<Header>()
+            } else {
+                0
+            };
let end = start + std::mem::size_of::<T>() * len as usize;
//debug!("GET slice {} {}", start, end);
let item_slice: &[u8] = &self.mmap[start..end];
@@ -219,7 +263,12 @@
#[allow(clippy::mut_from_ref)]
pub fn get_mut<T: Sized>(&self, ix: u64) -> &mut T {
assert!(ix < self.capacity(), "bad index size");
-        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
+        let start = (ix * self.cell_size) as usize
+            + if ALLOCATED_BIT_IN_HEADER {
+                std::mem::size_of::<Header>()
+            } else {
+                0
+            };
let end = start + std::mem::size_of::<T>();
let item_slice: &[u8] = &self.mmap[start..end];
unsafe {
@@ -232,7 +281,12 @@
pub fn get_mut_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &mut [T] {
assert!(ix < self.capacity(), "bad index size");
let ix = self.cell_size * ix;
-        let start = ix as usize + std::mem::size_of::<Header>();
+        let start = ix as usize
+            + if ALLOCATED_BIT_IN_HEADER {
+                std::mem::size_of::<Header>()
+            } else {
+                0
+            };
let end = start + std::mem::size_of::<T>() * len as usize;
//debug!("GET mut slice {} {}", start, end);
let item_slice: &[u8] = &self.mmap[start..end];
@@ -306,16 +360,22 @@ impl BucketStorage {
let increment = self.capacity_pow2 - old_bucket.capacity_pow2;
let index_grow = 1 << increment;
(0..old_cap as usize).for_each(|i| {
-            let old_ix = i * old_bucket.cell_size as usize;
-            let new_ix = old_ix * index_grow;
-            let dst_slice: &[u8] = &self.mmap[new_ix..new_ix + old_bucket.cell_size as usize];
-            let src_slice: &[u8] = &old_map[old_ix..old_ix + old_bucket.cell_size as usize];
-
-            unsafe {
-                let dst = dst_slice.as_ptr() as *mut u8;
-                let src = src_slice.as_ptr() as *const u8;
-                std::ptr::copy_nonoverlapping(src, dst, old_bucket.cell_size as usize);
-            };
+            if !old_bucket.is_free(i as u64) {
+                if !ALLOCATED_BIT_IN_HEADER {
+                    // mark the entry as allocated in the new bucket since the lock bit is not stored in a header
+                    self.try_lock((i * index_grow) as u64);
+                }
+                let old_ix = i * old_bucket.cell_size as usize;
+                let new_ix = old_ix * index_grow;
+                let dst_slice: &[u8] = &self.mmap[new_ix..new_ix + old_bucket.cell_size as usize];
+                let src_slice: &[u8] = &old_map[old_ix..old_ix + old_bucket.cell_size as usize];
+
+                unsafe {
+                    let dst = dst_slice.as_ptr() as *mut u8;
+                    let src = src_slice.as_ptr() as *const u8;
+                    std::ptr::copy_nonoverlapping(src, dst, old_bucket.cell_size as usize);
+                };
+            }
});
m.stop();
// resized so update total file size
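All of the !ALLOCATED_BIT_IN_HEADER branches in this commit still hit unimplemented!(). One plausible shape for the "stored elsewhere" representation the flag anticipates is a side bit vector, one bit per cell; the sketch below uses invented names and is not part of the commit:

/// Hypothetical side table of allocated bits, one bit per cell, packed
/// into u64 words. NOT in this commit, whose non-header branches all
/// hit unimplemented!().
struct OccupiedBits {
    words: Vec<u64>,
}

impl OccupiedBits {
    fn new(capacity: usize) -> Self {
        Self { words: vec![0; (capacity + 63) / 64] }
    }

    fn is_free(&self, ix: usize) -> bool {
        self.words[ix / 64] & (1u64 << (ix % 64)) == 0
    }

    /// Mirror of try_lock(): set the bit, reporting whether it was clear.
    fn try_lock(&mut self, ix: usize) -> bool {
        let (word, bit) = (ix / 64, 1u64 << (ix % 64));
        if self.words[word] & bit == 0 {
            self.words[word] |= bit;
            true
        } else {
            false
        }
    }

    fn unlock(&mut self, ix: usize) {
        self.words[ix / 64] &= !(1u64 << (ix % 64));
    }
}

fn main() {
    let mut bits = OccupiedBits::new(128);
    assert!(bits.is_free(70));
    assert!(bits.try_lock(70));
    assert!(!bits.try_lock(70));
    bits.unlock(70);
    assert!(bits.is_free(70));
}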