From d63359a3fffeeaba18bfc249e3ff97c90892ee11 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Mon, 10 Apr 2023 13:08:54 -0500 Subject: [PATCH] disk index: batch insert (#31094) --- bucket_map/src/bucket.rs | 271 +++++++++++++++++++++- bucket_map/src/bucket_api.rs | 14 ++ bucket_map/src/bucket_map.rs | 334 ++++++++++++++++----------- bucket_map/src/bucket_storage.rs | 3 +- runtime/src/accounts_index.rs | 6 +- runtime/src/in_mem_accounts_index.rs | 45 ++-- 6 files changed, 502 insertions(+), 171 deletions(-) diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index 4129aa15b1eb13..14781a9a1a72e4 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -30,7 +30,6 @@ use { pub struct ReallocatedItems { // Some if the index was reallocated - // u64 is random associated with the new index pub index: Option>, // Some for a data bucket reallocation // u64 is data bucket index @@ -104,6 +103,10 @@ pub struct Bucket { anticipated_size: u64, pub reallocated: Reallocated, DataBucket>, + + /// set to true once any entries have been deleted from the index. + /// Deletes indicate that there can be free slots and that the full search range must be searched for an entry. + at_least_one_entry_deleted: bool, } impl<'b, T: Clone + Copy + 'static> Bucket { @@ -131,6 +134,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { stats, reallocated: Reallocated::default(), anticipated_size: 0, + at_least_one_entry_deleted: false, } } @@ -269,12 +273,124 @@ impl<'b, T: Clone + Copy + 'static> Bucket { Err(BucketMapError::IndexNoSpace(index.contents.capacity())) } - pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> { + pub(crate) fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> { //debug!("READ_VALUE: {:?}", key); let (elem, _) = self.find_index_entry(key)?; Some(elem.read_value(&self.index, &self.data)) } + /// for each item in `items`, get the hash value when hashed with `random`. + /// Return a vec of tuples: + /// (hash_value, key, value) + fn index_entries( + items: impl Iterator, + count: usize, + random: u64, + ) -> Vec<(u64, Pubkey, T)> { + let mut inserts = Vec::with_capacity(count); + items.for_each(|(key, v)| { + let ix = Self::bucket_index_ix(&key, random); + inserts.push((ix, key, v)); + }); + inserts + } + + /// insert all of `items` into the index. + /// return duplicates + pub(crate) fn batch_insert_non_duplicates( + &mut self, + items: impl Iterator, + count: usize, + ) -> Vec<(Pubkey, T, T)> { + assert!( + !self.at_least_one_entry_deleted, + "efficient batch insertion can only occur prior to any deletes" + ); + let current_len = self.index.count.load(Ordering::Relaxed); + let anticipated = count as u64; + self.set_anticipated_count((anticipated).saturating_add(current_len)); + let mut entries = Self::index_entries(items, count, self.random); + let mut duplicates = Vec::default(); + // insert, but resizes may be necessary + loop { + let cap = self.index.capacity(); + // sort entries by their index % cap, so we'll search over the same spots in the file close to each other + // `reverse()` is so we can efficiently pop off the end but get ascending order index values + // sort before calling to make `batch_insert_non_duplicates_internal` easier to test. + entries.sort_unstable_by(|a, b| (a.0 % cap).cmp(&(b.0 % cap)).reverse()); + + let result = Self::batch_insert_non_duplicates_internal( + &mut self.index, + &self.data, + &mut entries, + &mut duplicates, + ); + match result { + Ok(_result) => { + // everything added + self.set_anticipated_count(0); + return duplicates; + } + Err(error) => { + // resize and add more + // `entries` will have had items removed from it + self.grow(error); + self.handle_delayed_grows(); + } + } + } + } + + /// sort `entries` by hash value + /// insert as much of `entries` as possible into `index`. + /// return an error if the index needs to resize. + /// for every entry that already exists in `index`, add it (and the value already in the index) to `duplicates` + pub fn batch_insert_non_duplicates_internal( + index: &mut BucketStorage>, + data_buckets: &[BucketStorage], + reverse_sorted_entries: &mut Vec<(u64, Pubkey, T)>, + duplicates: &mut Vec<(Pubkey, T, T)>, + ) -> Result<(), BucketMapError> { + let max_search = index.max_search(); + let cap = index.capacity(); + let search_end = max_search.min(cap); + + // pop one entry at a time to insert + 'outer: while let Some((ix_entry_raw, k, v)) = reverse_sorted_entries.pop() { + let ix_entry = ix_entry_raw % cap; + // search for an empty spot starting at `ix_entry` + for search in 0..search_end { + let ix_index = (ix_entry + search) % cap; + let elem = IndexEntryPlaceInBucket::new(ix_index); + if index.try_lock(ix_index) { + // found free element and occupied it + // These fields will be overwritten after allocation by callers. + // Since this part of the mmapped file could have previously been used by someone else, there can be garbage here. + elem.init(index, &k); + + // new data stored should be stored in IndexEntry and NOT in data file + // new data len is 1 + elem.set_slot_count_enum_value(index, OccupiedEnum::OneSlotInIndex(&v)); + continue 'outer; // this 'insertion' is completed: inserted successfully + } else { + // occupied, see if the key already exists here + if elem.key(index) == &k { + let (v_existing, _ref_count_existing) = + elem.read_value(index, data_buckets); + duplicates.push((k, v, *v_existing.first().unwrap())); + continue 'outer; // this 'insertion' is completed: found a duplicate entry + } + } + } + // search loop ended without finding a spot to insert this key + // so, remember the item we were trying to insert for next time after resizing + reverse_sorted_entries.push((ix_entry_raw, k, v)); + return Err(BucketMapError::IndexNoSpace(cap)); + } + + Ok(()) + } + pub fn try_write( &mut self, key: &Pubkey, @@ -417,6 +533,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { pub fn delete_key(&mut self, key: &Pubkey) { if let Some((elem, elem_ix)) = self.find_index_entry(key) { + self.at_least_one_entry_deleted = true; if let OccupiedEnum::MultipleSlots(multiple_slots) = elem.get_slot_count_enum(&self.index) { @@ -637,3 +754,153 @@ impl<'b, T: Clone + Copy + 'static> Bucket { self.insert(key, (&new, refct)); } } + +#[cfg(test)] +mod tests { + use {super::*, tempfile::tempdir}; + + #[test] + fn test_index_entries() { + for v in 10..12u64 { + for random in 1..3 { + for len in 1..3 { + let raw = (0..len) + .map(|l| { + let k = Pubkey::from([l as u8; 32]); + (k, v + (l as u64)) + }) + .collect::>(); + let hashed = Bucket::index_entries(raw.clone().into_iter(), len, random); + assert_eq!(hashed.len(), len); + (0..len).for_each(|i| { + let raw = raw[i]; + let hashed = hashed[i]; + assert_eq!(Bucket::::bucket_index_ix(&raw.0, random), hashed.0); + assert_eq!(raw.0, hashed.1); + assert_eq!(raw.1, hashed.2); + }); + } + } + } + } + + fn create_test_index(max_search: Option) -> BucketStorage> { + let tmpdir = tempdir().unwrap(); + let paths: Vec = vec![tmpdir.path().to_path_buf()]; + assert!(!paths.is_empty()); + let max_search = max_search.unwrap_or(2); + BucketStorage::>::new( + Arc::new(paths), + 1, + std::mem::size_of::>() as u64, + max_search, + Arc::default(), + Arc::default(), + ) + } + + #[test] + fn batch_insert_non_duplicates_internal_simple() { + solana_logger::setup(); + // add 2 entries, make sure they are added in the buckets we expect + let random = 1; + let data_buckets = Vec::default(); + for v in 10..12u64 { + for len in 1..3 { + let raw = (0..len) + .map(|l| { + let k = Pubkey::from([l as u8; 32]); + (k, v + (l as u64)) + }) + .collect::>(); + let mut hashed = Bucket::index_entries(raw.clone().into_iter(), len, random); + let hashed_raw = hashed.clone(); + + let mut index = create_test_index(None); + + let mut duplicates = Vec::default(); + assert!(Bucket::::batch_insert_non_duplicates_internal( + &mut index, + &Vec::default(), + &mut hashed, + &mut duplicates, + ) + .is_ok()); + + assert_eq!(hashed.len(), 0); + (0..len).for_each(|i| { + let raw = hashed_raw[i]; + let elem = IndexEntryPlaceInBucket::new(raw.0 % index.capacity()); + let (value, ref_count) = elem.read_value(&index, &data_buckets); + assert_eq!(ref_count, 1); + assert_eq!(value, &[hashed_raw[i].2]); + }); + } + } + } + + #[test] + fn batch_insert_non_duplicates_internal_same_ix_exceeds_max_search() { + solana_logger::setup(); + // add `len` entries with the same ix, make sure they are added in subsequent buckets. + // adjust `max_search`. If we try to add an entry that causes us to exceed `max_search`, then assert that the adding fails with an error and + // the colliding item remains in `entries` + let random = 1; + let data_buckets = Vec::default(); + for max_search in [2usize, 3] { + for v in 10..12u64 { + for len in 1..(max_search + 1) { + let raw = (0..len) + .map(|l| { + let k = Pubkey::from([l as u8; 32]); + (k, v + (l as u64)) + }) + .collect::>(); + let mut hashed = Bucket::index_entries(raw.clone().into_iter(), len, random); + let common_ix = 2; // both are put at same ix + hashed.iter_mut().for_each(|mut v| { + v.0 = common_ix; + }); + let hashed_raw = hashed.clone(); + + let mut index = create_test_index(Some(max_search as u8)); + + let mut duplicates = Vec::default(); + let result = Bucket::::batch_insert_non_duplicates_internal( + &mut index, + &Vec::default(), + &mut hashed, + &mut duplicates, + ); + + assert_eq!( + hashed.len(), + if len > max_search { 1 } else { 0 }, + "len: {len}" + ); + (0..len).for_each(|i| { + assert!(if len > max_search { + result.is_err() + } else { + result.is_ok() + }); + let raw = hashed_raw[i]; + if i == 0 && len > max_search { + // max search was exceeded and the first entry was unable to be inserted, so it remained in `hashed` + assert_eq!(hashed[0], hashed_raw[0]); + } else { + // we insert in reverse order when ix values are equal, so we expect to find item[1] in item[1]'s expected ix and item[0] will be 1 search distance away from expected ix + let search_required = (len - i - 1) as u64; + let elem = IndexEntryPlaceInBucket::new( + (raw.0 + search_required) % index.capacity(), + ); + let (value, ref_count) = elem.read_value(&index, &data_buckets); + assert_eq!(ref_count, 1); + assert_eq!(value, &[hashed_raw[i].2]); + } + }); + } + } + } + } +} diff --git a/bucket_map/src/bucket_api.rs b/bucket_map/src/bucket_api.rs index 05275c5308ef6a..b6b43ebc042698 100644 --- a/bucket_map/src/bucket_api.rs +++ b/bucket_map/src/bucket_api.rs @@ -118,6 +118,20 @@ impl BucketApi { bucket.as_mut().unwrap().set_anticipated_count(count); } + /// batch insert of `items`. Assumption is a single slot list element and ref_count == 1. + /// For any pubkeys that already exist, the failed insertion data and the existing data are returned. + pub fn batch_insert_non_duplicates( + &self, + items: impl Iterator, + count: usize, + ) -> Vec<(Pubkey, T, T)> { + let mut bucket = self.get_write_bucket(); + bucket + .as_mut() + .unwrap() + .batch_insert_non_duplicates(items, count) + } + pub fn update(&self, key: &Pubkey, updatefn: F) where F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec, RefCount)>, diff --git a/bucket_map/src/bucket_map.rs b/bucket_map/src/bucket_map.rs index 5a218bde58f774..bc61d5c67ac4de 100644 --- a/bucket_map/src/bucket_map.rs +++ b/bucket_map/src/bucket_map.rs @@ -355,161 +355,225 @@ mod tests { fn hashmap_compare() { use std::sync::Mutex; solana_logger::setup(); - let maps = (0..2) - .map(|max_buckets_pow2| { - let config = BucketMapConfig::new(1 << max_buckets_pow2); - BucketMap::new(config) - }) - .collect::>(); - let hash_map = RwLock::new(HashMap::, RefCount)>::new()); - let max_slot_list_len = 5; - let all_keys = Mutex::new(vec![]); - - let gen_rand_value = || { - let count = thread_rng().gen_range(0, max_slot_list_len); - let v = (0..count) - .map(|x| (x as usize, x as usize /*thread_rng().gen::()*/)) + for mut use_batch_insert in [true, false] { + let maps = (0..2) + .map(|max_buckets_pow2| { + let config = BucketMapConfig::new(1 << max_buckets_pow2); + BucketMap::new(config) + }) .collect::>(); - let range = thread_rng().gen_range(0, 100); - // pick ref counts that are useful and common - let rc = if range < 50 { - 1 - } else if range < 60 { - 0 - } else if range < 70 { - 2 - } else { - thread_rng().gen_range(0, MAX_LEGAL_REFCOUNT) + let hash_map = RwLock::new(HashMap::, RefCount)>::new()); + let max_slot_list_len = 5; + let all_keys = Mutex::new(vec![]); + + let gen_rand_value = || { + let count = thread_rng().gen_range(0, max_slot_list_len); + let v = (0..count) + .map(|x| (x as usize, x as usize /*thread_rng().gen::()*/)) + .collect::>(); + let range = thread_rng().gen_range(0, 100); + // pick ref counts that are useful and common + let rc = if range < 50 { + 1 + } else if range < 60 { + 0 + } else if range < 70 { + 2 + } else { + thread_rng().gen_range(0, MAX_LEGAL_REFCOUNT) + }; + + (v, rc) }; - (v, rc) - }; + let get_key = || { + let mut keys = all_keys.lock().unwrap(); + if keys.is_empty() { + return None; + } + let len = keys.len(); + Some(keys.remove(thread_rng().gen_range(0, len))) + }; + let return_key = |key| { + let mut keys = all_keys.lock().unwrap(); + keys.push(key); + }; - let get_key = || { - let mut keys = all_keys.lock().unwrap(); - if keys.is_empty() { - return None; - } - let len = keys.len(); - Some(keys.remove(thread_rng().gen_range(0, len))) - }; - let return_key = |key| { - let mut keys = all_keys.lock().unwrap(); - keys.push(key); - }; - - let verify = || { - let mut maps = maps - .iter() - .map(|map| { - let mut r = vec![]; - for bin in 0..map.num_buckets() { - r.append( - &mut map.buckets[bin] - .items_in_range(&None::<&std::ops::RangeInclusive>), - ); - } - r - }) - .collect::>(); - let hm = hash_map.read().unwrap(); - for (k, v) in hm.iter() { - for map in maps.iter_mut() { - for i in 0..map.len() { - if k == &map[i].pubkey { - assert_eq!(map[i].slot_list, v.0); - assert_eq!(map[i].ref_count, v.1); - map.remove(i); - break; + let verify = || { + let mut maps = maps + .iter() + .map(|map| { + let mut r = vec![]; + for bin in 0..map.num_buckets() { + r.append( + &mut map.buckets[bin] + .items_in_range(&None::<&std::ops::RangeInclusive>), + ); + } + r + }) + .collect::>(); + let hm = hash_map.read().unwrap(); + for (k, v) in hm.iter() { + for map in maps.iter_mut() { + for i in 0..map.len() { + if k == &map[i].pubkey { + assert_eq!(map[i].slot_list, v.0); + assert_eq!(map[i].ref_count, v.1); + map.remove(i); + break; + } } } } + for map in maps.iter() { + assert!(map.is_empty()); + } + }; + let mut initial: usize = 100; // put this many items in to start + if use_batch_insert { + // insert a lot more when inserting with batch to make sure we hit resizing during batch + initial *= 3; } - for map in maps.iter() { - assert!(map.is_empty()); - } - }; - let mut initial = 100; // put this many items in to start - - // do random operations: insert, update, delete, add/unref in random order - // verify consistency between hashmap and all bucket maps - for i in 0..10000 { - if initial > 0 { - initial -= 1; - } - if initial > 0 || thread_rng().gen_range(0, 5) == 0 { - // insert - let k = solana_sdk::pubkey::new_rand(); - let v = gen_rand_value(); - hash_map.write().unwrap().insert(k, v.clone()); - let insert = thread_rng().gen_range(0, 2) == 0; - maps.iter().for_each(|map| { - if insert { - map.insert(&k, (&v.0, v.1)) - } else { - map.update(&k, |current| { - assert!(current.is_none()); - Some(v.clone()) - }) + + // do random operations: insert, update, delete, add/unref in random order + // verify consistency between hashmap and all bucket maps + for i in 0..10000 { + initial = initial.saturating_sub(1); + if initial > 0 || thread_rng().gen_range(0, 5) == 0 { + // insert + let mut to_add = 1; + if initial > 1 && use_batch_insert { + to_add = thread_rng().gen_range(1, (initial / 4).max(2)); + initial -= to_add; } - }); - return_key(k); - } - if thread_rng().gen_range(0, 10) == 0 { - // update - if let Some(k) = get_key() { - let hm = hash_map.read().unwrap(); - let (v, rc) = gen_rand_value(); - let v_old = hm.get(&k); + + let additions = (0..to_add) + .map(|_| { + let k = solana_sdk::pubkey::new_rand(); + let mut v = gen_rand_value(); + if use_batch_insert { + // refcount has to be 1 to use batch insert + v.1 = 1; + // len has to be 1 to use batch insert + if v.0.len() > 1 { + v.0.truncate(1); + } else if v.0.is_empty() { + loop { + let mut new_v = gen_rand_value(); + if !new_v.0.is_empty() { + v.0 = vec![new_v.0.pop().unwrap()]; + break; + } + } + } + } + (k, v) + }) + .collect::>(); + + additions.clone().into_iter().for_each(|(k, v)| { + hash_map.write().unwrap().insert(k, v); + return_key(k); + }); let insert = thread_rng().gen_range(0, 2) == 0; maps.iter().for_each(|map| { - if insert { - map.insert(&k, (&v, rc)) + // batch insert can only work for the map with only 1 bucket so that we can batch add to a single bucket + let batch_insert_now = map.buckets.len() == 1 + && use_batch_insert + && thread_rng().gen_range(0, 2) == 0; + if batch_insert_now { + // batch insert into the map with 1 bucket 50% of the time + assert!(map + .get_bucket_from_index(0) + .batch_insert_non_duplicates( + additions + .clone() + .into_iter() + .map(|(k, mut v)| (k, v.0.pop().unwrap())), + to_add, + ) + .is_empty()); } else { - map.update(&k, |current| { - assert_eq!(current, v_old.map(|(v, rc)| (&v[..], *rc)), "{k}"); - Some((v.clone(), rc)) - }) + additions.clone().into_iter().for_each(|(k, v)| { + if insert { + map.insert(&k, (&v.0, v.1)) + } else { + map.update(&k, |current| { + assert!(current.is_none()); + Some(v.clone()) + }) + } + }); } }); - drop(hm); - hash_map.write().unwrap().insert(k, (v, rc)); - return_key(k); + + if use_batch_insert && initial == 1 { + // done using batch insert once we have added the initial entries + // now, the test can remove, update, addref, etc. + use_batch_insert = false; + } } - } - if thread_rng().gen_range(0, 20) == 0 { - // delete - if let Some(k) = get_key() { - let mut hm = hash_map.write().unwrap(); - hm.remove(&k); - maps.iter().for_each(|map| { - map.delete_key(&k); - }); + if use_batch_insert && initial > 0 { + // if we are using batch insert, it is illegal to update, delete, or addref/unref an account until all batch inserts are complete + continue; } - } - if thread_rng().gen_range(0, 10) == 0 { - // add/unref - if let Some(k) = get_key() { - let mut inc = thread_rng().gen_range(0, 2) == 0; - let mut hm = hash_map.write().unwrap(); - let (v, mut rc) = hm.get(&k).map(|(v, rc)| (v.to_vec(), *rc)).unwrap(); - if !inc && rc == 0 { - // can't decrement rc=0 - inc = true; + if thread_rng().gen_range(0, 10) == 0 { + // update + if let Some(k) = get_key() { + let hm = hash_map.read().unwrap(); + let (v, rc) = gen_rand_value(); + let v_old = hm.get(&k); + let insert = thread_rng().gen_range(0, 2) == 0; + maps.iter().for_each(|map| { + if insert { + map.insert(&k, (&v, rc)) + } else { + map.update(&k, |current| { + assert_eq!(current, v_old.map(|(v, rc)| (&v[..], *rc)), "{k}"); + Some((v.clone(), rc)) + }) + } + }); + drop(hm); + hash_map.write().unwrap().insert(k, (v, rc)); + return_key(k); } - rc = if inc { rc + 1 } else { rc - 1 }; - hm.insert(k, (v.to_vec(), rc)); - maps.iter().for_each(|map| { - map.update(&k, |current| Some((current.unwrap().0.to_vec(), rc))) - }); + } + if thread_rng().gen_range(0, 20) == 0 { + // delete + if let Some(k) = get_key() { + let mut hm = hash_map.write().unwrap(); + hm.remove(&k); + maps.iter().for_each(|map| { + map.delete_key(&k); + }); + } + } + if thread_rng().gen_range(0, 10) == 0 { + // add/unref + if let Some(k) = get_key() { + let mut inc = thread_rng().gen_range(0, 2) == 0; + let mut hm = hash_map.write().unwrap(); + let (v, mut rc) = hm.get(&k).map(|(v, rc)| (v.to_vec(), *rc)).unwrap(); + if !inc && rc == 0 { + // can't decrement rc=0 + inc = true; + } + rc = if inc { rc + 1 } else { rc - 1 }; + hm.insert(k, (v.to_vec(), rc)); + maps.iter().for_each(|map| { + map.update(&k, |current| Some((current.unwrap().0.to_vec(), rc))) + }); - return_key(k); + return_key(k); + } + } + if i % 1000 == 0 { + verify(); } } - if i % 1000 == 0 { - verify(); - } + verify(); } - verify(); } } diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs index 72b68319d205da..24210437f4ad1d 100644 --- a/bucket_map/src/bucket_storage.rs +++ b/bucket_map/src/bucket_storage.rs @@ -218,7 +218,8 @@ impl BucketStorage { self.contents.is_free(entry, ix as usize) } - fn try_lock(&mut self, ix: u64) -> bool { + /// try to occupy `ix`. return true if successful + pub(crate) fn try_lock(&mut self, ix: u64) -> bool { let start = self.get_start_offset_with_header(ix); let entry = &mut self.mmap[start..]; if self.contents.is_free(entry, ix as usize) { diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 44afd7535d638b..a5e252fa7281ab 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -2491,8 +2491,8 @@ pub mod tests { assert_eq!(num, 1); // not zero lamports - let index = AccountsIndex::::default_for_tests(); - let account_info: AccountInfoTest = 0 as AccountInfoTest; + let index = AccountsIndex::::default_for_tests(); + let account_info = false; let items = vec![(*pubkey, account_info)]; index.set_startup(Startup::Startup); index.insert_new_if_missing_into_primary_index(slot, items.len(), items.into_iter()); @@ -2516,7 +2516,7 @@ pub mod tests { assert!(index .get_for_tests(pubkey, Some(&ancestors), None) .is_some()); - assert_eq!(index.ref_count_from_storage(pubkey), 0); // cached, so 0 + assert_eq!(index.ref_count_from_storage(pubkey), 1); index.unchecked_scan_accounts( "", &ancestors, diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index 8217f76489791d..9230ddcad4320c 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -1060,36 +1060,21 @@ impl + Into> InMemAccountsIndex { - // already on disk, so remember the new (slot, info) for later - duplicates.duplicates.push((slot, k, entry.1)); - if let Some((slot, _)) = current_slot_list.first() { - // accurately account for there being a duplicate for the first entry that was previously added to the disk index. - // That entry could not have known yet that it was a duplicate. - // It is important to capture each slot with a duplicate because of slot limits applied to clean. - duplicates.duplicates_put_on_disk.insert((*slot, k)); - } - Some((current_slot_list.to_vec(), ref_count)) - } - None => { - count += 1; - // not on disk, insert it - Some((vec![(entry.0, entry.1.into())], new_ref_count)) - } - } - }); - }); - // remove the guidance for how many entries the bucket will eventually contain since we have added all we knew about - disk.set_anticipated_count(0); + let mut count = insert.len() as u64; + for (k, entry, duplicate_entry) in disk.batch_insert_non_duplicates( + insert.into_iter().map(|(slot, k, v)| (k, (slot, v.into()))), + count as usize, + ) { + duplicates.duplicates.push((entry.0, k, entry.1.into())); + // accurately account for there being a duplicate for the first entry that was previously added to the disk index. + // That entry could not have known yet that it was a duplicate. + // It is important to capture each slot with a duplicate because of slot limits applied to clean. + duplicates + .duplicates_put_on_disk + .insert((duplicate_entry.0, k)); + count -= 1; + } + self.stats().inc_insert_count(count); }