flush entire bucket of writes at once #30715

Closed · wants to merge 13 commits
5 changes: 4 additions & 1 deletion runtime/src/bucket_map_holder.rs
@@ -51,7 +51,7 @@ pub struct BucketMapHolder<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>
// used by bg processing to know when any bucket has become dirty
pub wait_dirty_or_aged: Arc<WaitableCondvar>,
next_bucket_to_flush: AtomicUsize,
bins: usize,
pub(crate) bins: usize,

pub threads: usize,

@@ -68,6 +68,8 @@ pub struct BucketMapHolder<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>
/// Note startup is an optimization and is not required for correctness.
startup: AtomicBool,
_phantom: PhantomData<T>,

pub(crate) write_active: AtomicBool,
}

impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for BucketMapHolder<T, U> {
@@ -258,6 +260,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> BucketMapHolder<T, U>
startup: AtomicBool::default(),
mem_budget_mb,
threads,
write_active: AtomicBool::default(),
_phantom: PhantomData,
}
}
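The new write_active flag acts as a cross-bucket gate: flush_internal (later in this PR) only proceeds with a disk write if it swaps the flag from false to true, and clears it when done. A minimal standalone sketch of that gating pattern, outside the PR's types (the function name here is hypothetical):

use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for the gate behind BucketMapHolder::write_active.
fn try_exclusive_flush(write_active: &AtomicBool, flush: impl FnOnce()) -> bool {
    // swap returns the previous value; `false` means this caller acquired the gate
    if write_active.swap(true, Ordering::Relaxed) {
        return false; // another bucket is already writing; skip for now
    }
    flush();
    // release the gate so the next bucket may write
    write_active.store(false, Ordering::Relaxed);
    true
}

As in the PR, Relaxed ordering is enough when the flag only throttles disk-write concurrency and the flushed data is protected by its own locks.
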
27 changes: 27 additions & 0 deletions runtime/src/bucket_map_holder_stats.rs
@@ -49,6 +49,8 @@ pub struct BucketMapHolderStats {
pub per_bucket_count: Vec<AtomicUsize>,
pub flush_entries_updated_on_disk: AtomicU64,
pub flush_entries_evicted_from_mem: AtomicU64,
pub flush_entries_remaining_age: AtomicU64,
pub flush_entries_scanned: AtomicU64,
pub active_threads: AtomicU64,
pub get_range_us: AtomicU64,
last_age: AtomicU8,
@@ -62,6 +64,8 @@ pub struct BucketMapHolderStats {
bins: u64,
pub estimate_mem: AtomicU64,
pub flush_should_evict_us: AtomicU64,
pub buckets_written_to_disk: AtomicU64,
pub buckets_skipped_writing_to_disk: AtomicU64,
}

impl BucketMapHolderStats {
@@ -242,6 +246,17 @@ impl BucketMapHolderStats {
self.flush_should_evict_us.swap(0, Ordering::Relaxed),
i64
),
(
"buckets_written_to_disk",
self.buckets_written_to_disk.swap(0, Ordering::Relaxed),
i64
),
(
"buckets_skipped_writing_to_disk",
self.buckets_skipped_writing_to_disk
.swap(0, Ordering::Relaxed),
i64
),
(
"count_in_mem",
self.count_in_mem.load(Ordering::Relaxed),
@@ -486,6 +501,18 @@ impl BucketMapHolderStats {
.swap(0, Ordering::Relaxed),
i64
),
(
"flush_entries_remaining_age",
self.flush_entries_remaining_age
.swap(0, Ordering::Relaxed),
i64
),
(
"flush_entries_scanned",
self.flush_entries_scanned
.swap(0, Ordering::Relaxed),
i64
),
);
} else {
datapoint_info!(
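These new counters follow the file's existing report-and-reset idiom: increment with relaxed atomics on the hot path, then swap(0, ...) at report time so each datapoint covers only the last interval. A self-contained sketch of that idiom (names are illustrative, not the crate's API):

use std::sync::atomic::{AtomicU64, Ordering};

struct FlushCounters {
    buckets_written_to_disk: AtomicU64,
}

impl FlushCounters {
    fn record_bucket_written(&self) {
        self.buckets_written_to_disk.fetch_add(1, Ordering::Relaxed);
    }

    /// read-and-reset: the next reporting interval starts from zero
    fn report_buckets_written(&self) -> u64 {
        self.buckets_written_to_disk.swap(0, Ordering::Relaxed)
    }
}
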
157 changes: 134 additions & 23 deletions runtime/src/in_mem_accounts_index.rs
@@ -110,6 +110,8 @@ pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>>
/// when age % ages_to_stay_in_cache == 'age_to_flush_bin_mod', then calculate the next 'ages_to_stay_in_cache' 'possible_evictions'
/// this causes us to scan the entire in-mem hash map once every 'ages_to_stay_in_cache' ages instead of every age
age_to_flush_bin_mod: Age,

next_age_to_consider_writing: AtomicU8,
}

impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for InMemAccountsIndex<T, U> {
@@ -164,6 +166,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U>
// Spread out the scanning across all ages within the window.
// This causes us to scan 1/N of the bins each 'Age'
age_to_flush_bin_mod: thread_rng().gen_range(0, ages_to_stay_in_cache),
next_age_to_consider_writing: AtomicU8::default(),
}
}

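Per the comment above, each bin picks a random offset in [0, ages_to_stay_in_cache) so full-map scans are staggered: a bin scans only when the current age hits its offset modulo the window. A small sketch of that schedule, assuming Age = u8 as elsewhere in this index:

type Age = u8;

/// Hypothetical helper mirroring the age_to_flush_bin_mod schedule:
/// a bin does its expensive full scan only on ages matching its offset,
/// so roughly 1/window of the bins scan at any given age.
fn should_scan_this_age(current_age: Age, window: Age, age_to_flush_bin_mod: Age) -> bool {
    current_age % window == age_to_flush_bin_mod
}
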
@@ -946,14 +949,15 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U>
startup: bool,
update_stats: bool,
exceeds_budget: bool,
ignore_age: bool,
) -> (bool, Option<std::sync::RwLockReadGuard<'a, SlotList<T>>>) {
// this could be tunable dynamically based on memory pressure
// we could look at more ages or we could throw out more items we are choosing to keep in the cache
if Self::should_evict_based_on_age(current_age, entry, startup) {
if ignore_age || Self::should_evict_based_on_age(current_age, entry, startup) {
if exceeds_budget {
// if we are already holding too many items in-mem, then we need to be more aggressive at kicking things out
(true, None)
} else if entry.ref_count() != 1 {
} else if false && entry.ref_count() != 1 {
Self::update_stat(&self.stats().held_in_mem.ref_count, 1);
(false, None)
} else {
@@ -981,24 +985,37 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U>
/// scan loop
/// holds read lock
/// identifies items which are dirty and items to evict
/// can_write: true if this bucket should write everything dirty now
fn flush_scan(
&self,
current_age: Age,
startup: bool,
_flush_guard: &FlushGuard,
can_write: bool,
) -> FlushScanResult<T> {
let can_write = can_write | startup;
let mut possible_evictions = self.possible_evictions.write().unwrap();
if let Some(result) = possible_evictions.get_possible_evictions() {
// we have previously calculated the possible evictions for this age
return result;
if !can_write {
if let Some(result) = possible_evictions.get_possible_evictions() {
// we have previously calculated the possible evictions for this age
return result;
}
}
// otherwise, we need to scan some number of ages into the future now
let ages_to_scan = self.ages_to_scan_ahead(current_age);
let ages_to_scan = if can_write {
// if we can write this bucket now, then scan just a single age into the future
1
} else {
self.ages_to_scan_ahead(current_age)
};
possible_evictions.reset(ages_to_scan);

let mut flush_entries_remaining_age = 0;

let m;
{
let map = self.map_internal.read().unwrap();
Self::update_stat(&self.stats().flush_entries_scanned, map.len() as u64);
m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
for (k, v) in map.iter() {
let random = Self::random_chance_of_eviction();
@@ -1008,16 +1025,38 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U>
0
} else {
let ages_in_future = v.age().wrapping_sub(current_age);
if ages_in_future >= ages_to_scan {
// not planning to evict this item from memory within the next few ages
continue;
if false {
//can_write && v.dirty() {
// flush all dirty entries now, regardless of age
0
} else {
if !can_write && ages_in_future >= ages_to_scan {
// not planning to evict this item from memory within the next few ages
continue;
} else if can_write {
// ages_in_future >= Age::MAX.saturating_sub(self.storage.ages_to_stay_in_cache) {
if ages_in_future > 0
&& ages_in_future < self.storage.ages_to_stay_in_cache
{
flush_entries_remaining_age += 1;
continue;
}
// evict all read entries that are not within 'ages_to_stay_in_cache' of the current age
0
} else {
ages_in_future
}
}
ages_in_future
};

possible_evictions.insert(age_offset, *k, Arc::clone(v), random);
}
}
Self::update_stat(
&self.stats().flush_entries_remaining_age,
flush_entries_remaining_age,
);

Self::update_time_stat(&self.stats().flush_scan_us, m);

possible_evictions.get_possible_evictions().unwrap()
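Age is a small wrapping counter (u8, matching the AtomicU8 fields elsewhere in this file), so v.age().wrapping_sub(current_age) in the scan above computes "ages in the future" correctly even across wraparound. A minimal sketch of the window test, assuming Age = u8:

type Age = u8;

/// Hypothetical helper: how many ages in the future is this entry scheduled?
fn ages_in_future(entry_age: Age, current_age: Age) -> Age {
    // wrapping_sub keeps the distance correct when the counter wraps past 255,
    // e.g. entry_age = 2, current_age = 250 yields 8 ages in the future
    entry_age.wrapping_sub(current_age)
}

fn within_window(entry_age: Age, current_age: Age, window: Age) -> bool {
    ages_in_future(entry_age, current_age) < window
}
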
@@ -1086,6 +1125,47 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U>
std::mem::take(&mut write.duplicates)
}

/// only write to disk when we have exceeded a threshold, then write entire buckets
/// until we are below the threshold
fn get_write_bucket_to_disk(&self, current_age: Age) -> bool {
let next_age_to_consider_writing =
self.next_age_to_consider_writing.load(Ordering::Relaxed);
if next_age_to_consider_writing.wrapping_sub(current_age)
< self.storage.ages_to_stay_in_cache
{
// we tried to flush this bucket too recently
return false;
}

let result = self
.storage
.mem_budget_mb
.map(|budget_mb| {
// flush when the in-mem index exceeds 1/10 of the overall bytes allowed
let threshold_flush_bytes = budget_mb * 1024 * 1024 / 10;
// assume each entry is this many bytes
let average_size = 400;
let threshold_flush_entries = threshold_flush_bytes / average_size;
let total_items = self.stats().count_in_mem.load(Ordering::Relaxed);
total_items > threshold_flush_entries && {
// only try to flush bins that contain an above-average number of entries
// make the divisor slightly larger so that even when entries are perfectly evenly spread, an average bin will still trigger a flush
let average_per_bin = total_items / (self.storage.bins + 1);
average_per_bin < self.map_internal.read().unwrap().len()
}
})
.unwrap_or(true);

if result {
// don't revisit this bucket for another 'ages_to_stay_in_cache' ages (a few slot times)
self.next_age_to_consider_writing.store(
current_age.wrapping_add(self.storage.ages_to_stay_in_cache),
Ordering::Relaxed,
);
}
result
}

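For a feel of the numbers in get_write_bucket_to_disk above: the flush threshold is 1/10 of the budget in bytes, divided by an assumed ~400 bytes per entry. A worked sketch with a hypothetical 10,000 MB budget (the constants mirror the code above; they are heuristics, not tunable API):

fn main() {
    // Reproduce the heuristic from get_write_bucket_to_disk for one budget value.
    let budget_mb: usize = 10_000; // hypothetical in-mem index budget
    let threshold_flush_bytes = budget_mb * 1024 * 1024 / 10; // 1/10 of the budget
    let average_size = 400; // assumed average bytes per in-mem entry
    let threshold_flush_entries = threshold_flush_bytes / average_size;
    // ~2.62 million entries for a 10_000 MB budget
    println!("start flushing buckets above {threshold_flush_entries} total entries");
}
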
/// synchronize the in-mem index with the disk index
fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) {
let current_age = self.storage.current_age();
@@ -1097,15 +1177,36 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U>
return;
}

if startup {
self.write_startup_info_to_disk();
}

let mut write_to_disk = self.get_write_bucket_to_disk(current_age);
if write_to_disk {
write_to_disk = !self.storage.write_active.swap(true, Ordering::Relaxed);
}
Self::update_stat(
if write_to_disk {
&self.stats().buckets_written_to_disk
} else {
&self.stats().buckets_skipped_writing_to_disk
},
1,
);
if !write_to_disk {
if iterate_for_age {
// completed iteration of the buckets at the current age
assert_eq!(current_age, self.storage.current_age());
self.set_has_aged(current_age, can_advance_age);
}
return;
}

// scan in-mem map for items that we may evict
let FlushScanResult {
mut evictions_age_possible,
mut evictions_random,
} = self.flush_scan(current_age, startup, flush_guard);

if startup {
self.write_startup_info_to_disk();
}
} = self.flush_scan(current_age, startup, flush_guard, write_to_disk);

// write to disk outside in-mem map read lock
{
Expand All @@ -1125,6 +1226,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
] {
for (k, v) in check_for_eviction_and_dirty.drain(..) {
let mut slot_list = None;

if !is_random {
let mut mse = Measure::start("flush_should_evict");
let (evict_for_age, slot_list_temp) = self.should_evict_from_mem(
@@ -1133,21 +1235,24 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U>
startup,
true,
exceeds_budget,
write_to_disk,
);
slot_list = slot_list_temp;
mse.stop();
flush_should_evict_us += mse.as_us();
if evict_for_age {
evictions_age.push(k);
} else {
if !evict_for_age {
// not evicting, so don't write, even if dirty
continue;
}
} else if v.ref_count() != 1 {
continue;
//continue;
}
// if we are evicting it, then we need to update disk if we're dirty
if v.clear_dirty() {

// try to evict this entry. If it is dirty and we don't write it (or if it becomes dirty in the interim), then we will fail to evict it this time.
evictions_age.push(k);

// we cannot evict dirty items, so first try to update disk
if write_to_disk && v.clear_dirty() {
// step 1: clear the dirty flag
// step 2: perform the update on disk based on the fields in the entry
// If a parallel operation dirties the item again - even while this flush is occurring,
@@ -1214,6 +1319,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U>
self.set_has_aged(current_age, can_advance_age);
}
}

if write_to_disk {
// no longer active
self.storage.write_active.store(false, Ordering::Relaxed);
}
}

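The step 1/step 2 comments above describe a lock-free handshake: clear the dirty flag before writing, and let any concurrent update re-set it so the entry fails eviction this pass and is retried at a later age. A minimal sketch of that handshake, assuming a dirty: AtomicBool per entry (the types here are illustrative, not the PR's entry type):

use std::sync::atomic::{AtomicBool, Ordering};

struct Entry {
    dirty: AtomicBool,
}

impl Entry {
    /// step 1: atomically clear the dirty flag, returning whether it was set
    fn clear_dirty(&self) -> bool {
        self.dirty.swap(false, Ordering::AcqRel)
    }

    /// a concurrent writer re-marks the entry; a later flush pass retries it
    fn mark_dirty(&self) {
        self.dirty.store(true, Ordering::Release);
    }
}

fn flush_entry(entry: &Entry) {
    if entry.clear_dirty() {
        // step 2: write the entry to disk here; if mark_dirty() races with
        // the write, the flag is set again and the entry is not evicted
    }
}

The AcqRel/Release pair is illustrative; the PR's entry type manages its own ordering around the disk update.
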
/// calculate the estimated size of the in-mem index
@@ -1297,8 +1407,9 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U>
}

if v.dirty()
|| (!randomly_evicted
&& !Self::should_evict_based_on_age(current_age, v, startup))
/*
|| (!randomly_evicted
&& !Self::should_evict_based_on_age(current_age, v, startup))*/
{
// marked dirty or bumped in age after we looked above
// these evictions will be handled in later passes (at later ages)