Skip to content

Commit

Permalink
don't start extra threads for shrink/clean/hash (#23858)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington authored Mar 23, 2022
1 parent 911aa5b commit 7b89222
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 10 deletions.
11 changes: 7 additions & 4 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use {
ScanResult, SlotList, SlotSlice, ZeroLamport, ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS,
ACCOUNTS_INDEX_CONFIG_FOR_TESTING,
},
accounts_index_storage::Startup,
accounts_update_notifier_interface::AccountsUpdateNotifier,
active_stats::{ActiveStatItem, ActiveStats},
ancestors::Ancestors,
Expand Down Expand Up @@ -6856,7 +6857,8 @@ impl AccountsDb {
let account = AccountSharedData::new(lamports, space, &owner);
let added = AtomicUsize::default();
for pass in 0..=passes {
self.accounts_index.set_startup(true);
self.accounts_index
.set_startup(Startup::StartupWithExtraThreads);
let roots_in_this_pass = roots
.iter()
.skip(pass * per_pass)
Expand Down Expand Up @@ -6907,7 +6909,7 @@ impl AccountsDb {
self.maybe_throttle_index_generation();
self.store_accounts_frozen((*slot, &add[..]), Some(&hashes[..]), None, None);
});
self.accounts_index.set_startup(false);
self.accounts_index.set_startup(Startup::Normal);
}
info!("added {} filler accounts", added.load(Ordering::Relaxed));
}
Expand Down Expand Up @@ -6941,7 +6943,8 @@ impl AccountsDb {
let passes = if verify { 2 } else { 1 };
for pass in 0..passes {
if pass == 0 {
self.accounts_index.set_startup(true);
self.accounts_index
.set_startup(Startup::StartupWithExtraThreads);
}
let storage_info = StorageSizeAndCountMap::default();
let total_processed_slots_across_all_threads = AtomicU64::new(0);
Expand Down Expand Up @@ -7079,7 +7082,7 @@ impl AccountsDb {
if pass == 0 {
// tell accounts index we are done adding the initial accounts at startup
let mut m = Measure::start("accounts_index_idle_us");
self.accounts_index.set_startup(false);
self.accounts_index.set_startup(Startup::Normal);
m.stop();
index_flush_us = m.as_us();
}
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/accounts_index.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
accounts_index_storage::AccountsIndexStorage,
accounts_index_storage::{AccountsIndexStorage, Startup},
ancestors::Ancestors,
bucket_map_holder::{Age, BucketMapHolder},
contains::Contains,
Expand Down Expand Up @@ -1505,7 +1505,7 @@ impl<T: IndexValue> AccountsIndex<T> {
iter.hold_range_in_memory(range, start_holding, thread_pool);
}

pub fn set_startup(&self, value: bool) {
pub fn set_startup(&self, value: Startup) {
self.storage.set_startup(value);
}

Expand Down
19 changes: 17 additions & 2 deletions runtime/src/accounts_index_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,28 @@ impl BgThreads {
}
}

/// modes the system can be in
pub enum Startup {
/// not startup, but steady state execution
Normal,
/// startup (not steady state execution)
/// requesting 'startup'-like behavior where in-mem acct idx items are flushed asap
Startup,
/// startup (not steady state execution)
/// but also requesting additional threads to be running to flush the acct idx to disk asap
/// The idea is that the best perf to ssds will be with multiple threads,
/// but during steady state, we can't allocate as many threads because we'd starve the rest of the system.
StartupWithExtraThreads,
}

impl<T: IndexValue> AccountsIndexStorage<T> {
/// startup=true causes:
/// in mem to act in a way that flushes to disk asap
/// also creates some additional bg threads to facilitate flushing to disk asap
/// startup=false is 'normal' operation
pub fn set_startup(&self, value: bool) {
if value {
pub fn set_startup(&self, startup: Startup) {
let value = !matches!(startup, Startup::Normal);
if matches!(startup, Startup::StartupWithExtraThreads) {
// create some additional bg threads to help get things to the disk index asap
*self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
&self.storage,
Expand Down
5 changes: 3 additions & 2 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use {
ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS, ACCOUNTS_DB_CONFIG_FOR_TESTING,
},
accounts_index::{AccountSecondaryIndexes, IndexKey, ScanConfig, ScanResult},
accounts_index_storage::Startup,
accounts_update_notifier_interface::AccountsUpdateNotifier,
ancestors::{Ancestors, AncestorsForSerialization},
blockhash_queue::BlockhashQueue,
Expand Down Expand Up @@ -6039,7 +6040,7 @@ impl Bank {
.accounts
.accounts_db
.accounts_index
.set_startup(true);
.set_startup(Startup::Startup);
let mut shrink_all_slots_time = Measure::start("shrink_all_slots");
if !accounts_db_skip_shrink && self.slot() > 0 {
info!("shrinking..");
Expand All @@ -6055,7 +6056,7 @@ impl Bank {
.accounts
.accounts_db
.accounts_index
.set_startup(false);
.set_startup(Startup::Normal);

info!("verify_hash..");
let mut verify2_time = Measure::start("verify_hash");
Expand Down

0 comments on commit 7b89222

Please sign in to comment.