From 96224a37dd7e1ebf556ff07dafaa78f94f72a14e Mon Sep 17 00:00:00 2001 From: jeff washington Date: Mon, 27 Feb 2023 11:20:02 -0600 Subject: [PATCH] after shrinking, wait until fg processing is complete before swapping out storages. --- runtime/src/accounts_db.rs | 53 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index b729e8dc91add7..a8aa5bc5aeeaec 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -1980,6 +1980,7 @@ pub(crate) struct ShrinkStats { dead_accounts: AtomicU64, alive_accounts: AtomicU64, accounts_loaded: AtomicU64, + wait_for_fg_us: AtomicU64, } impl ShrinkStats { @@ -2078,6 +2079,11 @@ impl ShrinkStats { self.accounts_loaded.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "wait_for_fg_us", + self.wait_for_fg_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), ); } } @@ -2195,6 +2201,11 @@ impl ShrinkAncientStats { self.shrink_stats.accounts_loaded.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "wait_for_fg_us", + self.shrink_stats.wait_for_fg_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "ancient_append_vecs_shrunk", self.ancient_append_vecs_shrunk.swap(0, Ordering::Relaxed) as i64, @@ -3952,6 +3963,27 @@ impl AccountsDb { shrink_in_progress: Option, shrink_can_be_active: bool, ) { + self.wait_until_foreground_processing_advances(stats); + + self.remove_old_stores_shrink_internal( + shrink_collect, + stats, + shrink_in_progress, + shrink_can_be_active, + ) + } + + /// common code from shrink and combine_ancient_slots + /// get rid of all original store_ids in the slot + fn remove_old_stores_shrink_internal<'a, T: ShrinkCollectRefs<'a>>( + &self, + shrink_collect: &ShrinkCollect<'a, T>, + stats: &ShrinkStats, + shrink_in_progress: Option, + shrink_can_be_active: bool, + ) { + self.wait_until_foreground_processing_advances(stats); + let mut time = Measure::start("remove_old_stores_shrink"); // Purge old, overwritten storage entries let dead_storages = self.mark_dirty_dead_stores( @@ -4068,6 +4100,27 @@ impl AccountsDb { self.bank_progress.report(); } + fn wait_until_foreground_processing_advances(&self, stats: &ShrinkStats) { + let (_, wait_us) = measure_us!({ + // now that we have created a new storage and updated the index, we need to make sure there are no fg processes which could try + // to load the accounts from the old storage. Any index entries loaded from the index would not have a store_id, so we might try + // to load from the wrong storage. + // So, do not release 'shrink_collect' and thus, the index entries we're moving, until the fg processes + // have advanced and no longer need to disambiguate between the two storages (old and newly shrunk) + let bank_count_start = self.bank_progress.get_bank_creation_count(); + while bank_count_start + .wrapping_sub(self.bank_progress.get_bank_frozen_or_destroyed_count()) + < u32::MAX / 2 + { + _ = self + .bank_progress + .bank_frozen_or_destroyed + .wait_timeout(std::time::Duration::from_millis(10)); + } + }); + stats.wait_for_fg_us.fetch_add(wait_us, Ordering::Relaxed); + } + pub(crate) fn update_shrink_stats(shrink_stats: &ShrinkStats, stats_sub: ShrinkStatsSub) { shrink_stats .num_slots_shrunk