AcctIdx: factor 'scan' out of flush_internal (#23777)
jeffwashington authored Mar 21, 2022
1 parent f34434f commit 258db77
Showing 1 changed file with 85 additions and 56 deletions.
runtime/src/in_mem_accounts_index.rs (141 changes: 85 additions & 56 deletions)
@@ -64,6 +64,12 @@ pub enum InsertNewEntryResults {
    ExistedNewEntryNonZeroLamports,
}

struct FlushScanResult<T> {
    evictions: Vec<Pubkey>,
    evictions_random: Vec<Pubkey>,
    dirty_items: Vec<(Pubkey, AccountMapEntry<T>)>,
}

#[allow(dead_code)] // temporary during staging
impl<T: IndexValue> InMemAccountsIndex<T> {
    pub fn new(storage: &Arc<BucketMapHolder<T>>, bin: usize) -> Self {
@@ -927,7 +933,64 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
        }
    }

    fn flush_internal(&self, _flush_guard: &FlushGuard) {
    /// scan loop
    /// holds read lock
    /// identifies items which are dirty and items to evict
    fn flush_scan(
        &self,
        current_age: Age,
        startup: bool,
        _flush_guard: &FlushGuard,
    ) -> FlushScanResult<T> {
        let exceeds_budget = self.get_exceeds_budget();
        let map = self.map().read().unwrap();
        let mut evictions = Vec::with_capacity(map.len());
        let mut evictions_random = Vec::default();
        let mut dirty_items = Vec::with_capacity(map.len());
        let mut flush_should_evict_us = 0;
        let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
        for (k, v) in map.iter() {
            let mut mse = Measure::start("flush_should_evict");
            let (evict_for_age, slot_list) =
                self.should_evict_from_mem(current_age, v, startup, true, exceeds_budget);
            mse.stop();
            flush_should_evict_us += mse.as_us();
            if !evict_for_age && !Self::random_chance_of_eviction() {
                // not planning to evict this item from memory now, so don't write it to disk yet
                continue;
            }

            // if we are removing it, then we need to update disk if we're dirty
            if v.clear_dirty() {
                // step 1: clear the dirty flag
                // step 2: perform the update on disk based on the fields in the entry
                // If a parallel operation dirties the item again - even while this flush is occurring,
                // the last thing the writer will do, after updating contents, is set_dirty(true).
                // That prevents dropping an item from cache before disk is updated to the latest in-mem state.
                // This all happens while holding the lock on the in-mem cache, because items can be deleted from it.
                // It is possible the item in the cache is marked dirty again while these updates are happening. That is ok.
                dirty_items.push((*k, Arc::clone(v)));
            } else {
                drop(slot_list);
            }
            if evict_for_age {
                evictions.push(*k);
            } else {
                evictions_random.push(*k);
            }
        }
        Self::update_time_stat(&self.stats().flush_scan_us, m);
        Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us);

        FlushScanResult {
            evictions,
            evictions_random,
            dirty_items,
        }
    }
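The clear_dirty / set_dirty comments above describe a small atomic protocol: flush atomically reads-and-clears the flag, and any concurrent writer sets it back as its final step, so a re-dirtied entry is never dropped with unsaved state. A minimal sketch of such a flag, assuming an `AtomicBool` inside the entry (the real `AccountMapEntry` internals are not shown in this diff):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct Entry {
    dirty: AtomicBool,
}

impl Entry {
    /// Atomically clear the flag and return whether it was set.
    /// If this returns true, the caller owns the job of persisting the entry.
    fn clear_dirty(&self) -> bool {
        self.dirty.swap(false, Ordering::AcqRel)
    }

    /// Writers call this last, after updating the entry's contents, so a
    /// flush that already cleared the flag still sees the entry as dirty
    /// on its next pass.
    fn set_dirty(&self) {
        self.dirty.store(true, Ordering::Release);
    }
}

fn main() {
    let e = Entry { dirty: AtomicBool::new(true) };
    assert!(e.clear_dirty()); // flush claims the dirty work
    e.set_dirty();            // a concurrent writer re-dirties the entry
    assert!(e.clear_dirty()); // the next flush pass sees it again
}
```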

    /// synchronize the in-mem index with the disk index
    fn flush_internal(&self, flush_guard: &FlushGuard) {
        let current_age = self.storage.current_age();
        let iterate_for_age = self.get_should_age(current_age);
        let startup = self.storage.get_startup();
@@ -937,64 +1000,16 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
            return;
        }

        let in_mem_count = self.stats().count_in_mem.load(Ordering::Relaxed);
        let limit = self.storage.mem_budget_mb;
        let estimate_mem = in_mem_count * Self::approx_size_of_one_entry();
        let exceeds_budget = limit
            .map(|limit| estimate_mem >= limit * 1024 * 1024)
            .unwrap_or_default();
        self.stats()
            .estimate_mem
            .store(estimate_mem as u64, Ordering::Relaxed);
        // may have to loop if disk has to grow and we have to restart
        {
            let mut dirty_items;
            let mut evictions;
            let mut evictions_random = Vec::default();
            let disk = self.bucket.as_ref().unwrap();

            let mut flush_entries_updated_on_disk = 0;
            let mut flush_should_evict_us = 0;
            // scan loop
            // holds read lock
            {
                let map = self.map().read().unwrap();
                evictions = Vec::with_capacity(map.len());
                dirty_items = Vec::with_capacity(map.len());
                let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
                for (k, v) in map.iter() {
                    let mut mse = Measure::start("flush_should_evict");
                    let (evict_for_age, slot_list) =
                        self.should_evict_from_mem(current_age, v, startup, true, exceeds_budget);
                    mse.stop();
                    flush_should_evict_us += mse.as_us();
                    if !evict_for_age && !Self::random_chance_of_eviction() {
                        // not planning to evict this item from memory now, so don't write it to disk yet
                        continue;
                    }

                    // if we are removing it, then we need to update disk if we're dirty
                    if v.clear_dirty() {
                        // step 1: clear the dirty flag
                        // step 2: perform the update on disk based on the fields in the entry
                        // If a parallel operation dirties the item again - even while this flush is occurring,
                        // the last thing the writer will do, after updating contents, is set_dirty(true).
                        // That prevents dropping an item from cache before disk is updated to the latest in-mem state.
                        // This all happens while holding the lock on the in-mem cache, because items can be deleted from it.
                        // It is possible the item in the cache is marked dirty again while these updates are happening. That is ok.
                        dirty_items.push((*k, Arc::clone(v)));
                    } else {
                        drop(slot_list);
                    }
                    if evict_for_age {
                        evictions.push(*k);
                    } else {
                        evictions_random.push(*k);
                    }
                }
                Self::update_time_stat(&self.stats().flush_scan_us, m);
            }
            let FlushScanResult {
                evictions,
                evictions_random,
                dirty_items,
            } = self.flush_scan(current_age, startup, flush_guard);
            {
                // write to disk outside giant read lock
                let m = Measure::start("flush_update"); // we don't care about lock time in this metric - bg threads can wait
@@ -1024,7 +1039,6 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
                }
                Self::update_time_stat(&self.stats().flush_update_us, m);
            }
            Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us);

            Self::update_stat(
                &self.stats().flush_entries_updated_on_disk,
@@ -1044,6 +1058,21 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
        }
    }
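After the refactor, flush_internal reads as a three-stage pipeline: scan under the read lock, write dirty entries to disk outside that lock, then evict. A toy skeleton of that control flow (hypothetical types and free functions, not the real signatures):

```rust
struct ScanResult {
    dirty_items: Vec<u64>,
    evictions: Vec<u64>,
}

// Stage 1: collect work while holding only the read lock.
fn flush_scan() -> ScanResult {
    ScanResult { dirty_items: vec![7], evictions: vec![7, 9] }
}

// Stage 2: persist dirty entries; no in-mem lock is held here.
fn write_dirty_to_disk(dirty_items: &[u64]) {
    for key in dirty_items {
        println!("updating disk for {key}");
    }
}

// Stage 3: drop entries from the cache only after disk is current.
fn evict_from_cache(evictions: &[u64]) {
    for key in evictions {
        println!("evicting {key}");
    }
}

fn flush_internal() {
    let ScanResult { dirty_items, evictions } = flush_scan();
    write_dirty_to_disk(&dirty_items);
    evict_from_cache(&evictions);
}

fn main() {
    flush_internal();
}
```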

    /// calculate the estimated size of the in-mem index
    /// return whether the size exceeds the specified budget
    fn get_exceeds_budget(&self) -> bool {
        let in_mem_count = self.stats().count_in_mem.load(Ordering::Relaxed);
        let limit = self.storage.mem_budget_mb;
        let estimate_mem = in_mem_count * Self::approx_size_of_one_entry();
        let exceeds_budget = limit
            .map(|limit| estimate_mem >= limit * 1024 * 1024)
            .unwrap_or_default();
        self.stats()
            .estimate_mem
            .store(estimate_mem as u64, Ordering::Relaxed);
        exceeds_budget
    }

    /// for each key in 'keys', look up in map, set age to the future
    fn move_ages_to_future(&self, next_age: Age, current_age: Age, keys: &[Pubkey]) {
        let map = self.map().read().unwrap();
@@ -1120,7 +1149,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
                    continue;
                }

                // all conditions for removing succeeded, so really evict item from in-mem cache
                // all conditions for eviction succeeded, so really evict item from in-mem cache
                evicted += 1;
                occupied.remove();
            }
