From 258db771009623b2d97ba6a58c15bf326f46b436 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Sun, 20 Mar 2022 22:00:38 -0500
Subject: [PATCH] AcctIdx: factor 'scan' out of flush_internal (#23777)

---
 runtime/src/in_mem_accounts_index.rs | 141 ++++++++++++++++-----------
 1 file changed, 85 insertions(+), 56 deletions(-)

diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs
index 32ac1830386ffe..2ca76dbcafaf1f 100644
--- a/runtime/src/in_mem_accounts_index.rs
+++ b/runtime/src/in_mem_accounts_index.rs
@@ -64,6 +64,12 @@ pub enum InsertNewEntryResults {
     ExistedNewEntryNonZeroLamports,
 }
 
+struct FlushScanResult<T> {
+    evictions: Vec<Pubkey>,
+    evictions_random: Vec<Pubkey>,
+    dirty_items: Vec<(Pubkey, AccountMapEntry<T>)>,
+}
+
 #[allow(dead_code)] // temporary during staging
 impl<T: IndexValue> InMemAccountsIndex<T> {
     pub fn new(storage: &Arc<BucketMapHolder<T>>, bin: usize) -> Self {
@@ -927,7 +933,64 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
         }
     }
 
-    fn flush_internal(&self, _flush_guard: &FlushGuard) {
+    /// scan loop
+    /// holds read lock
+    /// identifies items which are dirty and items to evict
+    fn flush_scan(
+        &self,
+        current_age: Age,
+        startup: bool,
+        _flush_guard: &FlushGuard,
+    ) -> FlushScanResult<T> {
+        let exceeds_budget = self.get_exceeds_budget();
+        let map = self.map().read().unwrap();
+        let mut evictions = Vec::with_capacity(map.len());
+        let mut evictions_random = Vec::default();
+        let mut dirty_items = Vec::with_capacity(map.len());
+        let mut flush_should_evict_us = 0;
+        let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
+        for (k, v) in map.iter() {
+            let mut mse = Measure::start("flush_should_evict");
+            let (evict_for_age, slot_list) =
+                self.should_evict_from_mem(current_age, v, startup, true, exceeds_budget);
+            mse.stop();
+            flush_should_evict_us += mse.as_us();
+            if !evict_for_age && !Self::random_chance_of_eviction() {
+                // not planning to evict this item from memory now, so don't write it to disk yet
+                continue;
+            }
+
+            // if we are removing it, then we need to update disk if we're dirty
+            if v.clear_dirty() {
+                // step 1: clear the dirty flag
+                // step 2: perform the update on disk based on the fields in the entry
+                // If a parallel operation dirties the item again - even while this flush is occurring,
+                // the last thing the writer will do, after updating contents, is set_dirty(true)
+                // That prevents dropping an item from cache before disk is updated to latest in mem.
+                // happens inside of lock on in-mem cache. This is because of deleting items
+                // it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
+                dirty_items.push((*k, Arc::clone(v)));
+            } else {
+                drop(slot_list);
+            }
+            if evict_for_age {
+                evictions.push(*k);
+            } else {
+                evictions_random.push(*k);
+            }
+        }
+        Self::update_time_stat(&self.stats().flush_scan_us, m);
+        Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us);
+
+        FlushScanResult {
+            evictions,
+            evictions_random,
+            dirty_items,
+        }
+    }
+
+    /// synchronize the in-mem index with the disk index
+    fn flush_internal(&self, flush_guard: &FlushGuard) {
         let current_age = self.storage.current_age();
         let iterate_for_age = self.get_should_age(current_age);
         let startup = self.storage.get_startup();
@@ -937,64 +1000,16 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
             return;
         }
 
-        let in_mem_count = self.stats().count_in_mem.load(Ordering::Relaxed);
-        let limit = self.storage.mem_budget_mb;
-        let estimate_mem = in_mem_count * Self::approx_size_of_one_entry();
-        let exceeds_budget = limit
-            .map(|limit| estimate_mem >= limit * 1024 * 1024)
-            .unwrap_or_default();
-        self.stats()
-            .estimate_mem
-            .store(estimate_mem as u64, Ordering::Relaxed);
-
         // may have to loop if disk has to grow and we have to restart
         {
-            let mut dirty_items;
-            let mut evictions;
-            let mut evictions_random = Vec::default();
             let disk = self.bucket.as_ref().unwrap();
 
             let mut flush_entries_updated_on_disk = 0;
-            let mut flush_should_evict_us = 0;
-            // scan loop
-            // holds read lock
-            {
-                let map = self.map().read().unwrap();
-                evictions = Vec::with_capacity(map.len());
-                dirty_items = Vec::with_capacity(map.len());
-                let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
-                for (k, v) in map.iter() {
-                    let mut mse = Measure::start("flush_should_evict");
-                    let (evict_for_age, slot_list) =
-                        self.should_evict_from_mem(current_age, v, startup, true, exceeds_budget);
-                    mse.stop();
-                    flush_should_evict_us += mse.as_us();
-                    if !evict_for_age && !Self::random_chance_of_eviction() {
-                        // not planning to evict this item from memory now, so don't write it to disk yet
-                        continue;
-                    }
-
-                    // if we are removing it, then we need to update disk if we're dirty
-                    if v.clear_dirty() {
-                        // step 1: clear the dirty flag
-                        // step 2: perform the update on disk based on the fields in the entry
-                        // If a parallel operation dirties the item again - even while this flush is occurring,
-                        // the last thing the writer will do, after updating contents, is set_dirty(true)
-                        // That prevents dropping an item from cache before disk is updated to latest in mem.
-                        // happens inside of lock on in-mem cache. This is because of deleting items
-                        // it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
-                        dirty_items.push((*k, Arc::clone(v)));
-                    } else {
-                        drop(slot_list);
-                    }
-                    if evict_for_age {
-                        evictions.push(*k);
-                    } else {
-                        evictions_random.push(*k);
-                    }
-                }
-                Self::update_time_stat(&self.stats().flush_scan_us, m);
-            }
+            let FlushScanResult {
+                evictions,
+                evictions_random,
+                dirty_items,
+            } = self.flush_scan(current_age, startup, flush_guard);
             {
                 // write to disk outside giant read lock
                 let m = Measure::start("flush_update"); // we don't care about lock time in this metric - bg threads can wait
@@ -1024,7 +1039,6 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
                 }
                 Self::update_time_stat(&self.stats().flush_update_us, m);
             }
-            Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us);
 
             Self::update_stat(
                 &self.stats().flush_entries_updated_on_disk,
@@ -1044,6 +1058,21 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
         }
     }
 
+    /// calculate the estimated size of the in-mem index
+    /// return whether the size exceeds the specified budget
+    fn get_exceeds_budget(&self) -> bool {
+        let in_mem_count = self.stats().count_in_mem.load(Ordering::Relaxed);
+        let limit = self.storage.mem_budget_mb;
+        let estimate_mem = in_mem_count * Self::approx_size_of_one_entry();
+        let exceeds_budget = limit
+            .map(|limit| estimate_mem >= limit * 1024 * 1024)
+            .unwrap_or_default();
+        self.stats()
+            .estimate_mem
+            .store(estimate_mem as u64, Ordering::Relaxed);
+        exceeds_budget
+    }
+
     /// for each key in 'keys', look up in map, set age to the future
     fn move_ages_to_future(&self, next_age: Age, current_age: Age, keys: &[Pubkey]) {
         let map = self.map().read().unwrap();
@@ -1120,7 +1149,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
                     continue;
                 }
 
-                // all conditions for removing succeeded, so really evict item from in-mem cache
+                // all conditions for eviction succeeded, so really evict item from in-mem cache
                 evicted += 1;
                 occupied.remove();
             }