Skip to content

Commit

Permalink
better distribution algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Mar 24, 2023
1 parent e6f0ddd commit a92843e
Showing 1 changed file with 107 additions and 32 deletions.
139 changes: 107 additions & 32 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<
/// possible evictions for next few slots coming up
possible_evictions: RwLock<PossibleEvictions<T>>,

/// when age % ages_to_stay_in_cache == 'age_to_flush_bin_offset', then scan the bucket to calculate 'possible_evictions'
/// this causes us to scan an individual bucket of the in-mem hash map every 1/'NUM_AGES_TO_DISTRIBUTE_FLUSHES' instead of each age
age_to_flush_bin_mod: Age,

/// age when this bucket was last actually flushed (as opposed to being skipped).
/// When we skip flushing a bucket at an age, this helps us keep track of which items should be aged out.
last_age_flush_not_skipped: AtomicU8,
Expand Down Expand Up @@ -172,8 +168,9 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
possible_evictions: RwLock::new(PossibleEvictions::new(1)),
// Spread out the scanning across all ages within the window.
// This causes us to scan 1/N of the bins each 'Age'
age_to_flush_bin_mod: thread_rng().gen_range(0, NUM_AGES_TO_DISTRIBUTE_FLUSHES),
last_age_flush_not_skipped: AtomicU8::default(),
last_age_flush_not_skipped: AtomicU8::new(
thread_rng().gen_range(0, NUM_AGES_TO_DISTRIBUTE_FLUSHES),
),
}
}

Expand Down Expand Up @@ -977,6 +974,29 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
}
}

/// fill in `possible_evictions` from `iter` by checking age
fn gather_possible_evictions<'a>(
iter: impl Iterator<Item = (&'a Pubkey, &'a Arc<AccountMapEntryInner<T>>)>,
possible_evictions: &mut PossibleEvictions<T>,
startup: bool,
current_age: Age,
ages_flushing_now: Age,
can_randomly_flush: bool,
) {
for (k, v) in iter {
let mut random = false;
if !startup && current_age.wrapping_sub(v.age()) > ages_flushing_now {
if !can_randomly_flush || !Self::random_chance_of_eviction() {
// not planning to evict this item from memory within 'ages_flushing_now' ages
continue;
}
random = true;
}

possible_evictions.insert(0, *k, Arc::clone(v), random);
}
}

/// scan loop
/// holds read lock
/// identifies items which are potential candidates to evict
Expand All @@ -985,23 +1005,22 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
current_age: Age,
startup: bool,
_flush_guard: &FlushGuard,
ages_to_flush_now: Age,
ages_flushing_now: Age,
) -> FlushScanResult<T> {
let mut possible_evictions = self.possible_evictions.write().unwrap();
possible_evictions.reset(1);
let m;
{
let map = self.map_internal.read().unwrap();
m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
for (k, v) in map.iter() {
let random = Self::random_chance_of_eviction();
if !startup && !random && current_age.wrapping_sub(v.age()) > ages_to_flush_now {
// not planning to evict this item from memory within 'ages_to_flush_now' ages
continue;
}

possible_evictions.insert(0, *k, Arc::clone(v), random);
}
Self::gather_possible_evictions(
map.iter(),
&mut possible_evictions,
startup,
current_age,
ages_flushing_now,
true,
);
}
Self::update_time_stat(&self.stats().flush_scan_us, m);

Expand Down Expand Up @@ -1095,24 +1114,23 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
self.write_startup_info_to_disk();
}

if iterate_for_age
&& !startup
&& current_age % NUM_AGES_TO_DISTRIBUTE_FLUSHES != self.age_to_flush_bin_mod
{
// skipped iteration of the buckets at the current age, so age the bucket
assert_eq!(current_age, self.storage.current_age());
self.set_has_aged(current_age, can_advance_age);
return;
}

let ages_flushing_now = if !startup {
current_age.wrapping_sub(
let ages_flushing_now = if iterate_for_age && !startup {
let old_value = self
.last_age_flush_not_skipped
.fetch_sub(1, Ordering::AcqRel);
if old_value == 0 {
self.last_age_flush_not_skipped
.swap(current_age, Ordering::AcqRel),
)
.store(NUM_AGES_TO_DISTRIBUTE_FLUSHES, Ordering::Release);
} else {
// skipping iteration of the buckets at the current age, but mark the bucket as having aged
assert_eq!(current_age, self.storage.current_age());
self.set_has_aged(current_age, can_advance_age);
return;
}
NUM_AGES_TO_DISTRIBUTE_FLUSHES
} else {
// just 1 age to flush
1
// just 1 age to flush. 0 means age == age
0
};

Self::update_stat(&self.stats().buckets_scanned, 1);
Expand Down Expand Up @@ -1543,6 +1561,63 @@ mod tests {
}
}

#[test]
fn test_gather_possible_evictions() {
solana_logger::setup();
let startup = false;
let ref_count = 1;
let pks = (0..=255)
.map(|i| Pubkey::from([i as u8; 32]))
.collect::<Vec<_>>();
let accounts = (0..=255)
.map(|age| {
let one_element_slot_list = vec![(0, 0)];
let one_element_slot_list_entry = Arc::new(AccountMapEntryInner::new(
one_element_slot_list,
ref_count,
AccountMapEntryMeta::default(),
));
one_element_slot_list_entry.set_age(age);
one_element_slot_list_entry
})
.collect::<Vec<_>>();
let both = pks.iter().zip(accounts.iter()).collect::<Vec<_>>();

for current_age in 0..=255 {
for ages_flushing_now in 0..=255 {
let mut possible_evictions = PossibleEvictions::new(1);
possible_evictions.reset(1);
InMemAccountsIndex::<u64, u64>::gather_possible_evictions(
both.iter().cloned(),
&mut possible_evictions,
startup,
current_age,
ages_flushing_now,
false, // true=can_randomly_flush
);
let evictions = possible_evictions.possible_evictions.pop().unwrap();
assert_eq!(
evictions.evictions_age_possible.len(),
1 + ages_flushing_now as usize
);
evictions.evictions_age_possible.iter().for_each(|(_k, v)| {
assert!(
InMemAccountsIndex::<u64, u64>::should_evict_based_on_age(
current_age,
v,
startup,
ages_flushing_now,
),
"current_age: {}, age: {}, ages_flushing_now: {}",
current_age,
v.age(),
ages_flushing_now
);
});
}
}
}

#[test]
fn test_should_evict_from_mem() {
solana_logger::setup();
Expand Down

0 comments on commit a92843e

Please sign in to comment.