Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flush individual buckets every n ages #30855

Merged
merged 1 commit into from
Mar 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 166 additions & 55 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,15 @@ pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<

/// possible evictions for next few slots coming up
possible_evictions: RwLock<PossibleEvictions<T>>,
/// when age % ages_to_stay_in_cache == 'age_to_flush_bin_offset', then calculate the next 'ages_to_stay_in_cache' 'possible_evictions'
/// this causes us to scan the entire in-mem hash map every 1/'ages_to_stay_in_cache' instead of each age
age_to_flush_bin_mod: Age,

/// how many more ages to skip before this bucket is flushed (as opposed to being skipped).
/// When this reaches 0, this bucket is flushed.
remaining_ages_to_skip_flushing: AtomicU8,

/// an individual bucket evicts its entries and writes to disk only after skipping `num_ages_to_distribute_flushes` ages, instead of at every age
/// Higher numbers mean we flush fewer buckets/s
/// Lower numbers mean we flush more buckets/s
num_ages_to_distribute_flushes: Age,
}

impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for InMemAccountsIndex<T, U> {
Expand Down Expand Up @@ -146,7 +152,7 @@ struct FlushScanResult<T> {

impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U> {
pub fn new(storage: &Arc<BucketMapHolder<T, U>>, bin: usize) -> Self {
let ages_to_stay_in_cache = storage.ages_to_stay_in_cache;
let num_ages_to_distribute_flushes = Age::MAX - storage.ages_to_stay_in_cache;
Self {
map_internal: RwLock::default(),
storage: Arc::clone(storage),
Expand All @@ -163,23 +169,13 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
// initialize this to max, to make it clear we have not flushed at age 0, the starting age
last_age_flushed: AtomicU8::new(Age::MAX),
startup_info: Mutex::default(),
possible_evictions: RwLock::new(PossibleEvictions::new(ages_to_stay_in_cache)),
possible_evictions: RwLock::new(PossibleEvictions::new(1)),
brooksprumo marked this conversation as resolved.
Show resolved Hide resolved
// Spread out the scanning across all ages within the window.
// This causes us to scan 1/N of the bins each 'Age'
age_to_flush_bin_mod: thread_rng().gen_range(0, ages_to_stay_in_cache),
}
}

/// # ages to scan ahead
fn ages_to_scan_ahead(&self, current_age: Age) -> Age {
let ages_to_stay_in_cache = self.storage.ages_to_stay_in_cache;
if (self.age_to_flush_bin_mod == current_age % ages_to_stay_in_cache)
&& !self.storage.get_startup()
{
// scan ahead multiple ages
ages_to_stay_in_cache
} else {
1 // just current age
remaining_ages_to_skip_flushing: AtomicU8::new(
thread_rng().gen_range(0, num_ages_to_distribute_flushes),
),
num_ages_to_distribute_flushes,
}
}

Expand Down Expand Up @@ -937,8 +933,9 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
current_age: Age,
entry: &AccountMapEntry<T>,
startup: bool,
ages_flushing_now: Age,
) -> bool {
startup || (current_age == entry.age())
startup || current_age.wrapping_sub(entry.age()) <= ages_flushing_now
brooksprumo marked this conversation as resolved.
Show resolved Hide resolved
}

/// return true if 'entry' should be evicted from the in-mem index
Expand All @@ -949,10 +946,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
startup: bool,
update_stats: bool,
exceeds_budget: bool,
ages_flushing_now: Age,
) -> (bool, Option<std::sync::RwLockReadGuard<'a, SlotList<T>>>) {
// this could be tunable dynamically based on memory pressure
// we could look at more ages or we could throw out more items we are choosing to keep in the cache
if Self::should_evict_based_on_age(current_age, entry, startup) {
if Self::should_evict_based_on_age(current_age, entry, startup, ages_flushing_now) {
if exceeds_budget {
// if we are already holding too many items in-mem, then we need to be more aggressive at kicking things out
(true, None)
Expand Down Expand Up @@ -981,45 +979,53 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
}
}

/// Populate `possible_evictions` with the entries of `iter` that are candidates for eviction.
///
/// An entry is an age-based candidate when `startup` is set, or when its age is at most
/// `ages_flushing_now` ages behind `current_age` (computed with wrapping arithmetic).
/// Any other entry is kept only if `can_randomly_flush` is set and
/// `random_chance_of_eviction` selects it; such an entry is inserted flagged as random.
/// Every candidate is added via `possible_evictions.insert(0, ..)`.
fn gather_possible_evictions<'a>(
    iter: impl Iterator<Item = (&'a Pubkey, &'a Arc<AccountMapEntryInner<T>>)>,
    possible_evictions: &mut PossibleEvictions<T>,
    startup: bool,
    current_age: Age,
    ages_flushing_now: Age,
    can_randomly_flush: bool,
) {
    for (pubkey, entry) in iter {
        // true when the entry qualifies purely because of its age (or because we are in startup)
        let in_age_window =
            startup || current_age.wrapping_sub(entry.age()) <= ages_flushing_now;
        // entries outside the age window can only get in through the random path
        let random = !in_age_window;
        // `&&` short-circuits, so the random roll only happens when random flushing is allowed
        if random && !(can_randomly_flush && Self::random_chance_of_eviction()) {
            // not planning to evict this item from memory within 'ages_flushing_now' ages
            continue;
        }

        possible_evictions.insert(0, *pubkey, Arc::clone(entry), random);
    }
}

/// scan loop
/// holds read lock
/// identifies items which are dirty and items to evict
/// identifies items which are potential candidates to evict
fn flush_scan(
&self,
current_age: Age,
startup: bool,
_flush_guard: &FlushGuard,
ages_flushing_now: Age,
) -> FlushScanResult<T> {
let mut possible_evictions = self.possible_evictions.write().unwrap();
if let Some(result) = possible_evictions.get_possible_evictions() {
// we have previously calculated the possible evictions for this age
return result;
}
brooksprumo marked this conversation as resolved.
Show resolved Hide resolved
// otherwise, we need to scan some number of ages into the future now
let ages_to_scan = self.ages_to_scan_ahead(current_age);
possible_evictions.reset(ages_to_scan);

possible_evictions.reset(1);
let m;
{
let map = self.map_internal.read().unwrap();
m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
for (k, v) in map.iter() {
let random = Self::random_chance_of_eviction();
let age_offset = if random {
thread_rng().gen_range(0, ages_to_scan)
} else if startup {
0
} else {
let ages_in_future = v.age().wrapping_sub(current_age);
if ages_in_future >= ages_to_scan {
// not planning to evict this item from memory within the next few ages
continue;
}
ages_in_future
};

possible_evictions.insert(age_offset, *k, Arc::clone(v), random);
}
Self::gather_possible_evictions(
map.iter(),
&mut possible_evictions,
startup,
current_age,
ages_flushing_now,
true,
);
}
Self::update_time_stat(&self.stats().flush_scan_us, m);

Expand Down Expand Up @@ -1109,16 +1115,36 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
return;
}

if startup {
self.write_startup_info_to_disk();
}
brooksprumo marked this conversation as resolved.
Show resolved Hide resolved

let ages_flushing_now = if iterate_for_age && !startup {
let old_value = self
.remaining_ages_to_skip_flushing
.fetch_sub(1, Ordering::AcqRel);
if old_value == 0 {
self.remaining_ages_to_skip_flushing
.store(self.num_ages_to_distribute_flushes, Ordering::Release);
} else {
// skipping iteration of the buckets at the current age, but mark the bucket as having aged
assert_eq!(current_age, self.storage.current_age());
self.set_has_aged(current_age, can_advance_age);
return;
}
self.num_ages_to_distribute_flushes
} else {
// just 1 age to flush. 0 means only entries whose age == current_age qualify
0
};

Self::update_stat(&self.stats().buckets_scanned, 1);

// scan in-mem map for items that we may evict
let FlushScanResult {
mut evictions_age_possible,
mut evictions_random,
} = self.flush_scan(current_age, startup, flush_guard);

if startup {
self.write_startup_info_to_disk();
}
} = self.flush_scan(current_age, startup, flush_guard, ages_flushing_now);

// write to disk outside in-mem map read lock
{
Expand Down Expand Up @@ -1146,6 +1172,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
startup,
true,
exceeds_budget,
ages_flushing_now,
);
slot_list = slot_list_temp;
mse.stop();
Expand Down Expand Up @@ -1222,8 +1249,20 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
.collect::<Vec<_>>();

let m = Measure::start("flush_evict");
self.evict_from_cache(evictions_age, current_age, startup, false);
self.evict_from_cache(evictions_random, current_age, startup, true);
self.evict_from_cache(
evictions_age,
current_age,
startup,
false,
ages_flushing_now,
);
self.evict_from_cache(
evictions_random,
current_age,
startup,
true,
ages_flushing_now,
);
Self::update_time_stat(&self.stats().flush_evict_us, m);
}

