Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
flush entire bucket of writes at once
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Mar 14, 2023
1 parent 89d5efa commit 0b0f95c
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 14 deletions.
13 changes: 13 additions & 0 deletions runtime/src/bucket_map_holder_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub struct BucketMapHolderStats {
bins: u64,
pub estimate_mem: AtomicU64,
pub flush_should_evict_us: AtomicU64,
pub buckets_written_to_disk: AtomicU64,
pub buckets_skipped_writing_to_disk: AtomicU64,
}

impl BucketMapHolderStats {
Expand Down Expand Up @@ -242,6 +244,17 @@ impl BucketMapHolderStats {
self.flush_should_evict_us.swap(0, Ordering::Relaxed),
i64
),
(
"buckets_written_to_disk",
self.buckets_written_to_disk.swap(0, Ordering::Relaxed),
i64
),
(
"buckets_skipped_writing_to_disk",
self.buckets_skipped_writing_to_disk
.swap(0, Ordering::Relaxed),
i64
),
(
"count_in_mem",
self.count_in_mem.load(Ordering::Relaxed),
Expand Down
72 changes: 58 additions & 14 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,19 +981,29 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
/// scan loop
/// holds read lock
/// identifies items which are dirty and items to evict
/// can_write: true if this bucket should write everything dirty now
fn flush_scan(
&self,
current_age: Age,
startup: bool,
_flush_guard: &FlushGuard,
can_write: bool,
) -> FlushScanResult<T> {
let can_write = can_write | startup;
let mut possible_evictions = self.possible_evictions.write().unwrap();
if let Some(result) = possible_evictions.get_possible_evictions() {
// we have previously calculated the possible evictions for this age
return result;
if !can_write {
if let Some(result) = possible_evictions.get_possible_evictions() {
// we have previously calculated the possible evictions for this age
return result;
}
}
// otherwise, we need to scan some number of ages into the future now
let ages_to_scan = self.ages_to_scan_ahead(current_age);
let ages_to_scan = if can_write {
// if we can write this bucket now, then scan just a single age into the future
1
} else {
self.ages_to_scan_ahead(current_age)
};
possible_evictions.reset(ages_to_scan);

let m;
Expand All @@ -1008,11 +1018,16 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
0
} else {
let ages_in_future = v.age().wrapping_sub(current_age);
if ages_in_future >= ages_to_scan {
// not planning to evict this item from memory within the next few ages
continue;
if can_write && v.dirty() {
// flush all dirty entries now, regardless of age
0
} else {
if ages_in_future >= ages_to_scan {
// not planning to evict this item from memory within the next few ages
continue;
}
ages_in_future
}
ages_in_future
};

possible_evictions.insert(age_offset, *k, Arc::clone(v), random);
Expand Down Expand Up @@ -1086,6 +1101,22 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
std::mem::take(&mut write.duplicates)
}

/// Decide whether the current flush should write this bucket to disk.
///
/// Writing only starts once the number of in-mem entries has exceeded a threshold derived
/// from the memory budget; entire buckets are then written until the count is back below it.
/// Returns true unconditionally when no memory budget is configured.
fn get_write_bucket_to_disk(&self) -> bool {
    match self.storage.mem_budget_mb {
        Some(budget_mb) => {
            // the threshold sits at half of the overall bytes allowed
            let half_budget_bytes = budget_mb * 1024 * 1024 / 2;
            // convert bytes to an entry count, assuming each entry is 400 bytes
            let max_entries_before_flush = half_budget_bytes / 400;
            let entries_in_mem = self.stats().count_in_mem.load(Ordering::Relaxed);
            entries_in_mem > max_entries_before_flush
        }
        // no budget configured: always report that the bucket should be written
        None => true,
    }
}

/// synchronize the in-mem index with the disk index
fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) {
let current_age = self.storage.current_age();
Expand All @@ -1097,11 +1128,21 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
return;
}

let write_to_disk = self.get_write_bucket_to_disk();
Self::update_stat(
if write_to_disk {
&self.stats().buckets_written_to_disk
} else {
&self.stats().buckets_skipped_writing_to_disk
},
1,
);

// scan in-mem map for items that we may evict
let FlushScanResult {
mut evictions_age_possible,
mut evictions_random,
} = self.flush_scan(current_age, startup, flush_guard);
} = self.flush_scan(current_age, startup, flush_guard, write_to_disk);

if startup {
self.write_startup_info_to_disk();
Expand All @@ -1125,6 +1166,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
] {
for (k, v) in check_for_eviction_and_dirty.drain(..) {
let mut slot_list = None;

if !is_random {
let mut mse = Measure::start("flush_should_evict");
let (evict_for_age, slot_list_temp) = self.should_evict_from_mem(
Expand All @@ -1137,17 +1179,19 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
slot_list = slot_list_temp;
mse.stop();
flush_should_evict_us += mse.as_us();
if evict_for_age {
evictions_age.push(k);
} else {
if !evict_for_age {
// not evicting, so don't write, even if dirty
continue;
}
} else if v.ref_count() != 1 {
continue;
}
// if we are evicting it, then we need to update disk if we're dirty
if v.clear_dirty() {

// try to evict this entry. If it is dirty and we don't write it (or if it becomes dirty in the interim), then we will fail to evict it this time.
evictions_age.push(k);

// we cannot evict dirty items, so first try to update disk
if write_to_disk && v.clear_dirty() {
// step 1: clear the dirty flag
// step 2: perform the update on disk based on the fields in the entry
// If a parallel operation dirties the item again - even while this flush is occurring,
Expand Down

0 comments on commit 0b0f95c

Please sign in to comment.