diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index ea00eebef7b2ba..82da1e6260a23e 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -12,6 +12,7 @@ use { solana_sdk::pubkey::Pubkey, std::{ collections::hash_map::DefaultHasher, + fmt::Debug, hash::{Hash, Hasher}, marker::PhantomData, ops::RangeBounds, @@ -76,7 +77,7 @@ pub struct Bucket { pub reallocated: Reallocated, } -impl<'b, T: Clone + Copy + 'static> Bucket { +impl<'b, T: Clone + Copy + Debug + Default + 'static> Bucket { pub fn new( drives: Arc>, max_search: MaxSearch, @@ -86,7 +87,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { let index = BucketStorage::new( Arc::clone(&drives), 1, - std::mem::size_of::() as u64, + std::mem::size_of::>() as u64, max_search, Arc::clone(&stats.index), count, @@ -108,7 +109,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { if self.index.is_free(i) { continue; } - let ix: &IndexEntry = self.index.get(i); + let ix: &IndexEntry = self.index.get(i); rv.push(ix.key); } rv @@ -124,7 +125,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { if self.index.is_free(ii) { continue; } - let ix: &IndexEntry = self.index.get(ii); + let ix: &IndexEntry = self.index.get(ii); let key = ix.key; if range.map(|r| r.contains(&key)).unwrap_or(true) { let val = ix.read_value(self); @@ -138,14 +139,14 @@ impl<'b, T: Clone + Copy + 'static> Bucket { result } - pub fn find_entry(&self, key: &Pubkey) -> Option<(&IndexEntry, u64)> { + pub fn find_entry(&self, key: &Pubkey) -> Option<(&IndexEntry, u64)> { Self::bucket_find_entry(&self.index, key, self.random) } fn find_entry_mut<'a>( &'a self, key: &Pubkey, - ) -> Result<(bool, &'a mut IndexEntry, u64), BucketMapError> { + ) -> Result<(bool, &'a mut IndexEntry, u64), BucketMapError> { let ix = Self::bucket_index_ix(&self.index, key, self.random); let mut first_free = None; let mut m = Measure::start("bucket_find_entry_mut"); @@ -157,7 +158,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { } continue; } - let elem: &mut IndexEntry = self.index.get_mut(ii); + let elem: &mut IndexEntry = self.index.get_mut(ii); if elem.key == *key { m.stop(); self.stats @@ -174,7 +175,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { .fetch_add(m.as_us(), Ordering::Relaxed); match first_free { Some(ii) => { - let elem: &mut IndexEntry = self.index.get_mut(ii); + let elem: &mut IndexEntry = self.index.get_mut(ii); Ok((false, elem, ii)) } None => Err(self.index_no_space()), @@ -185,14 +186,14 @@ impl<'b, T: Clone + Copy + 'static> Bucket { index: &'a BucketStorage, key: &Pubkey, random: u64, - ) -> Option<(&'a IndexEntry, u64)> { + ) -> Option<(&'a IndexEntry, u64)> { let ix = Self::bucket_index_ix(index, key, random); for i in ix..ix + index.max_search() { let ii = i % index.capacity(); if index.is_free(ii) { continue; } - let elem: &IndexEntry = index.get(ii); + let elem: &IndexEntry = index.get(ii); if elem.key == *key { return Some((elem, ii)); } @@ -215,7 +216,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { continue; } index.allocate(ii, elem_uid, is_resizing).unwrap(); - let elem: &mut IndexEntry = index.get_mut(ii); + let elem: &mut IndexEntry = index.get_mut(ii); // These fields will be overwritten after allocation by callers. // Since this part of the mmapped file could have previously been used by someone else, there can be garbage here. elem.init(key); @@ -268,32 +269,48 @@ impl<'b, T: Clone + Copy + 'static> Bucket { pub fn try_write( &mut self, key: &Pubkey, - data: impl Iterator, + mut data: impl Iterator, data_len: usize, ref_count: RefCount, ) -> Result<(), BucketMapError> { - let best_fit_bucket = IndexEntry::data_bucket_from_num_slots(data_len as u64); - if self.data.get(best_fit_bucket as usize).is_none() { + let best_fit_bucket = IndexEntry::::data_bucket_from_num_slots(data_len as u64); + // data_len=0 => nothing to store + // data_len=1 => store in index, so no data bucket + let use_data_storage = data_len > 1; + if use_data_storage && self.data.get(best_fit_bucket as usize).is_none() { // fail early if the data bucket we need doesn't exist - we don't want the index entry partially allocated return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0))); } let (found, elem, elem_ix) = self.find_entry_mut(key)?; if !found { let is_resizing = false; - let elem_uid = IndexEntry::key_uid(key); + let elem_uid = IndexEntry::::key_uid(key); self.index.allocate(elem_ix, elem_uid, is_resizing).unwrap(); // These fields will be overwritten after allocation by callers. // Since this part of the mmapped file could have previously been used by someone else, there can be garbage here. elem.init(key); } elem.ref_count = ref_count; + let old_slots = elem.num_slots; let elem_uid = self.index.uid_unchecked(elem_ix); - let bucket_ix = elem.data_bucket_ix(); - let current_bucket = &self.data[bucket_ix as usize]; let num_slots = data_len as u64; - if best_fit_bucket == bucket_ix && elem.num_slots > 0 { - // in place update + let bucket_ix = elem.data_bucket_ix(); + + if !use_data_storage { + // new data stored should be stored in elem.`first_element` + // new data len is 0 or 1 + elem.num_slots = num_slots; + elem.first_element = data.next().copied().unwrap_or_default(); + if old_slots > 1 { + // free old data location + let elem_loc = elem.data_loc(&self.data[bucket_ix as usize]); + self.data[bucket_ix as usize].free(elem_loc, elem_uid); + } + } else if best_fit_bucket == bucket_ix { + // in place update in same data file + let current_bucket = &self.data[bucket_ix as usize]; let elem_loc = elem.data_loc(current_bucket); + let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64); assert_eq!(current_bucket.uid(elem_loc), Some(elem_uid)); elem.num_slots = num_slots; @@ -301,12 +318,13 @@ impl<'b, T: Clone + Copy + 'static> Bucket { slice.iter_mut().zip(data).for_each(|(dest, src)| { *dest = *src; }); - Ok(()) } else { // need to move the allocation to a best fit spot + // previous data could have been len=0, len=1, or in a different data file let best_bucket = &self.data[best_fit_bucket as usize]; let cap_power = best_bucket.capacity_pow2; let cap = best_bucket.capacity(); + // start searching at a random spot in the data file let pos = thread_rng().gen_range(0, cap); // max search is increased here by a lot for this search. The idea is that we just have to find an empty bucket somewhere. // We don't mind waiting on a new write (by searching longer). Writing is done in the background only. @@ -319,17 +337,31 @@ impl<'b, T: Clone + Copy + 'static> Bucket { for i in pos..pos + (self.index.max_search() * 10).min(cap) { let ix = i % cap; if best_bucket.is_free(ix) { - let elem_loc = elem.data_loc(current_bucket); - let old_slots = elem.num_slots; + let mut elem_loc = 0; + if old_slots >= 2 { + elem_loc = elem.data_loc(&self.data[bucket_ix as usize]); + } elem.set_storage_offset(ix); elem.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2); elem.num_slots = num_slots; - if old_slots > 0 { - let current_bucket = &mut self.data[bucket_ix as usize]; - current_bucket.free(elem_loc, elem_uid); + match old_slots { + 2.. => { + // free the entry in the data bucket the data was previously stored in + self.data[bucket_ix as usize].free(elem_loc, elem_uid); + } + 1 => { + // nothing to free in a data bucket + // set `first_element` to default to avoid confusion + assert!(num_slots > 1); + elem.first_element = T::default(); + } + 0 => { + // nothing to free + } } //debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid ); if num_slots > 0 { + assert!(num_slots > 1); let best_bucket = &mut self.data[best_fit_bucket as usize]; best_bucket.allocate(ix, elem_uid, false).unwrap(); let slice = best_bucket.get_mut_cell_slice(ix, num_slots); @@ -340,14 +372,15 @@ impl<'b, T: Clone + Copy + 'static> Bucket { return Ok(()); } } - Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power))) + return Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power))); } + Ok(()) } pub fn delete_key(&mut self, key: &Pubkey) { if let Some((elem, elem_ix)) = self.find_entry(key) { let elem_uid = self.index.uid_unchecked(elem_ix); - if elem.num_slots > 0 { + if elem.num_slots > 1 { let ix = elem.data_bucket_ix() as usize; let data_bucket = &self.data[ix]; let loc = elem.data_loc(data_bucket); @@ -372,7 +405,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { let mut index = BucketStorage::new_with_capacity( Arc::clone(&self.drives), 1, - std::mem::size_of::() as u64, + std::mem::size_of::>() as u64, // *2 causes rapid growth of index buckets self.index.capacity_pow2 + i, // * 2, self.index.max_search, @@ -384,7 +417,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { for ix in 0..self.index.capacity() { let uid = self.index.uid(ix); if let Some(uid) = uid { - let elem: &IndexEntry = self.index.get(ix); + let elem: &IndexEntry = self.index.get(ix); let new_ix = Self::bucket_create_key(&mut index, &elem.key, uid, random, true); if new_ix.is_err() { @@ -392,10 +425,10 @@ impl<'b, T: Clone + Copy + 'static> Bucket { break; } let new_ix = new_ix.unwrap(); - let new_elem: &mut IndexEntry = index.get_mut(new_ix); + let new_elem: &mut IndexEntry = index.get_mut(new_ix); *new_elem = *elem; /* - let dbg_elem: IndexEntry = *new_elem; + let dbg_elem: IndexEntry = *new_elem; assert_eq!( Self::bucket_find_entry(&index, &elem.key, random).unwrap(), (&dbg_elem, new_ix) @@ -466,7 +499,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { } fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 { - let uid = IndexEntry::key_uid(key); + let uid = IndexEntry::::key_uid(key); let mut s = DefaultHasher::new(); uid.hash(&mut s); //the locally generated random will make it hard for an attacker diff --git a/bucket_map/src/bucket_api.rs b/bucket_map/src/bucket_api.rs index 695d6836b9aeb4..66ebeed0d5b639 100644 --- a/bucket_map/src/bucket_api.rs +++ b/bucket_map/src/bucket_api.rs @@ -5,6 +5,7 @@ use { }, solana_sdk::pubkey::Pubkey, std::{ + fmt::Debug, ops::RangeBounds, path::PathBuf, sync::{ @@ -25,7 +26,7 @@ pub struct BucketApi { count: Arc, } -impl BucketApi { +impl BucketApi { pub fn new( drives: Arc>, max_search: MaxSearch, diff --git a/bucket_map/src/bucket_map.rs b/bucket_map/src/bucket_map.rs index 86ceef7e84fd2b..d31d7167583a01 100644 --- a/bucket_map/src/bucket_map.rs +++ b/bucket_map/src/bucket_map.rs @@ -25,7 +25,7 @@ impl BucketMapConfig { } } -pub struct BucketMap { +pub struct BucketMap { buckets: Vec>>, drives: Arc>, max_buckets_pow2: u8, @@ -33,7 +33,7 @@ pub struct BucketMap { pub temp_dir: Option, } -impl Drop for BucketMap { +impl Drop for BucketMap { fn drop(&mut self) { if self.temp_dir.is_none() { BucketMap::::erase_previous_drives(&self.drives); @@ -41,7 +41,7 @@ impl Drop for BucketMap { } } -impl std::fmt::Debug for BucketMap { +impl std::fmt::Debug for BucketMap { fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { Ok(()) } @@ -55,7 +55,7 @@ pub enum BucketMapError { IndexNoSpace(u8), } -impl BucketMap { +impl BucketMap { pub fn new(config: BucketMapConfig) -> Self { assert_ne!( config.max_buckets, 0, @@ -198,29 +198,70 @@ mod tests { } #[test] - fn bucket_map_test_insert2() { + fn bucket_map_test_insert_empty_then_1() { + let key = Pubkey::new_unique(); + let config = BucketMapConfig::new(1 << 1); + let index = BucketMap::new(config); + index.update(&key, |_| Some((Vec::default(), 0))); + assert_eq!(index.read_value(&key), Some((Vec::default(), 0))); + index.update(&key, |_| Some((vec![0], 0))); + assert_eq!(index.read_value(&key), Some((vec![0], 0))); + index.update(&key, |_| Some((Vec::default(), 0))); + assert_eq!(index.read_value(&key), Some((Vec::default(), 0))); + } + + #[test] + fn bucket_map_test_insert2_1_element() { + for pass in 0..3 { + let key = Pubkey::new_unique(); + let config = BucketMapConfig::new(1 << 1); + let index = BucketMap::new(config); + let src = [0]; + let value = &src; + if pass == 0 { + index.insert(&key, (value, 0)); + } else { + // try always succeeds when we have a len of 1 since it can be stored in place in the index file + let result = index.try_insert(&key, (value, 0)); + assert!(result.is_ok()); + assert_eq!(index.read_value(&key), Some((value.to_vec(), 0))); + if pass == 2 { + // another call to try insert again - should still work + let result = index.try_insert(&key, (value, 0)); + assert!(result.is_ok()); + assert_eq!(index.read_value(&key), Some((value.to_vec(), 0))); + } + } + assert_eq!(index.read_value(&key), Some((value.to_vec(), 0))); + } + } + + #[test] + fn bucket_map_test_insert2_2_elements() { for pass in 0..3 { let key = Pubkey::new_unique(); let config = BucketMapConfig::new(1 << 1); let index = BucketMap::new(config); let bucket = index.get_bucket(&key); + let src = [0, 1]; + let value = &src; if pass == 0 { - index.insert(&key, (&[0], 0)); + index.insert(&key, (value, 0)); } else { - let result = index.try_insert(&key, (&[0], 0)); + let result = index.try_insert(&key, (value, 0)); assert!(result.is_err()); assert_eq!(index.read_value(&key), None); if pass == 2 { // another call to try insert again - should still return an error - let result = index.try_insert(&key, (&[0], 0)); + let result = index.try_insert(&key, (value, 0)); assert!(result.is_err()); assert_eq!(index.read_value(&key), None); } bucket.grow(result.unwrap_err()); - let result = index.try_insert(&key, (&[0], 0)); + let result = index.try_insert(&key, (value, 0)); assert!(result.is_ok()); } - assert_eq!(index.read_value(&key), Some((vec![0], 0))); + assert_eq!(index.read_value(&key), Some((value.to_vec(), 0))); } } @@ -371,7 +412,7 @@ mod tests { }) .collect::>(); let hash_map = RwLock::new(HashMap::, RefCount)>::new()); - let max_slot_list_len = 3; + let max_slot_list_len = 5; let all_keys = Mutex::new(vec![]); let gen_rand_value = || { diff --git a/bucket_map/src/index_entry.rs b/bucket_map/src/index_entry.rs index 7917488c735752..13528a8b020ce2 100644 --- a/bucket_map/src/index_entry.rs +++ b/bucket_map/src/index_entry.rs @@ -19,12 +19,14 @@ use { #[derive(Debug, Copy, Clone, PartialEq, Eq)] // one instance of this per item in the index // stored in the index bucket -pub struct IndexEntry { +pub struct IndexEntry { pub key: Pubkey, // can this be smaller if we have reduced the keys into buckets already? pub ref_count: RefCount, // can this be smaller? Do we ever need more than 4B refcounts? storage_cap_and_offset: PackedStorage, // if the bucket doubled, the index can be recomputed using create_bucket_capacity_pow2 pub num_slots: Slot, // can this be smaller? epoch size should ~ be the max len. this is the num elements in the slot list + /// the first 'data element. This will only be meaningful if `num_slots`=1. Otherwise, all values are in the data bucket. + pub first_element: T, } /// Pack the storage offset and capacity-when-crated-pow2 fields into a single u64 @@ -36,7 +38,7 @@ struct PackedStorage { offset: B56, } -impl IndexEntry { +impl IndexEntry { pub fn init(&mut self, pubkey: &Pubkey) { self.key = *pubkey; self.ref_count = 0; @@ -63,7 +65,7 @@ impl IndexEntry { /// min index, such that 2^index >= num_slots /// index = ceiling(log2(num_slots)) /// special case, when slot slice empty, return 0th index. - pub fn data_bucket_from_num_slots(num_slots: Slot) -> u64 { + pub(crate) fn data_bucket_from_num_slots(num_slots: Slot) -> u64 { // Compute the ceiling of log2 for integer if num_slots == 0 { 0 @@ -72,7 +74,7 @@ impl IndexEntry { } } - pub fn data_bucket_ix(&self) -> u64 { + pub(crate) fn data_bucket_ix(&self) -> u64 { Self::data_bucket_from_num_slots(self.num_slots) } @@ -94,19 +96,28 @@ impl IndexEntry { self.storage_offset() << (storage.capacity_pow2 - self.storage_capacity_when_created_pow2()) } - pub fn read_value<'a, T: 'static>(&self, bucket: &'a Bucket) -> Option<(&'a [T], RefCount)> { - let slice = if self.num_slots > 0 { - let data_bucket_ix = self.data_bucket_ix(); - let data_bucket = &bucket.data[data_bucket_ix as usize]; - let loc = self.data_loc(data_bucket); - let uid = Self::key_uid(&self.key); - assert_eq!(Some(uid), data_bucket.uid(loc)); - data_bucket.get_cell_slice(loc, self.num_slots) - } else { - // num_slots is 0. This means we don't have an actual allocation. - BucketStorage::get_empty_cell_slice() - }; - Some((slice, self.ref_count)) + pub fn read_value<'a>(&'a self, bucket: &'a Bucket) -> Option<(&'a [T], RefCount)> { + Some(( + match self.num_slots { + 2.. => { + let data_bucket_ix = self.data_bucket_ix(); + let data_bucket = &bucket.data[data_bucket_ix as usize]; + let loc = self.data_loc(data_bucket); + let uid = Self::key_uid(&self.key); + assert_eq!(Some(uid), data_bucket.uid(loc)); + data_bucket.get_cell_slice::(loc, self.num_slots) + } + 1 => { + // only element is in `first_element` + std::slice::from_ref(&self.first_element) + } + 0 => { + // num_slots is 0. This means we don't have an actual allocation. + BucketStorage::get_empty_cell_slice() + } + }, + self.ref_count, + )) } pub fn key_uid(key: &Pubkey) -> Uid { @@ -120,13 +131,14 @@ impl IndexEntry { mod tests { use super::*; - impl IndexEntry { + impl IndexEntry { pub fn new(key: Pubkey) -> Self { IndexEntry { key, ref_count: 0, storage_cap_and_offset: PackedStorage::default(), num_slots: 0, + first_element: T::default(), } } } @@ -136,7 +148,7 @@ mod tests { #[test] fn test_api() { for offset in [0, 1, u32::MAX as u64] { - let mut index = IndexEntry::new(solana_sdk::pubkey::new_rand()); + let mut index = IndexEntry::::new(solana_sdk::pubkey::new_rand()); if offset != 0 { index.set_storage_offset(offset); } @@ -153,14 +165,17 @@ mod tests { #[test] fn test_size() { assert_eq!(std::mem::size_of::(), 1 + 7); - assert_eq!(std::mem::size_of::(), 32 + 8 + 8 + 8); + assert_eq!( + std::mem::size_of::>(), + 32 + 8 + 8 + 8 + std::mem::size_of::() + ); } #[test] #[should_panic(expected = "New storage offset must fit into 7 bytes!")] fn test_set_storage_offset_value_too_large() { let too_big = 1 << 56; - let mut index = IndexEntry::new(Pubkey::new_unique()); + let mut index = IndexEntry::::new(Pubkey::new_unique()); index.set_storage_offset(too_big); } @@ -168,17 +183,22 @@ mod tests { fn test_data_bucket_from_num_slots() { for n in 0..512 { assert_eq!( - IndexEntry::data_bucket_from_num_slots(n), + IndexEntry::::data_bucket_from_num_slots(n), (n as f64).log2().ceil() as u64 ); } - assert_eq!(IndexEntry::data_bucket_from_num_slots(u32::MAX as u64), 32); + assert_eq!(IndexEntry::::data_bucket_from_num_slots(1), 0); + assert_eq!(IndexEntry::::data_bucket_from_num_slots(2), 1); + assert_eq!( + IndexEntry::::data_bucket_from_num_slots(u32::MAX as u64), + 32 + ); assert_eq!( - IndexEntry::data_bucket_from_num_slots(u32::MAX as u64 + 1), + IndexEntry::::data_bucket_from_num_slots(u32::MAX as u64 + 1), 32 ); assert_eq!( - IndexEntry::data_bucket_from_num_slots(u32::MAX as u64 + 2), + IndexEntry::::data_bucket_from_num_slots(u32::MAX as u64 + 2), 33 ); }