Skip to content

Commit

Permalink
at startup, keep duplicates in in-memory index since they will be cle…
Browse files Browse the repository at this point in the history
…aned shortly (#30736)

at startup, keep duplicates in in-memory index since they will be cleaned soon
  • Loading branch information
jeffwashington authored Mar 22, 2023
1 parent 81ef2a0 commit 9a1d5ea
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 25 deletions.
2 changes: 1 addition & 1 deletion runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9115,7 +9115,7 @@ impl AccountsDb {
// Get duplicate keys from the accounts index. We have to wait until we've finished flushing.
for (slot, key) in self
.accounts_index
.retrieve_duplicate_keys_from_startup()
.populate_and_retrieve_duplicate_keys_from_startup()
.into_iter()
.flatten()
{
Expand Down
8 changes: 6 additions & 2 deletions runtime/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1685,11 +1685,14 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
}

/// return Vec<Vec<>> because the internal vecs are already allocated per bin
pub fn retrieve_duplicate_keys_from_startup(&self) -> Vec<Vec<(Slot, Pubkey)>> {
pub(crate) fn populate_and_retrieve_duplicate_keys_from_startup(
&self,
) -> Vec<Vec<(Slot, Pubkey)>> {
(0..self.bins())
.into_par_iter()
.map(|pubkey_bin| {
let r_account_maps = &self.account_maps[pubkey_bin];
r_account_maps.retrieve_duplicate_keys_from_startup()
r_account_maps.populate_and_retrieve_duplicate_keys_from_startup()
})
.collect()
}
Expand Down Expand Up @@ -2699,6 +2702,7 @@ pub mod tests {
index.set_startup(Startup::Normal);
}
assert!(gc.is_empty());
index.populate_and_retrieve_duplicate_keys_from_startup();

for lock in &[false, true] {
let read_lock = if *lock {
Expand Down
49 changes: 27 additions & 22 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
solana_measure::measure::Measure,
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
collections::{hash_map::Entry, HashMap},
collections::{hash_map::Entry, HashMap, HashSet},
fmt::Debug,
ops::{Bound, RangeBounds, RangeInclusive},
sync::{
Expand Down Expand Up @@ -128,8 +128,11 @@ pub enum InsertNewEntryResults {
struct StartupInfo<T: IndexValue> {
/// entries to add next time we are flushing to disk
insert: Vec<(Slot, Pubkey, T)>,
/// pubkeys that were found to have duplicate index entries
duplicates: Vec<(Slot, Pubkey)>,
/// entries that were found to have duplicate index entries.
/// When all entries have been inserted, these can be resolved and held in memory.
duplicates: Vec<(Slot, Pubkey, T)>,
/// (slot, pubkey) pairs that were already added to disk and later found to be duplicates.
duplicates_put_on_disk: HashSet<(Slot, Pubkey)>,
}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -1040,7 +1043,8 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
);
drop(map_internal);

let mut duplicates = vec![];
// this fn should only be called from a single thread, so holding the lock is fine
let mut startup_info = self.startup_info.lock().unwrap();

// merge all items into the disk index now
let disk = self.bucket.as_ref().unwrap();
Expand All @@ -1050,21 +1054,16 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
let new_ref_count = u64::from(!v.is_cached());
disk.update(&k, |current| {
match current {
Some((current_slot_list, mut ref_count)) => {
// merge this in, mark as duplicate
duplicates.push((slot, k));
if current_slot_list.len() == 1 {
Some((current_slot_list, ref_count)) => {
// already on disk, so remember the new (slot, info) for later
startup_info.duplicates.push((slot, k, entry.1));
if let Some((slot, _)) = current_slot_list.first() {
// accurately account for there being a duplicate for the first entry that was previously added to the disk index.
// That entry could not have known yet that it was a duplicate.
// It is important to capture each slot with a duplicate because of slot limits applied to clean.
let first_entry_slot = current_slot_list[0].0;
duplicates.push((first_entry_slot, k));
startup_info.duplicates_put_on_disk.insert((*slot, k));
}
let mut slot_list = Vec::with_capacity(current_slot_list.len() + 1);
slot_list.extend_from_slice(current_slot_list);
slot_list.push((entry.0, entry.1.into())); // will never be from the same slot that already exists in the list
ref_count += new_ref_count;
Some((slot_list, ref_count))
Some((current_slot_list.to_vec(), ref_count))
}
None => {
count += 1;
Expand All @@ -1075,22 +1074,28 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
});
});
self.stats().inc_insert_count(count);
self.startup_info
.lock()
.unwrap()
.duplicates
.append(&mut duplicates);
}

/// pull out all duplicate pubkeys from 'startup_info'
/// duplicate pubkeys have a slot list with len > 1
/// These were collected for this bin when we did batch inserts in the bg flush threads.
pub fn retrieve_duplicate_keys_from_startup(&self) -> Vec<(Slot, Pubkey)> {
/// Insert these into the in-mem index, then return the duplicate (Slot, Pubkey)
pub(crate) fn populate_and_retrieve_duplicate_keys_from_startup(&self) -> Vec<(Slot, Pubkey)> {
let mut write = self.startup_info.lock().unwrap();
// in order to return accurate and complete duplicates, we must have nothing left remaining to insert
assert!(write.insert.is_empty());

std::mem::take(&mut write.duplicates)
let duplicates = std::mem::take(&mut write.duplicates);
let duplicates_put_on_disk = std::mem::take(&mut write.duplicates_put_on_disk);
drop(write);
duplicates_put_on_disk
.into_iter()
.chain(duplicates.into_iter().map(|(slot, key, info)| {
let entry = PreAllocatedAccountMapEntry::new(slot, info, &self.storage, true);
self.insert_new_entry_if_missing_with_lock(key, entry);
(slot, key)
}))
.collect()
}

/// synchronize the in-mem index with the disk index
Expand Down

0 comments on commit 9a1d5ea

Please sign in to comment.