Skip to content

Commit

Permalink
disk index: add Capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Apr 3, 2023
1 parent f289a07 commit e953d84
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 32 deletions.
31 changes: 19 additions & 12 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use {
bucket_item::BucketItem,
bucket_map::BucketMapError,
bucket_stats::BucketMapStats,
bucket_storage::{BucketOccupied, BucketStorage, IncludeHeader, DEFAULT_CAPACITY_POW2},
bucket_storage::{
BucketCapacity, BucketOccupied, BucketStorage, Capacity, IncludeHeader,
DEFAULT_CAPACITY_POW2,
},
index_entry::{
DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots,
OccupiedEnum,
Expand Down Expand Up @@ -88,11 +91,11 @@ struct DataFileEntryToFree {
// >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data
pub struct Bucket<T: Copy + 'static> {
drives: Arc<Vec<PathBuf>>,
//index
/// index
pub index: BucketStorage<IndexBucket<T>>,
//random offset for the index
/// random offset for the index
random: u64,
//storage buckets to store SlotSlice up to a power of 2 in len
/// storage buckets to store SlotSlice up to a power of 2 in len
pub data: Vec<BucketStorage<DataBucket>>,
stats: Arc<BucketMapStats>,

Expand Down Expand Up @@ -205,7 +208,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
.fetch_add(m.as_us(), Ordering::Relaxed);
match first_free {
Some(ii) => Ok((None, ii)),
None => Err(BucketMapError::IndexNoSpace(index.capacity_pow2)),
None => Err(BucketMapError::IndexNoSpace(index.capacity.capacity_pow2())),
}
}

Expand Down Expand Up @@ -258,7 +261,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
.stats
.find_index_entry_mut_us
.fetch_add(m.as_us(), Ordering::Relaxed);
Err(BucketMapError::IndexNoSpace(index.capacity_pow2))
Err(BucketMapError::IndexNoSpace(index.capacity.capacity_pow2()))
}

pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
Expand Down Expand Up @@ -351,7 +354,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

// need to move the allocation to a best fit spot
let best_bucket = &self.data[best_fit_bucket as usize];
let cap_power = best_bucket.capacity_pow2;
let cap_power = best_bucket.capacity.capacity_pow2();
let cap = best_bucket.capacity();
let pos = thread_rng().gen_range(0, cap);
let mut success = false;
Expand All @@ -368,7 +371,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
if best_bucket.is_free(ix) {
let mut multiple_slots = MultipleSlots::default();
multiple_slots.set_storage_offset(ix);
multiple_slots.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
multiple_slots
.set_storage_capacity_when_created_pow2(best_bucket.capacity.capacity_pow2());
multiple_slots.set_num_slots(num_slots);
elem.set_slot_count_enum_value(
&mut self.index,
Expand Down Expand Up @@ -421,7 +425,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}

pub fn grow_index(&self, current_capacity_pow2: u8) {
if self.index.capacity_pow2 == current_capacity_pow2 {
if self.index.capacity.capacity_pow2() == current_capacity_pow2 {
let mut m = Measure::start("grow_index");
//debug!("GROW_INDEX: {}", current_capacity_pow2);
let increment = 1;
Expand All @@ -434,7 +438,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
1,
std::mem::size_of::<IndexEntry<T>>() as u64,
// *2 causes rapid growth of index buckets
self.index.capacity_pow2 + i, // * 2,
Capacity::Pow2(self.index.capacity.capacity_pow2() + i), // * 2,
self.index.max_search,
Arc::clone(&self.stats.index),
Arc::clone(&self.index.count),
Expand Down Expand Up @@ -528,7 +532,10 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
&self.drives,
self.index.max_search,
self.data.get(data_index as usize),
std::cmp::max(current_capacity_pow2 + 1, DEFAULT_CAPACITY_POW2),
Capacity::Pow2(std::cmp::max(
current_capacity_pow2 + 1,
DEFAULT_CAPACITY_POW2,
)),
1 << data_index,
Self::elem_size(),
&self.stats.data,
Expand All @@ -552,7 +559,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

/// grow the appropriate piece. Note this takes an immutable ref.
/// The actual grow is set into self.reallocated and applied later on a write lock
pub fn grow(&self, err: BucketMapError) {
pub(crate) fn grow(&self, err: BucketMapError) {
match err {
BucketMapError::DataNoSpace((data_index, current_capacity_pow2)) => {
//debug!("GROWING SPACE {:?}", (data_index, current_capacity_pow2));
Expand Down
4 changes: 3 additions & 1 deletion bucket_map/src/bucket_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@ impl<T: Clone + Copy + Debug> std::fmt::Debug for BucketMap<T> {
}
}

/// Error raised when a bucket has no room left.
/// used to communicate resize necessary and current size.
#[derive(Debug)]
pub enum BucketMapError {
/// Payload is `(bucket_index, current_capacity_pow2)`: which data bucket ran out of
/// space and the power-of-two capacity it had at that moment.
/// Note that this is specific to data buckets
DataNoSpace((u64, u8)),
/// Payload is `current_capacity_pow2`: the power-of-two capacity the index had
/// when it ran out of space.
/// Note that this is specific to index buckets
IndexNoSpace(u8),
}

Expand Down
87 changes: 73 additions & 14 deletions bucket_map/src/bucket_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ pub trait BucketOccupied {
/// This must be a multiple of sizeof(u64)
fn offset_to_first_data() -> usize;
/// initialize this struct
/// `num_elements` is the number of elements allocated in the bucket
fn new(num_elements: usize) -> Self;
/// `capacity` is the number of elements allocated in the bucket
fn new(capacity: Capacity) -> Self;
/// copying entry. Any in-memory (per-bucket) data structures may need to be copied for this `ix_old`.
/// no-op by default
fn copying_entry(
Expand All @@ -64,11 +64,19 @@ pub trait BucketOccupied {
}
}

/// Describes how many cells a bucket can hold.
pub trait BucketCapacity {
    /// Number of cells.
    fn capacity(&self) -> u64;

    /// Capacity expressed as a power-of-two exponent.
    ///
    /// # Panics
    /// The provided default always panics via `unimplemented!`; implementors
    /// that can express their capacity as a power of two override it.
    fn capacity_pow2(&self) -> u8 {
        unimplemented!()
    }
}

pub struct BucketStorage<O: BucketOccupied> {
path: PathBuf,
mmap: MmapMut,
pub cell_size: u64,
pub capacity_pow2: u8,
/// number of cells this bucket can hold
pub capacity: Capacity,
pub count: Arc<AtomicU64>,
pub stats: Arc<BucketStats>,
pub max_search: MaxSearch,
Expand All @@ -94,12 +102,40 @@ pub(crate) enum IncludeHeader {
NoHeader,
}

/// The two supported ways of stating how many elements a bucket holds.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Capacity {
    /// Exponent form: `1 << Pow2` is the number of elements.
    Pow2(u8),
    /// Literal form: the wrapped value IS the number of elements.
    Random(u64),
}

impl BucketCapacity for Capacity {
    /// Number of elements: `1 << pow2` for `Pow2`, the stored count for `Random`.
    fn capacity(&self) -> u64 {
        match *self {
            Self::Random(count) => count,
            Self::Pow2(exponent) => 1 << exponent,
        }
    }

    /// Power-of-two exponent of this capacity.
    ///
    /// # Panics
    /// Panics for `Random`, which stores no exponent.
    fn capacity_pow2(&self) -> u8 {
        match *self {
            Self::Random(_) => panic!("illegal to ask for pow2 from random capacity"),
            Self::Pow2(exponent) => exponent,
        }
    }
}

impl<O: BucketOccupied> BucketStorage<O> {
/// allocate a bucket of at least `capacity` elements.
/// if capacity can be random, more may be allocated to fill the last page.
pub fn new_with_capacity(
drives: Arc<Vec<PathBuf>>,
num_elems: u64,
elem_size: u64,
capacity_pow2: u8,
mut capacity: Capacity,
max_search: MaxSearch,
stats: Arc<BucketStats>,
count: Arc<AtomicU64>,
Expand All @@ -112,18 +148,35 @@ impl<O: BucketOccupied> BucketStorage<O> {
"header size must be a multiple of u64"
);
let cell_size = elem_size * num_elems + offset as u64;
let bytes = (1u64 << capacity_pow2) * cell_size;
let bytes = Self::allocate_to_fill_page(&mut capacity, cell_size);
let (mmap, path) = Self::new_map(&drives, bytes, &stats);
Self {
path,
mmap,
cell_size,
count,
capacity_pow2,
capacity,
stats,
max_search,
contents: O::new(1 << capacity_pow2),
contents: O::new(capacity),
}
}

/// Returns the number of bytes to allocate for `capacity` cells of `cell_size` bytes each.
///
/// For any capacity other than `Capacity::Random` the result is exactly
/// `capacity.capacity() * cell_size` and `capacity` is left untouched.
/// For `Capacity::Random`, when the whole pages below the request cannot hold it,
/// the request is grown to the largest multiple of `cell_size` that fits in one
/// more page, and `*capacity` is updated to the matching element count.
fn allocate_to_fill_page(capacity: &mut Capacity, cell_size: u64) -> u64 {
    const PAGE_SIZE: u64 = 4 * 1024;

    let requested = capacity.capacity() * cell_size;
    if !matches!(capacity, Capacity::Random(_)) {
        return requested;
    }

    // maybe bump up allocation to fit a page size
    let full_page_bytes = requested / PAGE_SIZE * PAGE_SIZE / cell_size * cell_size;
    if full_page_bytes >= requested {
        return requested;
    }

    // one more page than the whole pages below the request, trimmed to whole cells
    let bytes_new = (requested / PAGE_SIZE + 1) * PAGE_SIZE / cell_size * cell_size;
    assert!(
        bytes_new >= requested,
        "allocating less than requested, capacity: {}, bytes: {}, bytes_new: {}, full_page_bytes: {}",
        capacity.capacity(),
        requested,
        bytes_new,
        full_page_bytes
    );
    assert_eq!(bytes_new % cell_size, 0);
    *capacity = Capacity::Random(bytes_new / cell_size);
    bytes_new
}

pub fn max_search(&self) -> u64 {
Expand All @@ -142,7 +195,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
drives,
num_elems,
elem_size,
DEFAULT_CAPACITY_POW2,
Capacity::Pow2(DEFAULT_CAPACITY_POW2),
max_search,
stats,
count,
Expand Down Expand Up @@ -330,7 +383,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
let old_cap = old_bucket.capacity();
let old_map = &old_bucket.mmap;

let increment = self.capacity_pow2 - old_bucket.capacity_pow2;
let increment = self.capacity.capacity_pow2() - old_bucket.capacity.capacity_pow2();
let index_grow = 1 << increment;
(0..old_cap as usize).for_each(|i| {
if !old_bucket.is_free(i as u64) {
Expand Down Expand Up @@ -370,7 +423,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
drives: &Arc<Vec<PathBuf>>,
max_search: MaxSearch,
bucket: Option<&Self>,
capacity_pow_2: u8,
capacity: Capacity,
num_elems: u64,
elem_size: u64,
stats: &Arc<BucketStats>,
Expand All @@ -379,7 +432,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
Arc::clone(drives),
num_elems,
elem_size,
capacity_pow_2,
capacity,
max_search,
Arc::clone(stats),
bucket
Expand All @@ -400,7 +453,7 @@ impl<O: BucketOccupied> BucketStorage<O> {

/// Return the number of cells currently allocated
pub fn capacity(&self) -> u64 {
1 << self.capacity_pow2
self.capacity.capacity()
}
}

Expand Down Expand Up @@ -458,19 +511,25 @@ mod test {
std::mem::size_of::<u64>() - 1
}
/// initialize this struct
fn new(_num_elements: usize) -> Self {
fn new(_num_elements: Capacity) -> Self {
Self {}
}
}

/// Stub `BucketCapacity` impl for the test-only `BucketBadHeader` type.
/// `capacity` panics via `unimplemented!` if it is ever called.
// NOTE(review): presumably the header-size test never queries the capacity — confirm.
impl BucketCapacity for BucketBadHeader {
fn capacity(&self) -> u64 {
unimplemented!();
}
}

#[test]
#[should_panic(expected = "assertion failed: `(left == right)`")]
fn test_header_size() {
_ = BucketStorage::<BucketBadHeader>::new_with_capacity(
Arc::default(),
0,
0,
0,
Capacity::Pow2(0),
0,
Arc::default(),
Arc::default(),
Expand Down
12 changes: 7 additions & 5 deletions bucket_map/src/index_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use {
crate::{
bucket_storage::{BucketOccupied, BucketStorage, IncludeHeader},
bucket_storage::{BucketCapacity, BucketOccupied, BucketStorage, Capacity, IncludeHeader},
RefCount,
},
bv::BitVec,
Expand Down Expand Up @@ -32,9 +32,9 @@ impl BucketOccupied for BucketWithBitVec {
// no header, nothing stored in data stream
0
}
fn new(num_elements: usize) -> Self {
fn new(capacity: Capacity) -> Self {
Self {
occupied: BitVec::new_fill(false, num_elements as u64),
occupied: BitVec::new_fill(false, capacity.capacity() as u64),
}
}
}
Expand Down Expand Up @@ -63,9 +63,10 @@ impl<T: Copy> BucketOccupied for IndexBucketUsingRefCountBits<T> {
matches!(entry.get_slot_count_enum(), OccupiedEnum::Free)
}
fn offset_to_first_data() -> usize {
// no header, nothing stored in data stream
0
}
fn new(_num_elements: usize) -> Self {
fn new(_capacity: Capacity) -> Self {
Self {
_phantom: PhantomData,
}
Expand Down Expand Up @@ -169,7 +170,8 @@ impl MultipleSlots {
/// This function maps the original data location into an index in the current bucket storage.
/// This is coupled with how we resize bucket storages.
pub(crate) fn data_loc(&self, storage: &BucketStorage<DataBucket>) -> u64 {
self.storage_offset() << (storage.capacity_pow2 - self.storage_capacity_when_created_pow2())
self.storage_offset()
<< (storage.capacity.capacity_pow2() - self.storage_capacity_when_created_pow2())
}
}

Expand Down

0 comments on commit e953d84

Please sign in to comment.