diff --git a/runtime/src/accounts_index_storage.rs b/runtime/src/accounts_index_storage.rs
index 96700bd535a29f..4799b357514ea4 100644
--- a/runtime/src/accounts_index_storage.rs
+++ b/runtime/src/accounts_index_storage.rs
@@ -1,27 +1,25 @@
 use crate::accounts_index::{AccountsIndexConfig, IndexValue};
 use crate::bucket_map_holder::BucketMapHolder;
 use crate::in_mem_accounts_index::InMemAccountsIndex;
+use crate::waitable_condvar::WaitableCondvar;
 use std::fmt::Debug;
 use std::{
     sync::{
         atomic::{AtomicBool, Ordering},
-        Arc,
+        Arc, Mutex,
     },
     thread::{Builder, JoinHandle},
 };
 
-// eventually hold the bucket map
-// Also manages the lifetime of the background processing threads.
-// When this instance is dropped, it will drop the bucket map and cleanup
-// and it will stop all the background threads and join them.
+/// Manages the lifetime of the background processing threads.
 pub struct AccountsIndexStorage<T: IndexValue> {
-    // for managing the bg threads
-    exit: Arc<AtomicBool>,
-    handles: Option<Vec<JoinHandle<()>>>,
+    _bg_threads: BgThreads,
 
-    // eventually the backing storage
    pub storage: Arc<BucketMapHolder<T>>,
     pub in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
+
+    /// set_startup(true) creates bg threads which are kept alive until set_startup(false)
+    startup_worker_threads: Mutex<Option<BgThreads>>,
 }
 
 impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
@@ -30,10 +28,17 @@ impl<T: IndexValue> Debug for AccountsIndexStorage<T> {
     }
 }
 
-impl<T: IndexValue> Drop for AccountsIndexStorage<T> {
+/// low-level manager of the bg threads
+struct BgThreads {
+    exit: Arc<AtomicBool>,
+    handles: Option<Vec<JoinHandle<()>>>,
+    wait: Arc<WaitableCondvar>,
+}
+
+impl Drop for BgThreads {
     fn drop(&mut self) {
         self.exit.store(true, Ordering::Relaxed);
-        self.storage.wait_dirty_or_aged.notify_all();
+        self.wait.notify_all();
         if let Some(handles) = self.handles.take() {
             handles
                 .into_iter()
@@ -42,34 +47,23 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
     }
 }
 
-impl<T: IndexValue> AccountsIndexStorage<T> {
-    pub fn add_worker_threads(existing: &Self, threads: usize) -> Self {
-        Self::allocate(
-            Arc::clone(&existing.storage),
-            existing.in_mem.clone(),
-            threads,
-        )
-    }
-
-    pub fn set_startup(&self, value: bool) {
-        self.storage.set_startup(self, value);
-    }
-
-    fn allocate(
-        storage: Arc<BucketMapHolder<T>>,
-        in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
+impl BgThreads {
+    fn new<T: IndexValue>(
+        storage: &Arc<BucketMapHolder<T>>,
+        in_mem: &[Arc<InMemAccountsIndex<T>>],
         threads: usize,
     ) -> Self {
+        // stop signal used for THIS batch of bg threads
         let exit = Arc::new(AtomicBool::default());
         let handles = Some(
             (0..threads)
                 .into_iter()
                 .map(|_| {
-                    let storage_ = Arc::clone(&storage);
+                    let storage_ = Arc::clone(storage);
                     let exit_ = Arc::clone(&exit);
-                    let in_mem_ = in_mem.clone();
+                    let in_mem_ = in_mem.to_vec();
 
-                    // note that rayon use here causes us to exhaust # rayon threads and many tests running in parallel deadlock
+                    // note that using rayon here would exhaust the rayon thread pool, deadlocking many tests that run in parallel
                     Builder::new()
                         .name("solana-idx-flusher".to_string())
                         .spawn(move || {
@@ -80,28 +74,58 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
                 .collect(),
         );
 
-        Self {
+        BgThreads {
             exit,
             handles,
-            storage,
-            in_mem,
+            wait: Arc::clone(&storage.wait_dirty_or_aged),
+        }
+    }
+}
+
+impl<T: IndexValue> AccountsIndexStorage<T> {
+    /// startup=true causes:
+    ///  in mem to act in a way that flushes to disk asap
+    ///  also creates some additional bg threads to facilitate flushing to disk asap
+    /// startup=false is 'normal' operation
+    pub fn set_startup(&self, value: bool) {
+        if value {
+            // create some additional bg threads to help get things to the disk index asap
+            *self.startup_worker_threads.lock().unwrap() = Some(BgThreads::new(
+                &self.storage,
+                &self.in_mem,
+                Self::num_threads(),
+            ));
        }
+        self.storage.set_startup(value);
+        if !value {
+            // shut down the bg threads
+            *self.startup_worker_threads.lock().unwrap() = None;
+        }
+    }
+
+    fn num_threads() -> usize {
+        std::cmp::max(2, num_cpus::get() / 4)
     }
 
+    /// allocate BucketMapHolder and InMemAccountsIndex[]
     pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
-        let num_threads = std::cmp::max(2, num_cpus::get() / 4);
         let threads = config
             .as_ref()
             .and_then(|config| config.flush_threads)
-            .unwrap_or(num_threads);
 
-        let storage = Arc::new(BucketMapHolder::new(bins, config, threads));
+            .unwrap_or_else(Self::num_threads);
+
+        let storage = Arc::new(BucketMapHolder::new(bins, config));
         let in_mem = (0..bins)
             .into_iter()
             .map(|bin| Arc::new(InMemAccountsIndex::new(&storage, bin)))
             .collect::<Vec<_>>();
-        Self::allocate(storage, in_mem, threads)
+        Self {
+            _bg_threads: BgThreads::new(&storage, &in_mem, threads),
+            storage,
+            in_mem,
+            startup_worker_threads: Mutex::default(),
+        }
     }
 }
diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs
index 34ccbfaa25ff97..c954f1540d04c8 100644
--- a/runtime/src/bucket_map_holder.rs
+++ b/runtime/src/bucket_map_holder.rs
@@ -1,5 +1,4 @@
 use crate::accounts_index::{AccountsIndexConfig, IndexValue};
-use crate::accounts_index_storage::AccountsIndexStorage;
 use crate::bucket_map_holder_stats::BucketMapHolderStats;
 use crate::in_mem_accounts_index::{InMemAccountsIndex, SlotT};
 use crate::waitable_condvar::WaitableCondvar;
@@ -25,12 +24,10 @@ pub struct BucketMapHolder<T: IndexValue> {
     age_timer: AtomicInterval,
 
     // used by bg processing to know when any bucket has become dirty
-    pub wait_dirty_or_aged: WaitableCondvar,
+    pub wait_dirty_or_aged: Arc<WaitableCondvar>,
     next_bucket_to_flush: Mutex<usize>,
     bins: usize,
 
-    _threads: usize,
-
     // how much mb are we allowed to keep in the in-mem index?
     // Rest goes to disk.
     pub mem_budget_mb: Option<usize>,
@@ -41,8 +38,6 @@ pub struct BucketMapHolder<T: IndexValue> {
     /// and writing to disk in parallel are.
     /// Note startup is an optimization and is not required for correctness.
     startup: AtomicBool,
-
-    startup_worker_threads: Mutex<Option<AccountsIndexStorage<T>>>,
 }
 
 impl<T: IndexValue> Debug for BucketMapHolder<T> {
@@ -78,26 +73,25 @@ impl<T: IndexValue> BucketMapHolder<T> {
         self.startup.load(Ordering::Relaxed)
     }
 
-    pub fn set_startup(&self, storage: &AccountsIndexStorage<T>, value: bool) {
-        if value {
-            let num_threads = std::cmp::max(2, num_cpus::get() / 4);
-            *self.startup_worker_threads.lock().unwrap() = Some(
-                AccountsIndexStorage::add_worker_threads(storage, num_threads),
-            );
-        } else {
+    /// startup=true causes:
+    ///  in mem to act in a way that flushes to disk asap
+    /// startup=false is 'normal' operation
+    pub fn set_startup(&self, value: bool) {
+        if !value {
             self.wait_for_idle();
-            *self.startup_worker_threads.lock().unwrap() = None;
         }
         self.startup.store(value, Ordering::Relaxed)
     }
 
+    /// return when the bg threads have reached an 'idle' state
     pub(crate) fn wait_for_idle(&self) {
         assert!(self.get_startup());
         if self.disk.is_none() {
             return;
         }
 
-        // when age has incremented twice, we know that we have made it through scanning all bins, so we are 'idle'
+        // when age has incremented twice, we know that we have made it through scanning all bins since we started waiting,
+        // so we are then 'idle'
         let end_age = self.current_age().wrapping_add(2);
         loop {
             self.wait_dirty_or_aged
@@ -117,7 +111,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
         self.maybe_advance_age();
     }
 
-    // have all buckets been flushed at the current age?
+    /// have all buckets been flushed at the current age?
     pub fn all_buckets_flushed_at_current_age(&self) -> bool {
         self.count_ages_flushed() >= self.bins
     }
@@ -136,7 +130,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
         }
     }
 
-    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>, threads: usize) -> Self {
+    pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
         const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
         let ages_to_stay_in_cache = config
             .as_ref()
@@ -154,14 +148,12 @@ impl<T: IndexValue> BucketMapHolder<T> {
             count_ages_flushed: AtomicUsize::default(),
             age: AtomicU8::default(),
             stats: BucketMapHolderStats::new(bins),
-            wait_dirty_or_aged: WaitableCondvar::default(),
+            wait_dirty_or_aged: Arc::default(),
             next_bucket_to_flush: Mutex::new(0),
             age_timer: AtomicInterval::default(),
             bins,
             startup: AtomicBool::default(),
             mem_budget_mb,
-            startup_worker_threads: Mutex::default(),
-            _threads: threads,
         }
     }
 
@@ -301,7 +293,7 @@ pub mod tests {
     fn test_next_bucket_to_flush() {
         solana_logger::setup();
         let bins = 4;
-        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
+        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
         let visited = (0..bins)
             .into_iter()
             .map(|_| AtomicUsize::default())
@@ -325,7 +317,7 @@
     fn test_age_increment() {
         solana_logger::setup();
         let bins = 4;
-        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
+        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
         for age in 0..513 {
             assert_eq!(test.current_age(), (age % 256) as Age);
 
@@ -345,7 +337,7 @@
     fn test_throttle() {
         solana_logger::setup();
         let bins = 100;
-        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
+        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
         let bins = test.bins as u64;
         let interval_ms = test.age_interval_ms();
         // 90% of time elapsed, all but 1 bins flushed, should not wait since we'll end up right on time
@@ -374,7 +366,7 @@
     fn test_age_time() {
         solana_logger::setup();
         let bins = 1;
-        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
+        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
         let threads = 2;
         let time = AGE_MS * 5 / 2;
         let expected = (time / AGE_MS) as Age;
@@ -394,7 +386,7 @@
     fn test_age_broad() {
         solana_logger::setup();
         let bins = 4;
-        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
+        let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
         assert_eq!(test.current_age(), 0);
         for _ in 0..bins {
             assert!(!test.all_buckets_flushed_at_current_age());
diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs
index 7f03a14e39d407..95a388ecb7df79 100644
--- a/runtime/src/in_mem_accounts_index.rs
+++ b/runtime/src/in_mem_accounts_index.rs
@@ -871,7 +871,6 @@ mod tests {
         let holder = Arc::new(BucketMapHolder::new(
             BINS_FOR_TESTING,
             &Some(AccountsIndexConfig::default()),
-            1,
         ));
         let bin = 0;
         InMemAccountsIndex::new(&holder, bin)
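
Note on the thread-lifecycle change above: BgThreads owns an exit flag, the join handles, and a clone of the storage condvar, so dropping it is all that is needed to stop and join its workers. Below is a minimal, self-contained sketch of that RAII pattern; the names (SketchBgThreads, Wait, "sketch-flusher") are illustrative stand-ins, not identifiers from this PR.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;

// stand-in for the crate's WaitableCondvar: a mutex/condvar pair to sleep on
type Wait = (Mutex<()>, Condvar);

struct SketchBgThreads {
    exit: Arc<AtomicBool>,
    handles: Option<Vec<JoinHandle<()>>>,
    wait: Arc<Wait>,
}

impl SketchBgThreads {
    fn new(threads: usize) -> Self {
        let exit = Arc::new(AtomicBool::new(false));
        let wait = Arc::new((Mutex::new(()), Condvar::new()));
        let handles = (0..threads)
            .map(|_| {
                let exit = Arc::clone(&exit);
                let wait = Arc::clone(&wait);
                Builder::new()
                    .name("sketch-flusher".to_string())
                    .spawn(move || {
                        while !exit.load(Ordering::Relaxed) {
                            // a real flusher would scan and flush dirty bins here;
                            // the timeout bounds shutdown latency if a notify is missed
                            let guard = wait.0.lock().unwrap();
                            let _ = wait.1.wait_timeout(guard, Duration::from_millis(400)).unwrap();
                        }
                    })
                    .unwrap()
            })
            .collect();
        Self {
            exit,
            handles: Some(handles),
            wait,
        }
    }
}

impl Drop for SketchBgThreads {
    fn drop(&mut self) {
        // same order as BgThreads::drop in the diff:
        // raise the exit flag, wake every sleeper, then join all handles
        self.exit.store(true, Ordering::Relaxed);
        self.wait.1.notify_all();
        if let Some(handles) = self.handles.take() {
            handles.into_iter().for_each(|h| h.join().unwrap());
        }
    }
}

fn main() {
    let workers = SketchBgThreads::new(2);
    std::thread::sleep(Duration::from_millis(50));
    drop(workers); // deterministic shutdown: flag + notify + join
}

This also appears to be why wait_dirty_or_aged becomes an Arc<WaitableCondvar>: BgThreads can hold a clone of just the condvar, so it needs no type parameter and no back-reference to the whole BucketMapHolder.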
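
The startup path then becomes a simple ownership dance: extra flushers exist only between set_startup(true) and set_startup(false). A hypothetical driver, reusing SketchBgThreads from the sketch above (SketchIndex and startup_workers are made-up names; the ordering is taken from the diff, and num_cpus is the crate the diff itself uses):

struct SketchIndex {
    startup_workers: std::sync::Mutex<Option<SketchBgThreads>>,
}

impl SketchIndex {
    // same heuristic as num_threads() in the diff
    fn num_threads() -> usize {
        std::cmp::max(2, num_cpus::get() / 4)
    }

    fn set_startup(&self, value: bool) {
        if value {
            // bring up the extra flushers before storage switches into
            // flush-asap mode, so the new work is picked up immediately
            *self.startup_workers.lock().unwrap() =
                Some(SketchBgThreads::new(Self::num_threads()));
        }
        // the real code calls self.storage.set_startup(value) here; on
        // `false` that first blocks in wait_for_idle() to drain pending work
        if !value {
            // dropping the Option joins the extra threads via Drop above
            *self.startup_workers.lock().unwrap() = None;
        }
    }
}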
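
The idle test in wait_for_idle() leans on u8 wrap-around: Age wraps at 256, so the target is computed with wrapping_add and compared for equality. Waiting for two increments rather than one matters because the first tick may complete a scan that was already mid-flight when waiting began; only the second guarantees a pass over every bin both started and finished inside the window. A dependency-free sketch (wait_for_idle_sketch and the plain sleep are stand-ins for the condvar wait in the real code):

use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::time::Duration;

// `age` stands in for BucketMapHolder's age counter; the bg threads bump it
// once per completed pass over all bins
fn wait_for_idle_sketch(age: &Arc<AtomicU8>) {
    // two increments guarantee a full scan started *and* finished after we
    // began waiting; one increment could belong to a pass already in flight
    let end_age = age.load(Ordering::Relaxed).wrapping_add(2);
    while age.load(Ordering::Relaxed) != end_age {
        // the real code parks on wait_dirty_or_aged for age_interval_ms();
        // a plain sleep keeps this sketch dependency-free
        std::thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let age = Arc::new(AtomicU8::new(254)); // start near the wrap point on purpose
    let waiter = {
        let age = Arc::clone(&age);
        std::thread::spawn(move || wait_for_idle_sketch(&age))
    };
    // advance the age twice, as two completed scan passes would
    for _ in 0..2 {
        std::thread::sleep(Duration::from_millis(30));
        age.fetch_add(1, Ordering::Relaxed); // AtomicU8 wraps: 255 -> 0
    }
    waiter.join().unwrap(); // returns once age == 0, i.e. 254 + 2 with wrap
}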