From 7d0a6e0e0d8c09da52fb58d028d0892e5c7fbd2e Mon Sep 17 00:00:00 2001
From: jeff washington
Date: Wed, 15 Mar 2023 15:50:55 -0500
Subject: [PATCH] bucket map stores uid in use field in memory bitvec

---
 Cargo.lock                       |   1 +
 bucket_map/Cargo.toml            |   1 +
 bucket_map/src/bucket.rs         |  35 +++++---
 bucket_map/src/bucket_api.rs     |   2 +
 bucket_map/src/bucket_storage.rs | 143 +++++++++++++++++++++++++++----
 bucket_map/src/index_entry.rs    |  11 ++-
 6 files changed, 161 insertions(+), 32 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7596c613161eb7..06470cff60740b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5031,6 +5031,7 @@ dependencies = [
 name = "solana-bucket-map"
 version = "1.16.0"
 dependencies = [
+ "bv",
  "fs_extra",
  "log",
  "memmap2",
diff --git a/bucket_map/Cargo.toml b/bucket_map/Cargo.toml
index 5da8fb56a1a4d2..956bc3ad0d7911 100644
--- a/bucket_map/Cargo.toml
+++ b/bucket_map/Cargo.toml
@@ -11,6 +11,7 @@ license = { workspace = true }
 edition = { workspace = true }
 
 [dependencies]
+bv = { workspace = true, features = ["serde"] }
 log = { workspace = true }
 memmap2 = { workspace = true }
 modular-bitfield = { workspace = true }
diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs
index ea00eebef7b2ba..78a63731ead428 100644
--- a/bucket_map/src/bucket.rs
+++ b/bucket_map/src/bucket.rs
@@ -82,6 +82,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         max_search: MaxSearch,
         stats: Arc<BucketMapStats>,
         count: Arc<AtomicU64>,
+        use_bit_field: bool,
     ) -> Self {
         let index = BucketStorage::new(
             Arc::clone(&drives),
@@ -90,6 +91,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
             max_search,
             Arc::clone(&stats.index),
             count,
+            use_bit_field,
         );
         Self {
             random: thread_rng().gen(),
@@ -142,10 +144,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         Self::bucket_find_entry(&self.index, key, self.random)
     }
 
-    fn find_entry_mut<'a>(
-        &'a self,
-        key: &Pubkey,
-    ) -> Result<(bool, &'a mut IndexEntry, u64), BucketMapError> {
+    fn find_entry_mut<'a>(&mut self, key: &Pubkey) -> Result<(bool, u64), BucketMapError> {
         let ix = Self::bucket_index_ix(&self.index, key, self.random);
         let mut first_free = None;
         let mut m = Measure::start("bucket_find_entry_mut");
@@ -164,7 +163,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
                     .index
                     .find_entry_mut_us
                     .fetch_add(m.as_us(), Ordering::Relaxed);
-                return Ok((true, elem, ii));
+                return Ok((true, ii));
             }
         }
         m.stop();
@@ -175,7 +174,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         match first_free {
             Some(ii) => {
                 let elem: &mut IndexEntry = self.index.get_mut(ii);
-                Ok((false, elem, ii))
+                Ok((false, ii))
             }
             None => Err(self.index_no_space()),
         }
@@ -189,6 +188,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         let ix = Self::bucket_index_ix(index, key, random);
         for i in ix..ix + index.max_search() {
             let ii = i % index.capacity();
+            assert!(ii < index.capacity(), "{}, {}", ii, index.capacity());
             if index.is_free(ii) {
                 continue;
             }
@@ -236,8 +236,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
     }
 
     pub fn addref(&mut self, key: &Pubkey) -> Option<RefCount> {
-        if let Ok((found, elem, _)) = self.find_entry_mut(key) {
+        if let Ok((found, elem_ix)) = self.find_entry_mut(key) {
             if found {
+                let elem: &mut IndexEntry = self.index.get_mut(elem_ix);
                 elem.ref_count += 1;
                 return Some(elem.ref_count);
             }
@@ -246,8 +247,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
 
     pub fn unref(&mut self, key: &Pubkey) -> Option<RefCount> {
-        if let Ok((found, elem, _)) = self.find_entry_mut(key) {
+        if let Ok((found, elem_ix)) = self.find_entry_mut(key) {
             if found {
+                let elem: &mut IndexEntry = self.index.get_mut(elem_ix);
                 elem.ref_count -= 1;
                 return Some(elem.ref_count);
             }
         }
@@ -277,14 +279,18 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
             // fail early if the data bucket we need doesn't exist - we don't want the index entry partially allocated
             return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0)));
         }
-        let (found, elem, elem_ix) = self.find_entry_mut(key)?;
+        let (found, elem_ix) = self.find_entry_mut(key)?;
+        let elem: &mut IndexEntry;
         if !found {
             let is_resizing = false;
-            let elem_uid = IndexEntry::key_uid(key);
+            let elem_uid = IndexEntry::key_uid2(key);
             self.index.allocate(elem_ix, elem_uid, is_resizing).unwrap();
+            elem = self.index.get_mut(elem_ix);
             // These fields will be overwritten after allocation by callers.
             // Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
             elem.init(key);
+        } else {
+            elem = self.index.get_mut(elem_ix);
         }
         elem.ref_count = ref_count;
         let elem_uid = self.index.uid_unchecked(elem_ix);
@@ -378,6 +384,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
             self.index.max_search,
             Arc::clone(&self.stats.index),
             Arc::clone(&self.index.count),
+            self.index.bit_field.is_some(),
         );
         let random = thread_rng().gen();
         let mut valid = true;
@@ -440,6 +447,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
                     self.index.max_search,
                     Arc::clone(&self.stats.data),
                     Arc::default(),
+                    false,
                 ))
             }
             self.data.push(bucket);
@@ -450,7 +458,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
 
     /// grow a data bucket
     /// The application of the new bucket is deferred until the next write lock.
-    pub fn grow_data(&self, data_index: u64, current_capacity_pow2: u8) {
+    pub fn grow_data(&self, data_index: u64, current_capacity_pow2: u8, use_bit_field: bool) {
         let new_bucket = BucketStorage::new_resized(
             &self.drives,
             self.index.max_search,
@@ -459,6 +467,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
             1 << data_index,
             Self::elem_size(),
             &self.stats.data,
+            use_bit_field,
         );
         self.reallocated.add_reallocation();
         let mut items = self.reallocated.items.lock().unwrap();
@@ -466,7 +475,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
     }
 
     fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 {
-        let uid = IndexEntry::key_uid(key);
+        let uid = IndexEntry::full_key_uid(key);
         let mut s = DefaultHasher::new();
         uid.hash(&mut s);
         //the locally generated random will make it hard for an attacker
@@ -484,7 +493,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         match err {
             BucketMapError::DataNoSpace((data_index, current_capacity_pow2)) => {
                 //debug!("GROWING SPACE {:?}", (data_index, current_capacity_pow2));
-                self.grow_data(data_index, current_capacity_pow2);
+                self.grow_data(data_index, current_capacity_pow2, false);
             }
             BucketMapError::IndexNoSpace(current_capacity_pow2) => {
                 //debug!("GROWING INDEX {}", sz);
diff --git a/bucket_map/src/bucket_api.rs b/bucket_map/src/bucket_api.rs
index 695d6836b9aeb4..c369e564f91ec1 100644
--- a/bucket_map/src/bucket_api.rs
+++ b/bucket_map/src/bucket_api.rs
@@ -83,6 +83,7 @@ impl<T: Clone + Copy> BucketApi<T> {
     }
 
     fn get_write_bucket(&self) -> RwLockWriteGuard<Option<Bucket<T>>> {
+        let use_bit_field = true;
         let mut bucket = self.bucket.write().unwrap();
         if bucket.is_none() {
             *bucket = Some(Bucket::new(
@@ -90,6 +91,7 @@ impl<T: Clone + Copy> BucketApi<T> {
                 self.max_search,
                 Arc::clone(&self.stats),
                 Arc::clone(&self.count),
+                use_bit_field,
             ));
         } else {
             let write = bucket.as_mut().unwrap();
diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs
index b69166eb8467c9..6db9c37bba5e9b 100644
--- a/bucket_map/src/bucket_storage.rs
+++ b/bucket_map/src/bucket_storage.rs
@@ -1,5 +1,6 @@
 use {
     crate::{bucket_stats::BucketStats, MaxSearch},
+    bv::BitVec,
     memmap2::MmapMut,
     rand::{thread_rng, Rng},
     solana_measure::measure::Measure,
@@ -47,7 +48,7 @@ struct Header {
 impl Header {
     /// try to lock this entry with 'uid'
     /// return true if it could be locked
-    fn try_lock(&mut self, uid: Uid) -> bool {
+    fn try_lock2(&mut self, uid: Uid) -> bool {
         if self.lock == UID_UNLOCKED {
             self.lock = uid;
             true
@@ -56,12 +57,12 @@ impl Header {
         }
     }
     /// mark this entry as unlocked
-    fn unlock(&mut self, expected: Uid) {
+    fn unlock2(&mut self, expected: Uid) {
         assert_eq!(expected, self.lock);
         self.lock = UID_UNLOCKED;
     }
     /// uid that has locked this entry or None if unlocked
-    fn uid(&self) -> Option<Uid> {
+    fn uid2(&self) -> Option<Uid> {
         if self.lock == UID_UNLOCKED {
             None
         } else {
@@ -74,6 +75,13 @@ impl Header {
     }
 }
 
+struct HeaderOffsets {
+    offset_of_this_header: usize,
+    offset_of_this_cell: usize,
+    /// index of this entry in the current header
+    index_of_cell_within_header_group: usize,
+}
+
 pub struct BucketStorage {
     path: PathBuf,
     mmap: MmapMut,
@@ -82,6 +90,7 @@ pub struct BucketStorage {
     pub count: Arc<AtomicU64>,
     pub stats: Arc<BucketStats>,
     pub max_search: MaxSearch,
+    pub bit_field: Option<BitVec>,
 }
 
 #[derive(Debug)]
@@ -104,8 +113,14 @@ impl BucketStorage {
         max_search: MaxSearch,
         stats: Arc<BucketStats>,
         count: Arc<AtomicU64>,
+        use_bit_field: bool,
     ) -> Self {
-        let cell_size = elem_size * num_elems + std::mem::size_of::<Header>() as u64;
+        let cell_size = elem_size * num_elems
+            + if use_bit_field {
+                0
+            } else {
+                std::mem::size_of::<Header>() as u64
+            };
         let (mmap, path) = Self::new_map(&drives, cell_size as usize, capacity_pow2, &stats);
         Self {
             path,
@@ -115,6 +130,7 @@ impl BucketStorage {
             capacity_pow2,
             stats,
             max_search,
+            bit_field: use_bit_field.then(|| BitVec::new_fill(false, 1u64 << capacity_pow2)),
         }
     }
 
@@ -129,6 +145,7 @@ impl BucketStorage {
         max_search: MaxSearch,
         stats: Arc<BucketStats>,
         count: Arc<AtomicU64>,
+        use_bit_field: bool,
     ) -> Self {
         Self::new_with_capacity(
             drives,
@@ -138,11 +155,29 @@ impl BucketStorage {
             max_search,
             stats,
             count,
+            use_bit_field,
         )
-    }
+    } /*
+    fn calculate_header_offset(&self, ix: u64) -> HeaderOffsets {
+        // the header bits are always before the entries they correspond to
+        let size_header = std::mem::size_of::<Header>();
+        let items_per_header = u64::BITS as u64;
+        let earlier_headers = ix / items_per_header;
+        let offset_in_header = ix % items_per_header;
+        let all_entries_with_earlier_header = earlier_headers * items_per_header;
+        let size_of_previous_headers = earlier_headers * size_header;
+        let size_of_previous_entries = all_entries_with_earlier_header * self.cell_size; // cell size should be a constant todo
+        let offset_of_this_header = size_of_previous_headers.saturating_add(size_of_previous_entries);
+        let offset_of_this_cell = offset_of_this_header + size_header + offset_in_header * self.cell_size;
+        HeaderOffsets {
+            offset_of_this_header,
+            offset_of_this_cell,
+        }
+    }*/
 
     /// return ref to header of item 'ix' in mmapped file
     fn header_ptr(&self, ix: u64) -> &Header {
+        assert!(self.bit_field.is_none());
         let ix = (ix * self.cell_size) as usize;
         let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
         unsafe {
@@ -154,6 +189,7 @@ impl BucketStorage {
     /// return ref to header of item 'ix' in mmapped file
     #[allow(clippy::mut_from_ref)]
     fn header_mut_ptr(&self, ix: u64) -> &mut Header {
+        assert!(self.bit_field.is_none());
         let ix = (ix * self.cell_size) as usize;
         let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
         unsafe {
@@ -165,14 +201,44 @@ impl BucketStorage {
     /// return uid allocated at index 'ix' or None if vacant
     pub fn uid(&self, ix: u64) -> Option<Uid> {
         assert!(ix < self.capacity(), "bad index size");
-        self.header_ptr(ix).uid()
+        if let Some(bit_field) = self.bit_field.as_ref() {
+            bit_field.get(ix).then_some(1)
+        } else {
+            self.header_ptr(ix).uid2()
+        }
     }
 
     /// true if the entry at index 'ix' is free (as opposed to being allocated)
     pub fn is_free(&self, ix: u64) -> bool {
         // note that the terminology in the implementation is locked or unlocked.
         // but our api is allocate/free
-        self.header_ptr(ix).is_unlocked()
+        self.bit_field
+            .as_ref()
+            .map(|bit_field| {
+                /*
+                use log::*;
+                error!(
+                    "len: {}, i: {}, capacity: {}",
+                    bit_field.len(),
+                    ix,
+                    self.capacity()
+                );*/
+                !bit_field.get(ix)
+            })
+            .or_else(|| Some(self.header_ptr(ix).is_unlocked()))
+            .unwrap()
+    }
+
+    fn try_lock(&mut self, ix: u64, uid: Uid) -> bool {
+        if let Some(bit_field) = self.bit_field.as_mut() {
+            let in_use = bit_field.get(ix);
+            if !in_use {
+                bit_field.set(ix, true);
+            }
+            !in_use
+        } else {
+            self.header_mut_ptr(ix).try_lock2(uid)
+        }
     }
 
     /// caller knows id is not empty
@@ -182,12 +248,17 @@ impl BucketStorage {
     /// 'is_resizing' true if caller is resizing the index (so don't increment count)
     /// 'is_resizing' false if caller is adding an item to the index (so increment count)
-    pub fn allocate(&self, ix: u64, uid: Uid, is_resizing: bool) -> Result<(), BucketStorageError> {
+    pub fn allocate(
+        &mut self,
+        ix: u64,
+        uid: Uid,
+        is_resizing: bool,
+    ) -> Result<(), BucketStorageError> {
         assert!(ix < self.capacity(), "allocate: bad index size");
         assert!(UID_UNLOCKED != uid, "allocate: bad uid");
         let mut e = Err(BucketStorageError::AlreadyAllocated);
         //debug!("ALLOC {} {}", ix, uid);
-        if self.header_mut_ptr(ix).try_lock(uid) {
+        if self.try_lock(ix, uid) {
             e = Ok(());
             if !is_resizing {
                 self.count.fetch_add(1, Ordering::Relaxed);
@@ -196,16 +267,30 @@ impl BucketStorage {
         e
     }
 
+    fn unlock(&mut self, ix: u64, uid: Uid) {
+        if let Some(bit_field) = self.bit_field.as_mut() {
+            assert!(bit_field.get(ix));
+            bit_field.set(ix, false);
+        } else {
+            self.header_mut_ptr(ix).unlock2(uid);
+        }
+    }
+
     pub fn free(&mut self, ix: u64, uid: Uid) {
         assert!(ix < self.capacity(), "bad index size");
         assert!(UID_UNLOCKED != uid, "free: bad uid");
-        self.header_mut_ptr(ix).unlock(uid);
+        self.unlock(ix, uid);
         self.count.fetch_sub(1, Ordering::Relaxed);
     }
 
     pub fn get<T: Sized>(&self, ix: u64) -> &T {
         assert!(ix < self.capacity(), "bad index size");
-        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
+        let start = (ix * self.cell_size) as usize
+            + if self.bit_field.is_none() {
+                std::mem::size_of::<Header>()
+            } else {
+                0
+            };
         let end = start + std::mem::size_of::<T>();
         let item_slice: &[u8] = &self.mmap[start..end];
         unsafe {
@@ -221,7 +306,12 @@ impl BucketStorage {
     pub fn get_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &[T] {
         assert!(ix < self.capacity(), "bad index size");
         let ix = self.cell_size * ix;
-        let start = ix as usize + std::mem::size_of::<Header>();
+        let start = ix as usize
+            + if self.bit_field.is_none() {
+                std::mem::size_of::<Header>()
+            } else {
+                0
+            };
         let end = start + std::mem::size_of::<T>() * len as usize;
         //debug!("GET slice {} {}", start, end);
         let item_slice: &[u8] = &self.mmap[start..end];
@@ -234,7 +324,12 @@ impl BucketStorage {
     #[allow(clippy::mut_from_ref)]
     pub fn get_mut<T: Sized>(&self, ix: u64) -> &mut T {
         assert!(ix < self.capacity(), "bad index size");
-        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
+        let start = (ix * self.cell_size) as usize
+            + if self.bit_field.is_none() {
+                std::mem::size_of::<Header>()
+            } else {
+                0
+            };
         let end = start + std::mem::size_of::<T>();
         let item_slice: &[u8] = &self.mmap[start..end];
         unsafe {
@@ -247,7 +342,12 @@ impl BucketStorage {
     pub fn get_mut_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &mut [T] {
         assert!(ix < self.capacity(), "bad index size");
         let ix = self.cell_size * ix;
-        let start = ix as usize + std::mem::size_of::<Header>();
+        let start = ix as usize
+            + if self.bit_field.is_none() {
+                std::mem::size_of::<Header>()
+            } else {
+                0
+            };
         let end = start + std::mem::size_of::<T>() * len as usize;
         //debug!("GET mut slice {} {}", start, end);
         let item_slice: &[u8] = &self.mmap[start..end];
@@ -349,6 +449,7 @@ impl BucketStorage {
         num_elems: u64,
         elem_size: u64,
         stats: &Arc<BucketStats>,
+        use_bit_field: bool,
     ) -> Self {
         let mut new_bucket = Self::new_with_capacity(
             Arc::clone(drives),
@@ -360,6 +461,7 @@ impl BucketStorage {
             bucket
                 .map(|bucket| Arc::clone(&bucket.count))
                 .unwrap_or_default(),
+            use_bit_field,
         );
         if let Some(bucket) = bucket {
             new_bucket.copy_contents(bucket);
@@ -384,10 +486,17 @@ mod test {
         let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
         assert!(!paths.is_empty());
 
-        let mut storage =
-            BucketStorage::new(Arc::new(paths), 1, 1, 1, Arc::default(), Arc::default());
+        let mut storage = BucketStorage::new(
+            Arc::new(paths),
+            1,
+            1,
+            1,
+            Arc::default(),
+            Arc::default(),
+            true,
+        );
         let ix = 0;
-        let uid = Uid::MAX;
+        let uid = 1; //Uid::MAX;
         assert!(storage.is_free(ix));
         assert!(storage.allocate(ix, uid, false).is_ok());
         assert!(storage.allocate(ix, uid, false).is_err());
diff --git a/bucket_map/src/index_entry.rs b/bucket_map/src/index_entry.rs
index 7917488c735752..312dcded8a3630 100644
--- a/bucket_map/src/index_entry.rs
+++ b/bucket_map/src/index_entry.rs
@@ -99,7 +99,7 @@ impl IndexEntry {
             let data_bucket_ix = self.data_bucket_ix();
             let data_bucket = &bucket.data[data_bucket_ix as usize];
             let loc = self.data_loc(data_bucket);
-            let uid = Self::key_uid(&self.key);
+            let uid = Self::key_uid2(&self.key);
             assert_eq!(Some(uid), data_bucket.uid(loc));
             data_bucket.get_cell_slice(loc, self.num_slots)
         } else {
@@ -109,7 +109,14 @@ impl IndexEntry {
         Some((slice, self.ref_count))
     }
 
-    pub fn key_uid(key: &Pubkey) -> Uid {
+    /// uid in maps is 1 or 0, where 0 is empty, 1 is in-use
+    pub fn key_uid2(key: &Pubkey) -> Uid {
+        1/*
+        let mut s = DefaultHasher::new();
+        key.hash(&mut s);
+        s.finish().max(1u64)*/
+    }
+    pub fn full_key_uid(key: &Pubkey) -> Uid {
         let mut s = DefaultHasher::new();
         key.hash(&mut s);
         s.finish().max(1u64)
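Reviewer note, appended after the patch (not part of the commit): the core idea above is that, when use_bit_field is set, a cell's in-use state no longer lives in a per-cell Header inside the mmapped file; it lives in an in-memory bv::BitVec with one bit per cell, and the uid stored for occupancy degenerates to the constant 1 returned by key_uid2. The standalone sketch below mirrors the try_lock/unlock/is_free logic the diff adds to BucketStorage. The Occupancy type is a hypothetical stand-in invented for illustration; it assumes only the bv calls the diff itself uses (BitVec::new_fill, get, set).

    use bv::BitVec;

    /// hypothetical stand-in for the occupancy state the patch adds
    struct Occupancy {
        /// one bit per cell: true = in use, false = free
        bits: BitVec,
    }

    impl Occupancy {
        fn new(capacity_pow2: u8) -> Self {
            // same construction as the patch: all cells start free
            Self {
                bits: BitVec::new_fill(false, 1u64 << capacity_pow2),
            }
        }

        /// mirrors the bit-field arm of `BucketStorage::try_lock`:
        /// claim cell `ix` only if it is currently free
        fn try_lock(&mut self, ix: u64) -> bool {
            let in_use = self.bits.get(ix);
            if !in_use {
                self.bits.set(ix, true);
            }
            !in_use
        }

        /// mirrors the bit-field arm of `unlock`:
        /// freeing a cell that is not in use is a logic error
        fn unlock(&mut self, ix: u64) {
            assert!(self.bits.get(ix));
            self.bits.set(ix, false);
        }

        /// mirrors the bit-field arm of `is_free`
        fn is_free(&self, ix: u64) -> bool {
            !self.bits.get(ix)
        }
    }

    fn main() {
        let mut occ = Occupancy::new(4); // 16 cells, all free
        assert!(occ.is_free(3));
        assert!(occ.try_lock(3)); // first claim succeeds
        assert!(!occ.try_lock(3)); // second claim fails, as in the updated test
        occ.unlock(3);
        assert!(occ.is_free(3));
    }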
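A second note, on layout: when the bit field is active, new() stops adding size_of::<Header>() to cell_size, and the get/get_mut/*_cell_slice accessors stop skipping a header prefix, so every cell shrinks by one Header (a single u64 lock word). Below is a minimal worked example of that offset arithmetic; the elem_size of 24 bytes and num_elems of 1 are made-up values chosen only to produce round numbers.

    use std::mem::size_of;

    /// stand-in for the patch's Header: one u64 lock word
    #[allow(dead_code)]
    struct Header {
        lock: u64,
    }

    /// byte offset of element `ix`, following the arithmetic in the
    /// patched `get`/`get_mut`: skip a Header per cell only while the
    /// mmapped headers are still in use (i.e. no bit field)
    fn element_offset(ix: u64, cell_size: u64, use_bit_field: bool) -> usize {
        let header = if use_bit_field { 0 } else { size_of::<Header>() };
        (ix * cell_size) as usize + header
    }

    fn main() {
        // headers in the file: cell_size = 24 + 8, data sits 8 bytes into each cell
        assert_eq!(element_offset(2, 24 + 8, false), 72);
        // bit field in memory: cell_size = 24, data starts at the cell itself
        assert_eq!(element_offset(2, 24, true), 48);
    }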
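Finally, on the uid split: key_uid2 (now the constant 1) only marks occupancy, while the renamed full_key_uid keeps hashing the whole key, so bucket placement via bucket_index_ix is unchanged. A simplified sketch of that placement path follows; the free-standing functions and the [u8; 32] key alias are illustrative assumptions rather than the patch's types, and the exact mixing of `random` follows the comment in the diff rather than code visible in it.

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    /// illustrative alias; the real key type is a Solana Pubkey
    type Pubkey = [u8; 32];

    /// mirrors `full_key_uid`: hash the key, clamped away from 0,
    /// since 0 stays reserved as the UID_UNLOCKED sentinel
    fn full_key_uid(key: &Pubkey) -> u64 {
        let mut s = DefaultHasher::new();
        key.hash(&mut s);
        s.finish().max(1u64)
    }

    /// sketch of `bucket_index_ix`: mix the key-derived uid with a
    /// per-bucket random value (so collisions are hard to precompute)
    /// and wrap to the current capacity
    fn bucket_index_ix(key: &Pubkey, random: u64, capacity: u64) -> u64 {
        let uid = full_key_uid(key);
        let mut s = DefaultHasher::new();
        uid.hash(&mut s);
        random.hash(&mut s);
        s.finish() % capacity
    }

    fn main() {
        let key = [7u8; 32];
        let random = 0xdead_beef_u64; // per-Bucket, from thread_rng in the patch
        let capacity = 1u64 << 8;
        let ix = bucket_index_ix(&key, random, capacity);
        assert!(ix < capacity);
        // same key and random always land on the same slot
        assert_eq!(ix, bucket_index_ix(&key, random, capacity));
    }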