From eaaa1b66b86927275b9cf13d838845e530658e81 Mon Sep 17 00:00:00 2001
From: jeff washington
Date: Thu, 6 Apr 2023 17:13:40 -0500
Subject: [PATCH] disk index: batch insert

---
 bucket_map/src/bucket.rs             | 143 ++++++++++++++++++++++++---
 bucket_map/src/bucket_api.rs         |  14 +++
 bucket_map/src/bucket_storage.rs     |   3 +-
 runtime/src/accounts_index.rs        |   6 +-
 runtime/src/in_mem_accounts_index.rs |  45 +++------
 5 files changed, 162 insertions(+), 49 deletions(-)

diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs
index d5ab4595789af2..d2fa8910985952 100644
--- a/bucket_map/src/bucket.rs
+++ b/bucket_map/src/bucket.rs
@@ -31,7 +31,7 @@ use {
 pub struct ReallocatedItems<I: BucketOccupied, D: BucketOccupied> {
     // Some if the index was reallocated
     // u64 is random associated with the new index
-    pub index: Option<(u64, BucketStorage<I>)>,
+    pub index: Option<BucketStorage<I>>,
     // Some for a data bucket reallocation
     // u64 is data bucket index
     pub data: Option<(u64, BucketStorage<D>)>,
@@ -104,6 +104,10 @@ pub struct Bucket<T: Clone + Copy + 'static> {
     anticipated_size: u64,
 
     pub reallocated: Reallocated<IndexBucket<T>, DataBucket>,
+
+    /// set to true once any entries have been deleted from the index.
+    /// Deletes indicate that there can be free slots and that the full search range must be searched for an entry.
+    at_least_one_entry_deleted: bool,
 }
 
 impl<'b, T: Clone + Copy + 'static> Bucket<T> {
@@ -131,6 +135,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
             stats,
             reallocated: Reallocated::default(),
             anticipated_size: 0,
+            at_least_one_entry_deleted: false,
         }
     }
 
@@ -183,7 +188,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         key: &Pubkey,
         random: u64,
     ) -> Result<(Option<IndexEntryPlaceInBucket<T>>, u64), BucketMapError> {
-        let ix = Self::bucket_index_ix(index, key, random);
+        let ix = Self::bucket_index_ix(key, random) % index.capacity();
         let mut first_free = None;
         let mut m = Measure::start("bucket_find_index_entry_mut");
         let capacity = index.capacity();
@@ -222,7 +227,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         key: &Pubkey,
         random: u64,
     ) -> Option<(IndexEntryPlaceInBucket<T>, u64)> {
-        let ix = Self::bucket_index_ix(index, key, random);
+        let ix = Self::bucket_index_ix(key, random) % index.capacity();
         for i in ix..ix + index.max_search() {
             let ii = i % index.capacity();
             if index.is_free(ii) {
@@ -243,7 +248,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         is_resizing: bool,
     ) -> Result<u64, BucketMapError> {
         let mut m = Measure::start("bucket_create_key");
-        let ix = Self::bucket_index_ix(index, key, random);
+        let ix = Self::bucket_index_ix(key, random) % index.capacity();
         for i in ix..ix + index.max_search() {
             let ii = i % index.capacity();
             if !index.is_free(ii) {
@@ -269,12 +274,121 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         Err(BucketMapError::IndexNoSpace(index.contents.capacity()))
     }
 
-    pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
+    pub(crate) fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
         //debug!("READ_VALUE: {:?}", key);
         let (elem, _) = self.find_index_entry(key)?;
         Some(elem.read_value(&self.index, &self.data))
     }
 
+    /// for each item in `items`, get the hash value when hashed with `random`.
+    /// Return a vec of tuples:
+    /// (hash_value, key, value)
+    fn index_entries(
+        items: impl Iterator<Item = (Pubkey, T)>,
+        count: usize,
+        random: u64,
+    ) -> Vec<(u64, Pubkey, T)> {
+        let mut inserts = Vec::with_capacity(count);
+        items.for_each(|(key, v)| {
+            let ix = Self::bucket_index_ix(&key, random);
+            inserts.push((ix, key, v));
+        });
+        inserts
+    }
+
+    /// insert all of `items` into the index.
+    /// return duplicates
+    pub(crate) fn batch_insert_non_duplicates(
+        &mut self,
+        items: impl Iterator<Item = (Pubkey, T)>,
+        count: usize,
+    ) -> Vec<(Pubkey, T, T)> {
+        assert!(
+            !self.at_least_one_entry_deleted,
+            "efficient batch insertion can only occur prior to any deletes"
+        );
+        let current_len = self.index.count.load(Ordering::Relaxed);
+        let anticipated = count as u64;
+        self.set_anticipated_count((anticipated).saturating_add(current_len));
+        let mut entries = Self::index_entries(items, count, self.random);
+        let mut duplicates = Vec::default();
+        // insert, but resizes may be necessary
+        loop {
+            let result = Self::batch_insert_non_duplicates_internal(
+                &mut self.index,
+                &self.data,
+                &mut entries,
+                &mut duplicates,
+            );
+            match result {
+                Ok(_result) => {
+                    // everything added
+                    self.set_anticipated_count(0);
+                    return duplicates;
+                }
+                Err(error) => {
+                    // resize and add more
+                    // `entries` will have had items removed from it
+                    self.grow(error);
+                    self.handle_delayed_grows();
+                }
+            }
+        }
+    }
+
+    /// sort `entries` by hash value
+    /// insert as much of `entries` as possible into `index`.
+    /// return an error if the index needs to resize.
+    /// for every entry that already exists in `index`, add it (and the value already in the index) to `duplicates`
+    pub fn batch_insert_non_duplicates_internal(
+        index: &mut BucketStorage<IndexBucket<T>>,
+        data_buckets: &[BucketStorage<DataBucket>],
+        entries: &mut Vec<(u64, Pubkey, T)>,
+        duplicates: &mut Vec<(Pubkey, T, T)>,
+    ) -> Result<(), BucketMapError> {
+        let max_search = index.max_search();
+        let cap = index.capacity();
+        // sort entries by their index % cap, so we'll search over the same spots in the file close to each other
+        // `reverse()` is so we can efficiently pop off the end but get ascending order index values
+        entries.sort_by(|a, b| (a.0 % cap).cmp(&(b.0 % cap)).reverse());
+        let search_end = max_search.min(cap);
+
+        // pop one entry at a time to insert
+        'outer: while let Some((ix_entry_raw, k, v)) = entries.pop() {
+            let ix_entry = ix_entry_raw % cap;
+            // search for an empty spot starting at `ix_entry`
+            for search in 0..search_end {
+                let ix_index = (ix_entry + search) % cap;
+                let elem = IndexEntryPlaceInBucket::new(ix_index);
+                if index.try_lock(ix_index) {
+                    // found free element and occupied it
+                    // These fields will be overwritten after allocation by callers.
+                    // Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
+                    elem.init(index, &k);
+
+                    // new data stored should be stored in IndexEntry and NOT in data file
+                    // new data len is 1
+                    elem.set_slot_count_enum_value(index, OccupiedEnum::OneSlotInIndex(&v));
+                    continue 'outer; // this 'insertion' is completed: inserted successfully
+                } else {
+                    // occupied, see if the key already exists here
+                    if elem.key(index) == &k {
+                        let (v_existing, _ref_count_existing) =
+                            elem.read_value(index, data_buckets);
+                        duplicates.push((k, v, *v_existing.first().unwrap()));
+                        continue 'outer; // this 'insertion' is completed: found a duplicate entry
+                    }
+                }
+            }
+            // search loop ended without finding a spot to insert this key
+            // so, remember the item we were trying to insert for next time after resizing
+            entries.push((ix_entry_raw, k, v));
+            return Err(BucketMapError::IndexNoSpace(cap));
+        }
+
+        Ok(())
+    }
+
     pub fn try_write(
         &mut self,
         key: &Pubkey,
@@ -417,6 +531,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
 
     pub fn delete_key(&mut self, key: &Pubkey) {
         if let Some((elem, elem_ix)) = self.find_index_entry(key) {
+            self.at_least_one_entry_deleted = true;
             if let OccupiedEnum::MultipleSlots(multiple_slots) =
                 elem.get_slot_count_enum(&self.index)
             {
@@ -460,12 +575,12 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
                 // index may have allocated something larger than we asked for,
                 // so, in case we fail to reindex into this larger size, grow from this size next iteration.
                 current_capacity = index.capacity();
-                let random = thread_rng().gen();
                 let mut valid = true;
                 for ix in 0..self.index.capacity() {
                     if !self.index.is_free(ix) {
                         let elem: &IndexEntry<T> = self.index.get(ix);
-                        let new_ix = Self::bucket_create_key(&mut index, &elem.key, random, true);
+                        let new_ix =
+                            Self::bucket_create_key(&mut index, &elem.key, self.random, true);
                         if new_ix.is_err() {
                             valid = false;
                             break;
@@ -486,7 +601,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
             if valid {
                 self.stats.index.update_max_size(index.capacity());
                 let mut items = self.reallocated.items.lock().unwrap();
-                items.index = Some((random, index));
+                items.index = Some(index);
                 self.reallocated.add_reallocation();
                 break;
             }
@@ -506,12 +621,11 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         }
     }
 
-    pub fn apply_grow_index(&mut self, random: u64, index: BucketStorage<IndexBucket<T>>) {
+    pub fn apply_grow_index(&mut self, index: BucketStorage<IndexBucket<T>>) {
         self.stats
             .index
             .resize_grow(self.index.capacity_bytes(), index.capacity_bytes());
-        self.random = random;
         self.index = index;
     }
 
@@ -568,15 +682,14 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
         items.data = Some((data_index, new_bucket));
     }
 
-    fn bucket_index_ix(index: &BucketStorage<IndexBucket<T>>, key: &Pubkey, random: u64) -> u64 {
+    fn bucket_index_ix(key: &Pubkey, random: u64) -> u64 {
         let mut s = DefaultHasher::new();
         key.hash(&mut s);
         //the locally generated random will make it hard for an attacker
         //to deterministically cause all the pubkeys to land in the same
         //location in any bucket on all validators
         random.hash(&mut s);
-        let ix = s.finish();
-        ix % index.capacity()
+        s.finish()
         //debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.capacity() );
     }
 
@@ -601,8 +714,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
 
         // swap out the bucket that was resized previously with a read lock
         let mut items = std::mem::take(&mut *self.reallocated.items.lock().unwrap());
-        if let Some((random, bucket)) = items.index.take() {
-            self.apply_grow_index(random, bucket);
+        if let Some(bucket) = items.index.take() {
+            self.apply_grow_index(bucket);
         } else {
             // data bucket
             let (i, new_bucket) = items.data.take().unwrap();
diff --git a/bucket_map/src/bucket_api.rs b/bucket_map/src/bucket_api.rs
index 05275c5308ef6a..b6b43ebc042698 100644
--- a/bucket_map/src/bucket_api.rs
+++ b/bucket_map/src/bucket_api.rs
@@ -118,6 +118,20 @@ impl<T: Clone + Copy + 'static> BucketApi<T> {
         bucket.as_mut().unwrap().set_anticipated_count(count);
     }
 
+    /// batch insert of `items`. Assumption is a single slot list element and ref_count == 1.
+    /// For any pubkeys that already exist, the failed insertion data and the existing data are returned.
+    pub fn batch_insert_non_duplicates(
+        &self,
+        items: impl Iterator<Item = (Pubkey, T)>,
+        count: usize,
+    ) -> Vec<(Pubkey, T, T)> {
+        let mut bucket = self.get_write_bucket();
+        bucket
+            .as_mut()
+            .unwrap()
+            .batch_insert_non_duplicates(items, count)
+    }
+
     pub fn update<F>(&self, key: &Pubkey, updatefn: F)
     where
         F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs
index 72b68319d205da..24210437f4ad1d 100644
--- a/bucket_map/src/bucket_storage.rs
+++ b/bucket_map/src/bucket_storage.rs
@@ -218,7 +218,8 @@ impl<O: BucketOccupied> BucketStorage<O> {
         self.contents.is_free(entry, ix as usize)
     }
 
-    fn try_lock(&mut self, ix: u64) -> bool {
+    /// try to occupy `ix`. return true if successful
+    pub(crate) fn try_lock(&mut self, ix: u64) -> bool {
         let start = self.get_start_offset_with_header(ix);
         let entry = &mut self.mmap[start..];
         if self.contents.is_free(entry, ix as usize) {
diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs
index 44afd7535d638b..a5e252fa7281ab 100644
--- a/runtime/src/accounts_index.rs
+++ b/runtime/src/accounts_index.rs
@@ -2491,8 +2491,8 @@ pub mod tests {
         assert_eq!(num, 1);
 
         // not zero lamports
-        let index = AccountsIndex::<AccountInfoTest, AccountInfoTest>::default_for_tests();
-        let account_info: AccountInfoTest = 0 as AccountInfoTest;
+        let index = AccountsIndex::<bool, bool>::default_for_tests();
+        let account_info = false;
         let items = vec![(*pubkey, account_info)];
         index.set_startup(Startup::Startup);
         index.insert_new_if_missing_into_primary_index(slot, items.len(), items.into_iter());
@@ -2516,7 +2516,7 @@ pub mod tests {
         assert!(index
             .get_for_tests(pubkey, Some(&ancestors), None)
             .is_some());
-        assert_eq!(index.ref_count_from_storage(pubkey), 0); // cached, so 0
+        assert_eq!(index.ref_count_from_storage(pubkey), 1);
         index.unchecked_scan_accounts(
             "",
             &ancestors,
diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs
index 8217f76489791d..9230ddcad4320c 100644
--- a/runtime/src/in_mem_accounts_index.rs
+++ b/runtime/src/in_mem_accounts_index.rs
@@ -1060,36 +1060,21 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U> {
-                        // already on disk, so remember the new (slot, info) for later
-                        duplicates.duplicates.push((slot, k, entry.1));
-                        if let Some((slot, _)) = current_slot_list.first() {
-                            // accurately account for there being a duplicate for the first entry that was previously added to the disk index.
-                            // That entry could not have known yet that it was a duplicate.
-                            // It is important to capture each slot with a duplicate because of slot limits applied to clean.
-                            duplicates.duplicates_put_on_disk.insert((*slot, k));
-                        }
-                        Some((current_slot_list.to_vec(), ref_count))
-                    }
-                    None => {
-                        count += 1;
-                        // not on disk, insert it
-                        Some((vec![(entry.0, entry.1.into())], new_ref_count))
-                    }
-                }
-            });
-        });
-        // remove the guidance for how many entries the bucket will eventually contain since we have added all we knew about
-        disk.set_anticipated_count(0);
+        let mut count = insert.len() as u64;
+        for (k, entry, duplicate_entry) in disk.batch_insert_non_duplicates(
+            insert.into_iter().map(|(slot, k, v)| (k, (slot, v.into()))),
+            count as usize,
+        ) {
+            duplicates.duplicates.push((entry.0, k, entry.1.into()));
+            // accurately account for there being a duplicate for the first entry that was previously added to the disk index.
+            // That entry could not have known yet that it was a duplicate.
+            // It is important to capture each slot with a duplicate because of slot limits applied to clean.
+            duplicates
+                .duplicates_put_on_disk
+                .insert((duplicate_entry.0, k));
+            count -= 1;
+        }
+        self.stats().inc_insert_count(count);
     }
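
A minimal, self-contained sketch of the probing scheme the new
batch_insert_non_duplicates_internal implements, reduced to a plain in-memory
table: hash each key once (salted, with no `% capacity` folded in, so hashes
stay valid across a resize), sort the batch by target slot so successive probes
touch nearby memory, linear-probe up to `max_search` slots, report keys that
are already present as duplicates, and hand overflow back to the caller to grow
and retry. All names here (SketchIndex, hash_ix, batch_insert) are illustrative
only, not the bucket_map API, and the `resize` in main is a crude stand-in for
the real grow-and-reindex step.

use std::{
    collections::hash_map::DefaultHasher,
    hash::{Hash, Hasher},
};

struct SketchIndex<T> {
    slots: Vec<Option<([u8; 32], T)>>, // fixed-capacity, open-addressed table
    max_search: u64,                   // linear-probe limit, as in the real index
    random: u64,                       // per-bucket salt, mirrors `Bucket::random`
}

impl<T: Copy> SketchIndex<T> {
    // Mirrors `bucket_index_ix` after this patch: hash only, no `% capacity`,
    // so a precomputed hash stays meaningful after the table grows.
    fn hash_ix(key: &[u8; 32], random: u64) -> u64 {
        let mut s = DefaultHasher::new();
        key.hash(&mut s);
        random.hash(&mut s); // salt, so probe targets differ per validator
        s.finish()
    }

    // Drain `entries`, recording duplicates as (key, new value, existing value).
    // Err(capacity) means the caller must grow the table and retry; that is the
    // loop `batch_insert_non_duplicates` runs around the `_internal` fn.
    fn batch_insert(
        &mut self,
        entries: &mut Vec<(u64, [u8; 32], T)>,
        duplicates: &mut Vec<([u8; 32], T, T)>,
    ) -> Result<(), u64> {
        let cap = self.slots.len() as u64;
        // sort by target slot, reversed so pop() yields ascending slot order
        entries.sort_by(|a, b| (a.0 % cap).cmp(&(b.0 % cap)).reverse());
        'outer: while let Some((raw_ix, key, value)) = entries.pop() {
            for search in 0..self.max_search.min(cap) {
                let ix = (((raw_ix % cap) + search) % cap) as usize;
                if self.slots[ix].is_none() {
                    // free slot: occupy it (the real code calls `try_lock`)
                    self.slots[ix] = Some((key, value));
                    continue 'outer;
                }
                let (k, existing) = self.slots[ix].as_ref().unwrap();
                if *k == key {
                    // key already present: report as a duplicate, keep old value
                    duplicates.push((key, value, *existing));
                    continue 'outer;
                }
                // occupied by a different key; keep probing
            }
            // out of probes: push back so the retry after growing sees this entry
            entries.push((raw_ix, key, value));
            return Err(cap);
        }
        Ok(())
    }
}

fn main() {
    let mut index = SketchIndex {
        slots: vec![None; 8],
        max_search: 4,
        random: 42,
    };
    let key = [7u8; 32];
    let ix = SketchIndex::<u32>::hash_ix(&key, index.random);
    let mut entries = vec![(ix, key, 1u32), (ix, key, 2u32)];
    let mut dups = Vec::new();
    while index.batch_insert(&mut entries, &mut dups).is_err() {
        let new_len = index.slots.len() * 2;
        index.slots.resize(new_len, None); // stand-in for the real grow/reindex
    }
    assert_eq!(dups.len(), 1); // one of the two writes to `key` is a duplicate
}

The reason the batch path can occupy the first free slot it probes without
scanning the full search range is the new at_least_one_entry_deleted flag: the
assert at the top of batch_insert_non_duplicates guarantees no deletes have
punched holes into any probe chain yet, so an empty slot proves the key is
absent.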