diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 4b8ab84bf169ab..7d46cddecb56cb 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -9115,7 +9115,7 @@ impl AccountsDb { // get duplicate keys from acct idx. We have to wait until we've finished flushing. for (slot, key) in self .accounts_index - .retrieve_duplicate_keys_from_startup() + .populate_and_retrieve_duplicate_keys_from_startup() .into_iter() .flatten() { diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index a29d91ef2f632a..44afd7535d638b 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1685,11 +1685,14 @@ impl + Into> AccountsIndex { } /// return Vec> because the internal vecs are already allocated per bin - pub fn retrieve_duplicate_keys_from_startup(&self) -> Vec> { + pub(crate) fn populate_and_retrieve_duplicate_keys_from_startup( + &self, + ) -> Vec> { (0..self.bins()) + .into_par_iter() .map(|pubkey_bin| { let r_account_maps = &self.account_maps[pubkey_bin]; - r_account_maps.retrieve_duplicate_keys_from_startup() + r_account_maps.populate_and_retrieve_duplicate_keys_from_startup() }) .collect() } @@ -2699,6 +2702,7 @@ pub mod tests { index.set_startup(Startup::Normal); } assert!(gc.is_empty()); + index.populate_and_retrieve_duplicate_keys_from_startup(); for lock in &[false, true] { let read_lock = if *lock { diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index 1641d278f20168..7ab9847856c381 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -13,7 +13,7 @@ use { solana_measure::measure::Measure, solana_sdk::{clock::Slot, pubkey::Pubkey}, std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, HashSet}, fmt::Debug, ops::{Bound, RangeBounds, RangeInclusive}, sync::{ @@ -128,8 +128,11 @@ pub enum InsertNewEntryResults { struct StartupInfo { /// entries to add next time we are flushing to disk insert: Vec<(Slot, Pubkey, T)>, - /// pubkeys that were found to have duplicate index entries - duplicates: Vec<(Slot, Pubkey)>, + /// entries that were found to have duplicate index entries. + /// When all entries have been inserted, these can be resolved and held in memory. + duplicates: Vec<(Slot, Pubkey, T)>, + /// pubkeys that were already added to disk and later found to be duplicates, + duplicates_put_on_disk: HashSet<(Slot, Pubkey)>, } #[derive(Default, Debug)] @@ -1040,7 +1043,8 @@ impl + Into> InMemAccountsIndex + Into> InMemAccountsIndex { - // merge this in, mark as duplicate - duplicates.push((slot, k)); - if current_slot_list.len() == 1 { + Some((current_slot_list, ref_count)) => { + // already on disk, so remember the new (slot, info) for later + startup_info.duplicates.push((slot, k, entry.1)); + if let Some((slot, _)) = current_slot_list.first() { // accurately account for there being a duplicate for the first entry that was previously added to the disk index. // That entry could not have known yet that it was a duplicate. // It is important to capture each slot with a duplicate because of slot limits applied to clean. - let first_entry_slot = current_slot_list[0].0; - duplicates.push((first_entry_slot, k)); + startup_info.duplicates_put_on_disk.insert((*slot, k)); } - let mut slot_list = Vec::with_capacity(current_slot_list.len() + 1); - slot_list.extend_from_slice(current_slot_list); - slot_list.push((entry.0, entry.1.into())); // will never be from the same slot that already exists in the list - ref_count += new_ref_count; - Some((slot_list, ref_count)) + Some((current_slot_list.to_vec(), ref_count)) } None => { count += 1; @@ -1075,22 +1074,28 @@ impl + Into> InMemAccountsIndex 1 /// These were collected for this bin when we did batch inserts in the bg flush threads. - pub fn retrieve_duplicate_keys_from_startup(&self) -> Vec<(Slot, Pubkey)> { + /// Insert these into the in-mem index, then return the duplicate (Slot, Pubkey) + pub(crate) fn populate_and_retrieve_duplicate_keys_from_startup(&self) -> Vec<(Slot, Pubkey)> { let mut write = self.startup_info.lock().unwrap(); // in order to return accurate and complete duplicates, we must have nothing left remaining to insert assert!(write.insert.is_empty()); - std::mem::take(&mut write.duplicates) + let duplicates = std::mem::take(&mut write.duplicates); + let duplicates_put_on_disk = std::mem::take(&mut write.duplicates_put_on_disk); + drop(write); + duplicates_put_on_disk + .into_iter() + .chain(duplicates.into_iter().map(|(slot, key, info)| { + let entry = PreAllocatedAccountMapEntry::new(slot, info, &self.storage, true); + self.insert_new_entry_if_missing_with_lock(key, entry); + (slot, key) + })) + .collect() } /// synchronize the in-mem index with the disk index