diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index e4ec015c142909..dda1103ae15964 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -4,7 +4,7 @@ use { bucket_map::BucketMapError, bucket_stats::BucketMapStats, bucket_storage::{BucketStorage, Uid, DEFAULT_CAPACITY_POW2}, - index_entry::IndexEntry, + index_entry::{IndexEntry, IndexEntrySingle}, MaxSearch, RefCount, }, rand::{thread_rng, Rng}, @@ -76,6 +76,355 @@ pub struct Bucket { pub reallocated: Reallocated, } +pub struct BucketSingle { + drives: Arc>, + //index + pub index: BucketStorage, + //random offset for the index + random: u64, + _phantom: PhantomData, + stats: Arc, + + pub reallocated: Reallocated, +} + +impl BucketSingle { + pub fn new( + drives: Arc>, + max_search: MaxSearch, + stats: Arc, + count: Arc, + ) -> Self { + let index = BucketStorage::new( + Arc::clone(&drives), + 1, + std::mem::size_of::>() as u64, + max_search, + Arc::clone(&stats.index), + count, + ); + Self { + random: thread_rng().gen(), + drives, + index, + _phantom: PhantomData::default(), + stats, + reallocated: Reallocated::default(), + } + } + + pub fn keys(&self) -> Vec { + let mut rv = vec![]; + for i in 0..self.index.capacity() { + if self.index.is_free(i) { + continue; + } + let ix: &IndexEntrySingle = self.index.get(i); + rv.push(ix.key); + } + rv + } + + pub fn items_in_range(&self, range: &Option<&R>) -> Vec> + where + R: RangeBounds, + { + let mut result = Vec::with_capacity(self.index.count.load(Ordering::Relaxed) as usize); + for i in 0..self.index.capacity() { + let ii = i % self.index.capacity(); + if self.index.is_free(ii) { + continue; + } + let ix: &IndexEntrySingle = self.index.get(ii); + let key = ix.key; + if range.map(|r| r.contains(&key)).unwrap_or(true) { + result.push(BucketItem { + pubkey: key, + ref_count: 1, + slot_list: vec![], // todo val.map(|(v, _ref_count)| v.to_vec()).unwrap_or_default(), + }); + } + } + result + } + + pub fn find_entry(&self, key: &Pubkey) -> Option<(&IndexEntrySingle, u64)> { + Self::bucket_find_entry(&self.index, key, self.random) + } + + fn find_entry_mut<'a>( + &'a self, + key: &Pubkey, + ) -> Result<(bool, &'a mut IndexEntrySingle, u64), BucketMapError> { + let ix = Self::bucket_index_ix(&self.index, key, self.random); + let mut first_free = None; + let mut m = Measure::start("bucket_find_entry_mut"); + for i in ix..ix + self.index.max_search() { + let ii = i % self.index.capacity(); + if self.index.is_free(ii) { + if first_free.is_none() { + first_free = Some(ii); + } + continue; + } + let elem: &mut IndexEntrySingle = self.index.get_mut(ii); + if elem.key == *key { + m.stop(); + self.stats + .index + .find_entry_mut_us + .fetch_add(m.as_us(), Ordering::Relaxed); + return Ok((true, elem, ii)); + } + } + m.stop(); + self.stats + .index + .find_entry_mut_us + .fetch_add(m.as_us(), Ordering::Relaxed); + match first_free { + Some(ii) => { + let elem: &mut IndexEntrySingle = self.index.get_mut(ii); + Ok((false, elem, ii)) + } + None => Err(self.index_no_space()), + } + } + + fn bucket_find_entry<'a>( + index: &'a BucketStorage, + key: &Pubkey, + random: u64, + ) -> Option<(&'a IndexEntrySingle, u64)> { + let ix = Self::bucket_index_ix(index, key, random); + for i in ix..ix + index.max_search() { + let ii = i % index.capacity(); + if index.is_free(ii) { + continue; + } + let elem: &IndexEntrySingle = index.get(ii); + if elem.key == *key { + return Some((elem, ii)); + } + } + None + } + + fn bucket_create_key( + index: &mut BucketStorage, + entry: &IndexEntrySingle, + elem_uid: Uid, + random: u64, + is_resizing: bool, + ) -> Result { + let mut m = Measure::start("bucket_create_key"); + let ix = Self::bucket_index_ix(index, &entry.key, random); + for i in ix..ix + index.max_search() { + let ii = i % index.capacity(); + if !index.is_free(ii) { + continue; + } + index.allocate(ii, elem_uid, is_resizing).unwrap(); + let elem: &mut IndexEntrySingle = index.get_mut(ii); + // These fields will be overwritten after allocation by callers. + // Since this part of the mmapped file could have previously been used by someone else, there can be garbage here. + elem.init(&entry.key, entry.info); + //debug!( "INDEX ALLOC {:?} {} {} {}", key, ii, index.capacity, elem_uid ); + m.stop(); + index + .stats + .find_entry_mut_us + .fetch_add(m.as_us(), Ordering::Relaxed); + return Ok(ii); + } + m.stop(); + index + .stats + .find_entry_mut_us + .fetch_add(m.as_us(), Ordering::Relaxed); + Err(BucketMapError::IndexNoSpace(index.capacity_pow2)) + } + + pub fn addref(&mut self, _key: &Pubkey) -> Option { + // remove + None + } + + pub fn unref(&mut self, _key: &Pubkey) -> Option { + // eliminate + None + } + + pub fn read_value(&self, key: &Pubkey) -> Option<&T> { + //debug!("READ_VALUE: {:?}", key); + let (elem, _) = self.find_entry(key)?; + Some(&elem.info) + } + + fn index_no_space(&self) -> BucketMapError { + BucketMapError::IndexNoSpace(self.index.capacity_pow2) + } + + pub fn try_write(&mut self, key: &Pubkey, data: T) -> Result<(), BucketMapError> { + let (found, elem, elem_ix) = self.find_entry_mut(key)?; + if !found { + let is_resizing = false; + let elem_uid = IndexEntry::key_uid(key); + self.index.allocate(elem_ix, elem_uid, is_resizing).unwrap(); + // These fields will be overwritten after allocation by callers. + // Since this part of the mmapped file could have previously been used by someone else, there can be garbage here. + elem.init(key, data); + } else { + elem.info = data; + } + Ok(()) + } + + pub fn delete_key(&mut self, key: &Pubkey) { + if let Some((_elem, elem_ix)) = self.find_entry(key) { + let elem_uid = self.index.uid_unchecked(elem_ix); + //debug!("INDEX FREE {:?} {}", key, elem_uid); + self.index.free(elem_ix, elem_uid); + } + } + + pub fn grow_index(&self, current_capacity_pow2: u8) { + if self.index.capacity_pow2 == current_capacity_pow2 { + let mut m = Measure::start("grow_index"); + //debug!("GROW_INDEX: {}", current_capacity_pow2); + let increment = 1; + for i in increment.. { + //increasing the capacity by ^4 reduces the + //likelihood of a re-index collision of 2^(max_search)^2 + //1 in 2^32 + let mut index = BucketStorage::new_with_capacity( + Arc::clone(&self.drives), + 1, + std::mem::size_of::>() as u64, + // *2 causes rapid growth of index buckets + self.index.capacity_pow2 + i, // * 2, + self.index.max_search, + Arc::clone(&self.stats.index), + Arc::clone(&self.index.count), + ); + let random = thread_rng().gen(); + let mut valid = true; + for ix in 0..self.index.capacity() { + let uid = self.index.uid(ix); + if let Some(uid) = uid { + let elem: &IndexEntrySingle = self.index.get(ix); + let new_ix = Self::bucket_create_key(&mut index, &elem, uid, random, true); + if new_ix.is_err() { + valid = false; + break; + } + let new_ix = new_ix.unwrap(); + let new_elem: &mut IndexEntrySingle = index.get_mut(new_ix); + *new_elem = *elem; + /* + let dbg_elem: IndexEntrySingle = *new_elem; + assert_eq!( + Self::bucket_find_entry(&index, &elem.key, random).unwrap(), + (&dbg_elem, new_ix) + ); + */ + } + } + if valid { + self.stats.index.update_max_size(index.capacity()); + let mut items = self.reallocated.items.lock().unwrap(); + items.index = Some((random, index)); + self.reallocated.add_reallocation(); + break; + } + } + m.stop(); + self.stats.index.resizes.fetch_add(1, Ordering::Relaxed); + self.stats + .index + .resize_us + .fetch_add(m.as_us(), Ordering::Relaxed); + } + } + + pub fn apply_grow_index(&mut self, random: u64, index: BucketStorage) { + self.random = random; + self.index = index; + } + + fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 { + let uid = IndexEntry::key_uid(key); + let mut s = DefaultHasher::new(); + uid.hash(&mut s); + //the locally generated random will make it hard for an attacker + //to deterministically cause all the pubkeys to land in the same + //location in any bucket on all validators + random.hash(&mut s); + let ix = s.finish(); + ix % index.capacity() + //debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.capacity() ); + } + + /// grow the appropriate piece. Note this takes an immutable ref. + /// The actual grow is set into self.reallocated and applied later on a write lock + pub fn grow(&self, err: BucketMapError) { + match err { + BucketMapError::DataNoSpace((_data_index, _current_capacity_pow2)) => { + //debug!("GROWING SPACE {:?}", (data_index, current_capacity_pow2)); + unimplemented!(); + } + BucketMapError::IndexNoSpace(current_capacity_pow2) => { + //debug!("GROWING INDEX {}", sz); + self.grow_index(current_capacity_pow2); + } + } + } + + /// if a bucket was resized previously with a read lock, then apply that resize now + pub fn handle_delayed_grows(&mut self) { + if self.reallocated.get_reallocated() { + // swap out the bucket that was resized previously with a read lock + let mut items = std::mem::take(&mut *self.reallocated.items.lock().unwrap()); + + if let Some((random, bucket)) = items.index.take() { + self.apply_grow_index(random, bucket); + } else { + // data bucket + let (_i, _new_bucket) = items.data.take().unwrap(); + unimplemented!(); + } + } + } + + pub fn insert(&mut self, key: &Pubkey, value: T) { + let new = value; + loop { + let rv = self.try_write(key, new); + match rv { + Ok(_) => return, + Err(err) => { + self.grow(err); + self.handle_delayed_grows(); + } + } + } + } + + pub fn update(&mut self, key: &Pubkey, mut updatefn: F) + where + F: FnMut(Option<&T>) -> Option, + { + let current = self.read_value(key); + let new = updatefn(current); + if new.is_none() { + self.delete_key(key); + return; + } + let new = new.unwrap(); + self.insert(key, new); + } +} + impl<'b, T: Clone + Copy + 'b> Bucket { pub fn new( drives: Arc>, diff --git a/bucket_map/src/bucket_api.rs b/bucket_map/src/bucket_api.rs index 4f33f6e71384f8..da08ed9e84a2b6 100644 --- a/bucket_map/src/bucket_api.rs +++ b/bucket_map/src/bucket_api.rs @@ -1,7 +1,10 @@ use { crate::{ - bucket::Bucket, bucket_item::BucketItem, bucket_map::BucketMapError, - bucket_stats::BucketMapStats, MaxSearch, RefCount, + bucket::{Bucket, BucketSingle}, + bucket_item::BucketItem, + bucket_map::BucketMapError, + bucket_stats::BucketMapStats, + MaxSearch, RefCount, }, solana_sdk::pubkey::Pubkey, std::{ @@ -15,7 +18,7 @@ use { }; type LockedBucket = RwLock>>; - +type LockedBucketSingle = RwLock>>; pub struct BucketApi { drives: Arc>, max_search: MaxSearch, @@ -143,3 +146,128 @@ impl BucketApi { .try_write(pubkey, value.0.iter(), value.0.len(), value.1) } } + +pub struct BucketApiSingle { + drives: Arc>, + max_search: MaxSearch, + pub stats: Arc, + + bucket: LockedBucketSingle, + count: Arc, +} + +impl BucketApiSingle { + pub fn new( + drives: Arc>, + max_search: MaxSearch, + stats: Arc, + ) -> Self { + Self { + drives, + max_search, + stats, + bucket: RwLock::default(), + count: Arc::default(), + } + } + + /// Get the items for bucket + pub fn items_in_range(&self, range: &Option<&R>) -> Vec> + where + R: RangeBounds, + { + self.bucket + .read() + .unwrap() + .as_ref() + .map(|bucket| bucket.items_in_range(range)) + .unwrap_or_default() + } + + /// Get the Pubkeys + pub fn keys(&self) -> Vec { + self.bucket + .read() + .unwrap() + .as_ref() + .map_or_else(Vec::default, |bucket| bucket.keys()) + } + + /// Get the values for Pubkey `key` + pub fn read_value(&self, key: &Pubkey) -> Option<(Vec, RefCount)> { + self.bucket + .read() + .unwrap() + .as_ref() + .and_then(|bucket| bucket.read_value(key).map(|value| (vec![*value], 1))) + } + + pub fn bucket_len(&self) -> u64 { + self.count.load(Ordering::Relaxed) + } + + pub fn delete_key(&self, key: &Pubkey) { + let mut bucket = self.get_write_bucket(); + if let Some(bucket) = bucket.as_mut() { + bucket.delete_key(key) + } + } + + fn get_write_bucket(&self) -> RwLockWriteGuard>> { + let mut bucket = self.bucket.write().unwrap(); + if bucket.is_none() { + *bucket = Some(BucketSingle::new( + Arc::clone(&self.drives), + self.max_search, + Arc::clone(&self.stats), + Arc::clone(&self.count), + )); + } else { + let write = bucket.as_mut().unwrap(); + write.handle_delayed_grows(); + } + bucket + } + + pub fn addref(&self, key: &Pubkey) -> Option { + self.get_write_bucket() + .as_mut() + .and_then(|bucket| bucket.addref(key)) + } + + pub fn unref(&self, key: &Pubkey) -> Option { + self.get_write_bucket() + .as_mut() + .and_then(|bucket| bucket.unref(key)) + } + + pub fn insert(&self, pubkey: &Pubkey, value: (&[T], RefCount)) { + let mut bucket = self.get_write_bucket(); + bucket.as_mut().unwrap().insert(pubkey, value.0[0]) + } + + pub fn grow(&self, err: BucketMapError) { + // grows are special - they get a read lock and modify 'reallocated' + // the grown changes are applied the next time there is a write lock taken + if let Some(bucket) = self.bucket.read().unwrap().as_ref() { + bucket.grow(err) + } + } + + pub fn update(&self, key: &Pubkey, updatefn: F) + where + F: FnMut(Option<&T>) -> Option, + { + let mut bucket = self.get_write_bucket(); + bucket.as_mut().unwrap().update(key, updatefn) + } + + pub fn try_write( + &self, + pubkey: &Pubkey, + value: (&[T], RefCount), + ) -> Result<(), BucketMapError> { + let mut bucket = self.get_write_bucket(); + bucket.as_mut().unwrap().try_write(pubkey, value.0[0]) + } +} diff --git a/bucket_map/src/bucket_map.rs b/bucket_map/src/bucket_map.rs index bad5cec5a2ed44..ecfd1ee43a2154 100644 --- a/bucket_map/src/bucket_map.rs +++ b/bucket_map/src/bucket_map.rs @@ -1,7 +1,11 @@ //! BucketMap is a mostly contention free concurrent map backed by MmapMut use { - crate::{bucket_api::BucketApi, bucket_stats::BucketMapStats, MaxSearch, RefCount}, + crate::{ + bucket_api::{BucketApi, BucketApiSingle}, + bucket_stats::BucketMapStats, + MaxSearch, RefCount, + }, solana_sdk::pubkey::Pubkey, std::{convert::TryInto, fmt::Debug, fs, path::PathBuf, sync::Arc}, tempfile::TempDir, @@ -33,6 +37,14 @@ pub struct BucketMap { pub temp_dir: Option, } +pub struct BucketMapSingle { + buckets: Vec>>, + drives: Arc>, + max_buckets_pow2: u8, + pub stats: Arc, + pub temp_dir: Option, +} + impl Drop for BucketMap { fn drop(&mut self) { if self.temp_dir.is_none() { @@ -47,6 +59,20 @@ impl std::fmt::Debug for BucketMap { } } +impl Drop for BucketMapSingle { + fn drop(&mut self) { + if self.temp_dir.is_none() { + BucketMapSingle::::erase_previous_drives(&self.drives); + } + } +} + +impl std::fmt::Debug for BucketMapSingle { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Ok(()) + } +} + #[derive(Debug)] pub enum BucketMapError { /// (bucket_index, current_capacity_pow2) @@ -174,6 +200,125 @@ impl BucketMap { } } +impl BucketMapSingle { + pub fn new(config: BucketMapConfig) -> Self { + assert_ne!( + config.max_buckets, 0, + "Max number of buckets must be non-zero" + ); + assert!( + config.max_buckets.is_power_of_two(), + "Max number of buckets must be a power of two" + ); + // this should be <= 1 << DEFAULT_CAPACITY or we end up searching the same items over and over - probably not a big deal since it is so small anyway + const MAX_SEARCH: MaxSearch = 32; + let max_search = config.max_search.unwrap_or(MAX_SEARCH); + + if let Some(drives) = config.drives.as_ref() { + Self::erase_previous_drives(drives); + } + let mut temp_dir = None; + let drives = config.drives.unwrap_or_else(|| { + temp_dir = Some(TempDir::new().unwrap()); + vec![temp_dir.as_ref().unwrap().path().to_path_buf()] + }); + let drives = Arc::new(drives); + + let stats = Arc::default(); + let buckets = (0..config.max_buckets) + .map(|_| { + Arc::new(BucketApiSingle::new( + Arc::clone(&drives), + max_search, + Arc::clone(&stats), + )) + }) + .collect(); + + // A simple log2 function that is correct if x is a power of two + let log2 = |x: usize| usize::BITS - x.leading_zeros() - 1; + + Self { + buckets, + drives, + max_buckets_pow2: log2(config.max_buckets) as u8, + stats, + temp_dir, + } + } + + fn erase_previous_drives(drives: &[PathBuf]) { + drives.iter().for_each(|folder| { + let _ = fs::remove_dir_all(folder); + let _ = fs::create_dir_all(folder); + }) + } + + pub fn num_buckets(&self) -> usize { + self.buckets.len() + } + + /// Get the values for Pubkey `key` + pub fn read_value(&self, key: &Pubkey) -> Option<(Vec, RefCount)> { + self.get_bucket(key).read_value(key) + } + + /// Delete the Pubkey `key` + pub fn delete_key(&self, key: &Pubkey) { + self.get_bucket(key).delete_key(key); + } + + /// Update Pubkey `key`'s value with 'value' + pub fn insert(&self, key: &Pubkey, value: (&[T], RefCount)) { + self.get_bucket(key).insert(key, value) + } + + /// Update Pubkey `key`'s value with 'value' + pub fn try_insert(&self, key: &Pubkey, value: (&[T], RefCount)) -> Result<(), BucketMapError> { + self.get_bucket(key).try_write(key, value) + } + + /// Update Pubkey `key`'s value with function `updatefn` + pub fn update(&self, key: &Pubkey, updatefn: F) + where + F: FnMut(Option<&T>) -> Option, + { + self.get_bucket(key).update(key, updatefn) + } + + pub fn get_bucket(&self, key: &Pubkey) -> &Arc> { + self.get_bucket_from_index(self.bucket_ix(key)) + } + + pub fn get_bucket_from_index(&self, ix: usize) -> &Arc> { + &self.buckets[ix] + } + + /// Get the bucket index for Pubkey `key` + pub fn bucket_ix(&self, key: &Pubkey) -> usize { + if self.max_buckets_pow2 > 0 { + let location = read_be_u64(key.as_ref()); + (location >> (u64::BITS - self.max_buckets_pow2 as u32)) as usize + } else { + 0 + } + } + + /// Increment the refcount for Pubkey `key` + pub fn addref(&self, key: &Pubkey) -> Option { + let ix = self.bucket_ix(key); + let bucket = &self.buckets[ix]; + bucket.addref(key) + } + + /// Decrement the refcount for Pubkey `key` + pub fn unref(&self, key: &Pubkey) -> Option { + let ix = self.bucket_ix(key); + let bucket = &self.buckets[ix]; + bucket.unref(key) + } +} + /// Look at the first 8 bytes of the input and reinterpret them as a u64 fn read_be_u64(input: &[u8]) -> u64 { assert!(input.len() >= std::mem::size_of::()); diff --git a/bucket_map/src/index_entry.rs b/bucket_map/src/index_entry.rs index 91e77ac8c78c93..db21d702cbbc01 100644 --- a/bucket_map/src/index_entry.rs +++ b/bucket_map/src/index_entry.rs @@ -15,6 +15,15 @@ use { }, }; +#[repr(C)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +// one instance of this per item in the index +// stored in the index bucket +pub struct IndexEntrySingle { + pub key: Pubkey, // can this be smaller if we have reduced the keys into buckets already? + pub info: T, +} + #[repr(C)] #[derive(Debug, Copy, Clone, PartialEq, Eq)] // one instance of this per item in the index @@ -36,6 +45,13 @@ struct PackedStorage { offset: B56, } +impl IndexEntrySingle { + pub fn init(&mut self, pubkey: &Pubkey, info: T) { + self.key = *pubkey; + self.info = info; + } +} + impl IndexEntry { pub fn init(&mut self, pubkey: &Pubkey) { self.key = *pubkey;