Expand Down Expand Up @@ -1267,6 +1306,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
current_age: Age,
startup: bool,
randomly_evicted: bool,
ages_flushing_now: Age,
) {
if evictions.is_empty() {
return;
Expand Down Expand Up @@ -1317,7 +1357,12 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,

if v.dirty()
|| (!randomly_evicted
&& !Self::should_evict_based_on_age(current_age, v, startup))
&& !Self::should_evict_based_on_age(
current_age,
v,
startup,
ages_flushing_now,
))
{
// marked dirty or bumped in age after we looked above
// these evictions will be handled in later passes (at later ages)
Expand Down Expand Up @@ -1513,13 +1558,71 @@ mod tests {
startup,
false,
false,
1,
)
.0,
ref_count == 1
);
}
}

/// Exhaustively checks `gather_possible_evictions` with random flushing disabled:
/// for every (`current_age`, `ages_flushing_now`) pair, exactly `1 + ages_flushing_now`
/// of the 256 entries (one per possible age) must be gathered, and each gathered entry
/// must also satisfy `should_evict_based_on_age`.
#[test]
fn test_gather_possible_evictions() {
    solana_logger::setup();
    let startup = false;
    let ref_count = 1;
    // one distinct pubkey per byte value
    let pubkeys: Vec<_> = (0..=255).map(|i| Pubkey::from([i as u8; 32])).collect();
    // one single-slot entry per possible age
    let entries: Vec<_> = (0..=255)
        .map(|age| {
            let entry = Arc::new(AccountMapEntryInner::new(
                vec![(0, 0)],
                ref_count,
                AccountMapEntryMeta::default(),
            ));
            entry.set_age(age);
            entry
        })
        .collect();
    let pairs: Vec<_> = pubkeys.iter().zip(entries.iter()).collect();

    for current_age in 0..=255 {
        for ages_flushing_now in 0..=255 {
            let mut possible_evictions = PossibleEvictions::new(1);
            possible_evictions.reset(1);
            InMemAccountsIndex::<u64, u64>::gather_possible_evictions(
                pairs.iter().cloned(),
                &mut possible_evictions,
                startup,
                current_age,
                ages_flushing_now,
                false, // can_randomly_flush: off, so only age decides
            );
            let evictions = possible_evictions.possible_evictions.pop().unwrap();
            // ages `current_age`, `current_age - 1`, ..., `current_age - ages_flushing_now` (wrapping)
            let expected_len = 1 + ages_flushing_now as usize;
            assert_eq!(evictions.evictions_age_possible.len(), expected_len);
            for (_k, v) in evictions.evictions_age_possible.iter() {
                assert!(
                    InMemAccountsIndex::<u64, u64>::should_evict_based_on_age(
                        current_age,
                        v,
                        startup,
                        ages_flushing_now,
                    ),
                    "current_age: {}, age: {}, ages_flushing_now: {}",
                    current_age,
                    v.age(),
                    ages_flushing_now
                );
            }
        }
    }
}

#[test]
fn test_should_evict_from_mem() {
solana_logger::setup();
Expand Down Expand Up @@ -1547,6 +1650,7 @@ mod tests {
startup,
false,
true,
0,
)
.0
);
Expand All @@ -1563,6 +1667,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1575,6 +1680,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1591,6 +1697,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1610,6 +1717,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1624,6 +1732,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1638,6 +1747,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1652,6 +1762,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand Down