Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disk index: set_anticipated_count to optimally grow disk buckets at startup #31033

Merged
merged 2 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ pub struct Bucket<T: Copy + 'static> {
pub data: Vec<BucketStorage<DataBucket>>,
stats: Arc<BucketMapStats>,

/// # entries caller expects the map to need to contain.
/// Used as a hint for the next time we need to grow.
anticipated_size: u64,

pub reallocated: Reallocated<IndexBucket<T>, DataBucket>,
}

Expand Down Expand Up @@ -123,6 +127,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
data: vec![],
stats,
reallocated: Reallocated::default(),
anticipated_size: 0,
}
}

Expand Down Expand Up @@ -420,21 +425,28 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
}

pub(crate) fn set_anticipated_count(&mut self, count: u64) {
self.anticipated_size = count;
}

pub fn grow_index(&self, current_capacity_pow2: u8) {
if self.index.capacity_pow2 == current_capacity_pow2 {
let mut starting_size_pow2 = self.index.capacity_pow2;
if self.anticipated_size > 0 {
// start the growth at the next pow2 larger than what would be required to hold `anticipated_size`.
// This will prevent unnecessary repeated grows at startup.
starting_size_pow2 = starting_size_pow2.max(self.anticipated_size.ilog2() as u8);
}
let mut m = Measure::start("grow_index");
//debug!("GROW_INDEX: {}", current_capacity_pow2);
let increment = 1;
for i in increment.. {
//increasing the capacity by ^4 reduces the
//likelihood of a re-index collision of 2^(max_search)^2
//1 in 2^32
let mut index = BucketStorage::new_with_capacity(
Arc::clone(&self.drives),
1,
std::mem::size_of::<IndexEntry<T>>() as u64,
// *2 causes rapid growth of index buckets
self.index.capacity_pow2 + i, // * 2,
// the subtle `+ i` here causes us to grow from the starting size by a power of 2 on each iteration of the for loop
starting_size_pow2 + i,
self.index.max_search,
Arc::clone(&self.stats.index),
Arc::clone(&self.index.count),
Expand Down
7 changes: 7 additions & 0 deletions bucket_map/src/bucket_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ impl<T: Clone + Copy> BucketApi<T> {
}
}

/// caller can specify that the index needs to hold approximately `count` entries soon.
/// This gives a hint to the resizing algorithm and prevents repeated incremental resizes.
pub fn set_anticipated_count(&self, count: u64) {
let mut bucket = self.get_write_bucket();
bucket.as_mut().unwrap().set_anticipated_count(count);
}

pub fn update<F>(&self, key: &Pubkey, updatefn: F)
where
F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
Expand Down
5 changes: 5 additions & 0 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,9 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
// merge all items into the disk index now
let disk = self.bucket.as_ref().unwrap();
let mut count = 0;
let current_len = disk.bucket_len();
let anticipated = insert.len();
disk.set_anticipated_count((anticipated as u64).saturating_add(current_len));
insert.into_iter().for_each(|(slot, k, v)| {
let entry = (slot, v);
let new_ref_count = u64::from(!v.is_cached());
Expand All @@ -1085,6 +1088,8 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
}
});
});
// remove the guidance for how many entries the bucket will eventually contain since we have added all we knew about
disk.set_anticipated_count(0);
self.stats().inc_insert_count(count);
}

Expand Down