From ff3025bb8cafac66c9e486624ab16b3242d9f065 Mon Sep 17 00:00:00 2001 From: jeff washington Date: Fri, 24 Feb 2023 17:45:11 -0600 Subject: [PATCH 1/4] add index_entries_being_shrunk to ShrinkCollect --- runtime/src/accounts_db.rs | 35 ++++++++++++++++++++++++++++------- runtime/src/accounts_index.rs | 17 ++++++++++++++--- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 5baecc2235c2e6..dbcd3b1d17ede5 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -37,10 +37,11 @@ use { ZeroLamportAccounts, }, accounts_index::{ - AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexConfig, - AccountsIndexRootsStats, AccountsIndexScanResult, DiskIndexValue, IndexKey, IndexValue, - IsCached, RefCount, ScanConfig, ScanResult, SlotList, UpsertReclaim, ZeroLamport, - ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS, ACCOUNTS_INDEX_CONFIG_FOR_TESTING, + AccountIndexGetResult, AccountMapEntry, AccountSecondaryIndexes, AccountsIndex, + AccountsIndexConfig, AccountsIndexRootsStats, AccountsIndexScanResult, DiskIndexValue, + IndexKey, IndexValue, IsCached, RefCount, ScanConfig, ScanResult, SlotList, + UpsertReclaim, ZeroLamport, ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS, + ACCOUNTS_INDEX_CONFIG_FOR_TESTING, }, accounts_index_storage::Startup, accounts_update_notifier_interface::AccountsUpdateNotifier, @@ -465,6 +466,9 @@ pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> { pub(crate) total_starting_accounts: usize, /// true if all alive accounts are zero lamports pub(crate) all_are_zero_lamports: bool, + /// index entries that need to be held in memory while shrink is in progress + /// These aren't read - they are just held so that entries cannot be flushed. 
+ pub(crate) _index_entries_being_shrunk: Vec>, } pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig { @@ -499,6 +503,8 @@ struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> { unrefed_pubkeys: Vec<&'a Pubkey>, /// true if all alive accounts are zero lamport accounts all_are_zero_lamports: bool, + /// index entries we need to hold onto to keep them from getting flushed + index_entries_being_shrunk: Vec>, } pub struct GetUniqueAccountsResult<'a> { @@ -3211,7 +3217,7 @@ impl AccountsDb { let mut useful = 0; self.accounts_index.scan( pubkeys.iter(), - |pubkey, slots_refs| { + |pubkey, slots_refs, _entry| { let mut useless = true; if let Some((slot_list, ref_count)) = slots_refs { let index_in_slot_list = self.accounts_index.latest_slot( @@ -3277,6 +3283,7 @@ impl AccountsDb { } }, None, + false, ); found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed); not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed); @@ -3763,9 +3770,10 @@ impl AccountsDb { let mut dead = 0; let mut index = 0; let mut all_are_zero_lamports = true; + let mut index_entries_being_shrunk = Vec::with_capacity(accounts.len()); self.accounts_index.scan( accounts.iter().map(|account| account.pubkey()), - |pubkey, slots_refs| { + |pubkey, slots_refs, entry| { let mut result = AccountsIndexScanResult::None; if let Some((slot_list, ref_count)) = slots_refs { let stored_account = &accounts[index]; @@ -3782,6 +3790,9 @@ impl AccountsDb { result = AccountsIndexScanResult::Unref; dead += 1; } else { + // Hold onto the index entry arc so that it cannot be flushed. + // Since we are shrinking these entries, we need to disambiguate append_vec_ids during this period and those only exist in the in-memory accounts index. 
+ index_entries_being_shrunk.push(Arc::clone(entry.unwrap())); all_are_zero_lamports &= stored_account.lamports() == 0; alive_accounts.add(ref_count, stored_account); alive += 1; @@ -3791,6 +3802,7 @@ impl AccountsDb { result }, None, + true, ); assert_eq!(index, std::cmp::min(accounts.len(), count)); stats.alive_accounts.fetch_add(alive, Ordering::Relaxed); @@ -3800,6 +3812,7 @@ impl AccountsDb { alive_accounts, unrefed_pubkeys, all_are_zero_lamports, + index_entries_being_shrunk, } } @@ -3862,6 +3875,7 @@ impl AccountsDb { .accounts_loaded .fetch_add(len as u64, Ordering::Relaxed); let all_are_zero_lamports_collect = Mutex::new(true); + let index_entries_being_shrunk_outer = Mutex::new(Vec::default()); self.thread_pool_clean.install(|| { stored_accounts .par_chunks(SHRINK_COLLECT_CHUNK_SIZE) @@ -3870,6 +3884,7 @@ impl AccountsDb { alive_accounts, mut unrefed_pubkeys, all_are_zero_lamports, + mut index_entries_being_shrunk, } = self.load_accounts_index_for_shrink(stored_accounts, stats, slot); // collect @@ -3881,6 +3896,10 @@ impl AccountsDb { .lock() .unwrap() .append(&mut unrefed_pubkeys); + index_entries_being_shrunk_outer + .lock() + .unwrap() + .append(&mut index_entries_being_shrunk); if !all_are_zero_lamports { *all_are_zero_lamports_collect.lock().unwrap() = false; } @@ -3919,6 +3938,7 @@ impl AccountsDb { alive_total_bytes, total_starting_accounts: len, all_are_zero_lamports: all_are_zero_lamports_collect.into_inner().unwrap(), + _index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(), } } @@ -8034,11 +8054,12 @@ impl AccountsDb { pubkeys_removed_from_accounts_index.contains(pubkey); !already_removed }), - |_pubkey, _slots_refs| { + |_pubkey, _slots_refs, _entry| { /* unused */ AccountsIndexScanResult::Unref }, Some(AccountsIndexScanResult::Unref), + false, ) }); }); diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 0e150a38490e2f..6c6d6f3014d0e8 100644 --- a/runtime/src/accounts_index.rs +++ 
b/runtime/src/accounts_index.rs @@ -1356,11 +1356,13 @@ impl + Into> AccountsIndex { /// For each pubkey, find the slot list in the accounts index /// apply 'avoid_callback_result' if specified. /// otherwise, call `callback` + /// if 'provide_entry_in_callback' is true, populate callback with the Arc of the entry itself. pub(crate) fn scan<'a, F, I>( &self, pubkeys: I, mut callback: F, avoid_callback_result: Option, + provide_entry_in_callback: bool, ) where // params: // pubkey looked up @@ -1368,9 +1370,14 @@ impl + Into> AccountsIndex { // None if 'pubkey' is not in accounts index. // slot_list: comes from accounts index for 'pubkey' // ref_count: refcount of entry in index + // entry, if 'provide_entry_in_callback' is true // if 'avoid_callback_result' is Some(_), then callback is NOT called // and _ is returned as if callback were called. - F: FnMut(&'a Pubkey, Option<(&SlotList, RefCount)>) -> AccountsIndexScanResult, + F: FnMut( + &'a Pubkey, + Option<(&SlotList, RefCount)>, + Option<&AccountMapEntry>, + ) -> AccountsIndexScanResult, I: Iterator, { let mut lock = None; @@ -1390,7 +1397,11 @@ impl + Into> AccountsIndex { *result } else { let slot_list = &locked_entry.slot_list.read().unwrap(); - callback(pubkey, Some((slot_list, locked_entry.ref_count()))) + callback( + pubkey, + Some((slot_list, locked_entry.ref_count())), + provide_entry_in_callback.then_some(locked_entry), + ) }; cache = match result { AccountsIndexScanResult::Unref => { @@ -1404,7 +1415,7 @@ impl + Into> AccountsIndex { }; } None => { - avoid_callback_result.unwrap_or_else(|| callback(pubkey, None)); + avoid_callback_result.unwrap_or_else(|| callback(pubkey, None, None)); } } (cache, ()) From 0b6670f2fdb8f5233ca6277d88f4aba848f04693 Mon Sep 17 00:00:00 2001 From: jeff washington Date: Fri, 24 Feb 2023 14:07:26 -0600 Subject: [PATCH 2/4] make store_id an Option in mem acct idx --- runtime/src/account_info.rs | 37 +++++++++++++++++++--------- runtime/src/account_storage.rs | 42 
++++++++++++++++++------------- runtime/src/accounts_db.rs | 45 +++++++++++++++++++--------------- 3 files changed, 76 insertions(+), 48 deletions(-) diff --git a/runtime/src/account_info.rs b/runtime/src/account_info.rs index f993a001d25c6b..a7350162b0916a 100644 --- a/runtime/src/account_info.rs +++ b/runtime/src/account_info.rs @@ -21,7 +21,7 @@ pub type StoredSize = u32; /// specify where account data is located #[derive(Debug, PartialEq, Eq)] pub enum StorageLocation { - AppendVec(AppendVecId, Offset), + AppendVec(Option, Offset), Cached, } @@ -84,8 +84,9 @@ pub struct PackedOffsetAndFlags { #[derive(Default, Debug, PartialEq, Eq, Clone, Copy)] pub struct AccountInfo { - /// index identifying the append storage - store_id: AppendVecId, + /// index identifying the append vec storage + /// None means the default one associated with the slot + store_id: Option, account_offset_and_flags: AccountOffsetAndFlags, } @@ -97,6 +98,23 @@ pub struct AccountOffsetAndFlags { packed_offset_and_flags: PackedOffsetAndFlags, } +impl From for AccountInfo { + fn from(account_offset_and_flags: AccountOffsetAndFlags) -> Self { + Self { + store_id: None, + account_offset_and_flags, + } + } +} + +impl From for AccountOffsetAndFlags { + fn from(info: AccountInfo) -> Self { + Self { + packed_offset_and_flags: info.account_offset_and_flags.packed_offset_and_flags, + } + } +} + impl ZeroLamport for AccountInfo { fn is_zero_lamport(&self) -> bool { self.account_offset_and_flags @@ -120,9 +138,6 @@ impl IsCached for StorageLocation { } } -/// We have to have SOME value for store_id when we are cached -const CACHE_VIRTUAL_STORAGE_ID: AppendVecId = AppendVecId::MAX; - impl AccountInfo { pub fn new(storage_location: StorageLocation, lamports: u64) -> Self { let mut packed_offset_and_flags = PackedOffsetAndFlags::default(); @@ -143,7 +158,7 @@ impl AccountInfo { } StorageLocation::Cached => { packed_offset_and_flags.set_offset_reduced(CACHED_OFFSET); - CACHE_VIRTUAL_STORAGE_ID + None } }; 
packed_offset_and_flags.set_is_zero_lamport(lamports == 0); @@ -160,7 +175,7 @@ impl AccountInfo { (offset / ALIGN_BOUNDARY_OFFSET) as OffsetReduced } - pub fn store_id(&self) -> AppendVecId { + pub fn store_id(&self) -> Option { // if the account is in a cached store, the store_id is meaningless assert!(!self.is_cached()); self.store_id @@ -202,7 +217,7 @@ mod test { ALIGN_BOUNDARY_OFFSET, 4 * ALIGN_BOUNDARY_OFFSET, ] { - let info = AccountInfo::new(StorageLocation::AppendVec(0, offset), 0); + let info = AccountInfo::new(StorageLocation::AppendVec(None, offset), 0); assert!(info.offset() == offset); } } @@ -211,13 +226,13 @@ mod test { #[should_panic(expected = "illegal offset")] fn test_illegal_offset() { let offset = (MAXIMUM_APPEND_VEC_FILE_SIZE - (ALIGN_BOUNDARY_OFFSET as u64)) as Offset; - AccountInfo::new(StorageLocation::AppendVec(0, offset), 0); + AccountInfo::new(StorageLocation::AppendVec(None, offset), 0); } #[test] #[should_panic(expected = "illegal offset")] fn test_alignment() { let offset = 1; // not aligned - AccountInfo::new(StorageLocation::AppendVec(0, offset), 0); + AccountInfo::new(StorageLocation::AppendVec(None, offset), 0); } } diff --git a/runtime/src/account_storage.rs b/runtime/src/account_storage.rs index 4479b8d338509e..c8568e956028c9 100644 --- a/runtime/src/account_storage.rs +++ b/runtime/src/account_storage.rs @@ -50,23 +50,30 @@ impl AccountStorage { pub(crate) fn get_account_storage_entry( &self, slot: Slot, - store_id: AppendVecId, + store_id: Option, ) -> Option> { let lookup_in_map = || { - self.map - .get(&slot) - .and_then(|r| (r.id == store_id).then_some(Arc::clone(&r.storage))) + self.map.get(&slot).and_then(|r| { + Self::match_store_id(store_id, r.id).then_some(Arc::clone(&r.storage)) + }) }; lookup_in_map() .or_else(|| { self.shrink_in_progress_map.get(&slot).and_then(|entry| { - (entry.value().append_vec_id() == store_id).then(|| Arc::clone(entry.value())) + Self::match_store_id(store_id, entry.value().append_vec_id()) 
+ .then(|| Arc::clone(entry.value())) }) }) .or_else(lookup_in_map) } + fn match_store_id(expected: Option, store_id: AppendVecId) -> bool { + expected + .map(|expected| expected == store_id) + .unwrap_or(true) + } + /// assert if shrink in progress is active pub(crate) fn assert_no_shrink_in_progress(&self) { assert!(self.shrink_in_progress_map.is_empty()); @@ -276,7 +283,7 @@ pub(crate) mod tests { let slot = 0; let id = 0; // empty everything - assert!(storage.get_account_storage_entry(slot, id).is_none()); + assert!(storage.get_account_storage_entry(slot, None).is_none()); // add a map store let common_store_path = Path::new(""); @@ -299,11 +306,12 @@ pub(crate) mod tests { .map .insert(slot, AccountStorageReference { id, storage: entry }); + //todo not sure about these append vec id values // look in map assert_eq!( store_file_size, storage - .get_account_storage_entry(slot, id) + .get_account_storage_entry(slot, None) .map(|entry| entry.accounts.capacity()) .unwrap_or_default() ); @@ -315,7 +323,7 @@ pub(crate) mod tests { assert_eq!( store_file_size, storage - .get_account_storage_entry(slot, id) + .get_account_storage_entry(slot, None) .map(|entry| entry.accounts.capacity()) .unwrap_or_default() ); @@ -327,7 +335,7 @@ pub(crate) mod tests { assert_eq!( store_file_size2, storage - .get_account_storage_entry(slot, id) + .get_account_storage_entry(slot, None) .map(|entry| entry.accounts.capacity()) .unwrap_or_default() ); @@ -515,10 +523,10 @@ pub(crate) mod tests { let missing_id = 9999; let slot = sample.slot(); // id is missing since not in maps at all - assert!(storage.get_account_storage_entry(slot, id).is_none()); + assert!(storage.get_account_storage_entry(slot, None).is_none()); // missing should always be missing assert!(storage - .get_account_storage_entry(slot, missing_id) + .get_account_storage_entry(slot, Some(missing_id)) .is_none()); storage.map.insert( slot, @@ -528,23 +536,23 @@ pub(crate) mod tests { }, ); // id is found in map - 
assert!(storage.get_account_storage_entry(slot, id).is_some()); + assert!(storage.get_account_storage_entry(slot, None).is_some()); assert!(storage - .get_account_storage_entry(slot, missing_id) + .get_account_storage_entry(slot, Some(missing_id)) .is_none()); storage .shrink_in_progress_map .insert(slot, Arc::clone(&sample)); // id is found in map assert!(storage - .get_account_storage_entry(slot, missing_id) + .get_account_storage_entry(slot, Some(missing_id)) .is_none()); - assert!(storage.get_account_storage_entry(slot, id).is_some()); + assert!(storage.get_account_storage_entry(slot, None).is_some()); storage.map.remove(&slot); // id is found in shrink_in_progress_map assert!(storage - .get_account_storage_entry(slot, missing_id) + .get_account_storage_entry(slot, Some(missing_id)) .is_none()); - assert!(storage.get_account_storage_entry(slot, id).is_some()); + assert!(storage.get_account_storage_entry(slot, None).is_some()); } } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index dbcd3b1d17ede5..b729e8dc91add7 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -20,7 +20,7 @@ use { crate::{ - account_info::{AccountInfo, StorageLocation}, + account_info::{AccountInfo, AccountOffsetAndFlags, StorageLocation}, account_storage::{ meta::{ StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta, @@ -684,6 +684,7 @@ impl GenerateIndexTimings { impl IndexValue for AccountInfo {} impl DiskIndexValue for AccountInfo {} +impl DiskIndexValue for AccountOffsetAndFlags {} impl ZeroLamport for AccountSharedData { fn is_zero_lamport(&self) -> bool { @@ -1352,7 +1353,7 @@ struct RemoveUnrootedSlotsSynchronization { signal: Condvar, } -type AccountInfoAccountsIndex = AccountsIndex; +type AccountInfoAccountsIndex = AccountsIndex; // This structure handles the load/store of the accounts #[derive(Debug)] @@ -3367,7 +3368,7 @@ impl AccountsDb { .unwrap() - 1; debug!( - "store_counts, inserting slot: {}, store id: {}, count: {}", 
+ "store_counts, inserting slot: {}, store id: {:?}, count: {}", slot, account_info.store_id(), count ); store_counts.insert(*slot, (count, key_set)); @@ -6194,7 +6195,7 @@ impl AccountsDb { storage.add_account(stored_size); infos.push(AccountInfo::new( - StorageLocation::AppendVec(store_id, offsets[0]), + StorageLocation::AppendVec(Some(store_id), offsets[0]), accounts_and_meta_to_store .account(i) .map(|account| account.lamports()) @@ -8675,7 +8676,7 @@ impl AccountsDb { pubkey, IndexAccountMapEntry { write_version: _write_version, - store_id, + store_id: _store_id, stored_account, }, )| { @@ -8702,7 +8703,7 @@ impl AccountsDb { ( pubkey, AccountInfo::new( - StorageLocation::AppendVec(store_id, stored_account.offset), // will never be cached + StorageLocation::AppendVec(None, stored_account.offset), // will never be cached stored_account.account_meta.lamports, ), ) @@ -8934,7 +8935,7 @@ impl AccountsDb { count += 1; let ai = AccountInfo::new( StorageLocation::AppendVec( - account_info.store_id, + None, account_info.stored_account.offset, ), // will never be cached account_info.stored_account.account_meta.lamports, @@ -9350,7 +9351,9 @@ impl AccountsDb { pub fn get_append_vec_id(&self, pubkey: &Pubkey, slot: Slot) -> Option { let ancestors = vec![(slot, 1)].into_iter().collect(); let result = self.accounts_index.get(pubkey, Some(&ancestors), None); - result.map(|(list, index)| list.slot_list()[index].1.store_id()) + // `store_id()` is `None` when the entry uses the slot's default storage, + // so fold that into this function's own `Option` result + result.and_then(|(list, index)| list.slot_list()[index].1.store_id()) } pub fn alive_account_count_in_slot(&self, slot: Slot) -> usize { @@ -10446,7 +10449,7 @@ pub mod tests { if let Some(index) = add_to_index { let account_info = AccountInfo::new( - StorageLocation::AppendVec(storage.append_vec_id(), offsets[0]), + StorageLocation::AppendVec(Some(storage.append_vec_id()), offsets[0]), account.lamports(), ); index.upsert( @@ -11246,11 +11249,13 @@ pub mod tests { //slot is still there, since gc is lazy assert_eq!( - accounts - .storage -
.get_slot_storage_entry(0) - .unwrap() - .append_vec_id(), + Some( + accounts + .storage + .get_slot_storage_entry(0) + .unwrap() + .append_vec_id() + ), id ); @@ -13590,10 +13595,10 @@ pub mod tests { let key0 = Pubkey::new_from_array([0u8; 32]); let key1 = Pubkey::new_from_array([1u8; 32]); let key2 = Pubkey::new_from_array([2u8; 32]); - let info0 = AccountInfo::new(StorageLocation::AppendVec(0, 0), 0); - let info1 = AccountInfo::new(StorageLocation::AppendVec(1, 0), 0); - let info2 = AccountInfo::new(StorageLocation::AppendVec(2, 0), 0); - let info3 = AccountInfo::new(StorageLocation::AppendVec(3, 0), 0); + let info0 = AccountInfo::new(StorageLocation::AppendVec(None, 0), 0); + let info1 = AccountInfo::new(StorageLocation::AppendVec(None, 0), 0); + let info2 = AccountInfo::new(StorageLocation::AppendVec(None, 0), 0); + let info3 = AccountInfo::new(StorageLocation::AppendVec(None, 0), 0); let mut reclaims = vec![]; accounts_index.upsert( 0, @@ -16041,7 +16046,7 @@ pub mod tests { } let do_test = |test_params: TestParameters| { - let account_info = AccountInfo::new(StorageLocation::AppendVec(42, 128), 0); + let account_info = AccountInfo::new(StorageLocation::AppendVec(None, 128), 0); let pubkey = solana_sdk::pubkey::new_rand(); let mut key_set = HashSet::default(); key_set.insert(pubkey); @@ -17613,7 +17618,7 @@ pub mod tests { if let Some(storage) = db.get_storage_for_slot(slot) { storage.accounts.account_iter().for_each(|account| { let info = AccountInfo::new( - StorageLocation::AppendVec(storage.append_vec_id(), account.offset), + StorageLocation::AppendVec(None, account.offset), account.lamports(), ); db.accounts_index.upsert( From bb17a7075766cdf1adb28fc7971b1b882939393e Mon Sep 17 00:00:00 2001 From: jeff washington Date: Fri, 24 Feb 2023 18:05:00 -0600 Subject: [PATCH 3/4] make store_id an Option in mem acct idx --- runtime/src/bank_creation_freezing_progress.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/runtime/src/bank_creation_freezing_progress.rs b/runtime/src/bank_creation_freezing_progress.rs index ad491207738f48..92cb2cd32db0da 100644 --- a/runtime/src/bank_creation_freezing_progress.rs +++ b/runtime/src/bank_creation_freezing_progress.rs @@ -22,7 +22,7 @@ pub(crate) struct BankCreationFreezingProgress { bank_freeze_or_destruction_count: AtomicU32, /// enable waiting for bank_freeze_or_destruction_count to increment - bank_frozen_or_destroyed: Arc, + pub(crate) bank_frozen_or_destroyed: Arc, last_report: AtomicInterval, } From 96224a37dd7e1ebf556ff07dafaa78f94f72a14e Mon Sep 17 00:00:00 2001 From: jeff washington Date: Mon, 27 Feb 2023 11:20:02 -0600 Subject: [PATCH 4/4] after shrinking, wait until fg processing is complete before swapping out storages. --- runtime/src/accounts_db.rs | 53 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index b729e8dc91add7..a8aa5bc5aeeaec 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -1980,6 +1980,7 @@ pub(crate) struct ShrinkStats { dead_accounts: AtomicU64, alive_accounts: AtomicU64, accounts_loaded: AtomicU64, + wait_for_fg_us: AtomicU64, } impl ShrinkStats { @@ -2078,6 +2079,11 @@ impl ShrinkStats { self.accounts_loaded.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "wait_for_fg_us", + self.wait_for_fg_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), ); } } @@ -2195,6 +2201,11 @@ impl ShrinkAncientStats { self.shrink_stats.accounts_loaded.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "wait_for_fg_us", + self.shrink_stats.wait_for_fg_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "ancient_append_vecs_shrunk", self.ancient_append_vecs_shrunk.swap(0, Ordering::Relaxed) as i64, @@ -3952,6 +3963,27 @@ impl AccountsDb { shrink_in_progress: Option, shrink_can_be_active: bool, ) { + self.wait_until_foreground_processing_advances(stats); + + self.remove_old_stores_shrink_internal( + shrink_collect, + 
stats, + shrink_in_progress, + shrink_can_be_active, + ) + } + + /// common code from shrink and combine_ancient_slots + /// get rid of all original store_ids in the slot + fn remove_old_stores_shrink_internal<'a, T: ShrinkCollectRefs<'a>>( + &self, + shrink_collect: &ShrinkCollect<'a, T>, + stats: &ShrinkStats, + shrink_in_progress: Option, + shrink_can_be_active: bool, + ) { + // the caller already waited for foreground processing to advance; do not wait twice + let mut time = Measure::start("remove_old_stores_shrink"); // Purge old, overwritten storage entries let dead_storages = self.mark_dirty_dead_stores( @@ -4068,6 +4100,27 @@ impl AccountsDb { self.bank_progress.report(); } + fn wait_until_foreground_processing_advances(&self, stats: &ShrinkStats) { + let (_, wait_us) = measure_us!({ + // now that we have created a new storage and updated the index, we need to make sure there are no fg processes which could try + // to load the accounts from the old storage. Any index entries loaded from the index would not have a store_id, so we might try + // to load from the wrong storage. + // So, do not release 'shrink_collect' and thus, the index entries we're moving, until the fg processes + // have advanced and no longer need to disambiguate between the two storages (old and newly shrunk) + let bank_count_start = self.bank_progress.get_bank_creation_count(); + while bank_count_start + .wrapping_sub(self.bank_progress.get_bank_frozen_or_destroyed_count()) + < u32::MAX / 2 + { + _ = self + .bank_progress + .bank_frozen_or_destroyed + .wait_timeout(std::time::Duration::from_millis(10)); + } + }); + stats.wait_for_fg_us.fetch_add(wait_us, Ordering::Relaxed); + } + pub(crate) fn update_shrink_stats(shrink_stats: &ShrinkStats, stats_sub: ShrinkStatsSub) { shrink_stats .num_slots_shrunk