Skip to content

Commit

Permalink
reduce contention on startup index generation (#31006)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington authored Mar 31, 2023
1 parent fc2bcdf commit 9600643
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<
flushing_active: AtomicBool,

/// info to streamline initial index generation
startup_info: Mutex<StartupInfo<T>>,
startup_info: StartupInfo<T>,

/// possible evictions for next few slots coming up
possible_evictions: RwLock<PossibleEvictions<T>>,
Expand Down Expand Up @@ -131,16 +131,22 @@ pub enum InsertNewEntryResults {
}

#[derive(Default, Debug)]
struct StartupInfo<T: IndexValue> {
/// entries to add next time we are flushing to disk
insert: Vec<(Slot, Pubkey, T)>,
struct StartupInfoDuplicates<T: IndexValue> {
/// entries that were found to have duplicate index entries.
/// When all entries have been inserted, these can be resolved and held in memory.
duplicates: Vec<(Slot, Pubkey, T)>,
/// pubkeys that were already added to disk and later found to be duplicates,
duplicates_put_on_disk: HashSet<(Slot, Pubkey)>,
}

#[derive(Default, Debug)]
struct StartupInfo<T: IndexValue> {
/// entries to add next time we are flushing to disk
insert: Mutex<Vec<(Slot, Pubkey, T)>>,
/// pubkeys with more than 1 entry
duplicates: Mutex<StartupInfoDuplicates<T>>,
}

#[derive(Default, Debug)]
/// result from scanning in-mem index during flush
struct FlushScanResult<T> {
Expand Down Expand Up @@ -168,7 +174,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
flushing_active: AtomicBool::default(),
// initialize this to max, to make it clear we have not flushed at age 0, the starting age
last_age_flushed: AtomicU8::new(Age::MAX),
startup_info: Mutex::default(),
startup_info: StartupInfo::default(),
possible_evictions: RwLock::new(PossibleEvictions::new(1)),
// Spread out the scanning across all ages within the window.
// This causes us to scan 1/N of the bins each 'Age'
Expand Down Expand Up @@ -663,7 +669,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
assert!(self.storage.get_startup());
assert!(self.bucket.is_some());

let insert = &mut self.startup_info.lock().unwrap().insert;
let mut insert = self.startup_info.insert.lock().unwrap();
items
.into_iter()
.for_each(|(k, v)| insert.push((slot, k, v)));
Expand Down Expand Up @@ -1033,7 +1039,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
}

fn write_startup_info_to_disk(&self) {
let insert = std::mem::take(&mut self.startup_info.lock().unwrap().insert);
let insert = std::mem::take(&mut *self.startup_info.insert.lock().unwrap());
if insert.is_empty() {
// nothing to insert for this bin
return;
Expand All @@ -1050,7 +1056,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
drop(map_internal);

// this fn should only be called from a single thread, so holding the lock is fine
let mut startup_info = self.startup_info.lock().unwrap();
let mut duplicates = self.startup_info.duplicates.lock().unwrap();

// merge all items into the disk index now
let disk = self.bucket.as_ref().unwrap();
Expand All @@ -1062,12 +1068,12 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
match current {
Some((current_slot_list, ref_count)) => {
// already on disk, so remember the new (slot, info) for later
startup_info.duplicates.push((slot, k, entry.1));
duplicates.duplicates.push((slot, k, entry.1));
if let Some((slot, _)) = current_slot_list.first() {
// accurately account for there being a duplicate for the first entry that was previously added to the disk index.
// That entry could not have known yet that it was a duplicate.
// It is important to capture each slot with a duplicate because of slot limits applied to clean.
startup_info.duplicates_put_on_disk.insert((*slot, k));
duplicates.duplicates_put_on_disk.insert((*slot, k));
}
Some((current_slot_list.to_vec(), ref_count))
}
Expand All @@ -1087,13 +1093,16 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
/// These were collected for this bin when we did batch inserts in the bg flush threads.
/// Insert these into the in-mem index, then return the duplicate (Slot, Pubkey)
pub(crate) fn populate_and_retrieve_duplicate_keys_from_startup(&self) -> Vec<(Slot, Pubkey)> {
let mut write = self.startup_info.lock().unwrap();
let inserts = self.startup_info.insert.lock().unwrap();
// in order to return accurate and complete duplicates, we must have nothing left remaining to insert
assert!(write.insert.is_empty());
assert!(inserts.is_empty());
drop(inserts);

let mut duplicate_items = self.startup_info.duplicates.lock().unwrap();
let duplicates = std::mem::take(&mut duplicate_items.duplicates);
let duplicates_put_on_disk = std::mem::take(&mut duplicate_items.duplicates_put_on_disk);
drop(duplicate_items);

let duplicates = std::mem::take(&mut write.duplicates);
let duplicates_put_on_disk = std::mem::take(&mut write.duplicates_put_on_disk);
drop(write);
duplicates_put_on_disk
.into_iter()
.chain(duplicates.into_iter().map(|(slot, key, info)| {
Expand Down

0 comments on commit 9600643

Please sign in to comment.