Skip to content

Commit

Permalink
disk index: set_anticipated_count to optimally grow disk buckets at startup (#31033)
Browse files Browse the repository at this point in the history

* disk index: set_anticipated_count to optimally grow disk buckets at startup

* remove atomic
  • Loading branch information
jeffwashington authored Apr 4, 2023
1 parent 3442f18 commit bc343a4
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
22 changes: 17 additions & 5 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ pub struct Bucket<T: Copy + 'static> {
pub data: Vec<BucketStorage<DataBucket>>,
stats: Arc<BucketMapStats>,

/// # entries caller expects the map to need to contain.
/// Used as a hint for the next time we need to grow.
anticipated_size: u64,

pub reallocated: Reallocated<IndexBucket<T>, DataBucket>,
}

Expand Down Expand Up @@ -123,6 +127,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
data: vec![],
stats,
reallocated: Reallocated::default(),
anticipated_size: 0,
}
}

Expand Down Expand Up @@ -420,21 +425,28 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
}

/// Record a hint of how many entries the caller expects this bucket to hold soon.
/// `grow_index` reads `anticipated_size` to pick a starting capacity, avoiding
/// repeated incremental grows during startup. Pass 0 to clear the hint.
pub(crate) fn set_anticipated_count(&mut self, count: u64) {
self.anticipated_size = count;
}

pub fn grow_index(&self, current_capacity_pow2: u8) {
if self.index.capacity_pow2 == current_capacity_pow2 {
let mut starting_size_pow2 = self.index.capacity_pow2;
if self.anticipated_size > 0 {
// start the growth at the next pow2 larger than what would be required to hold `anticipated_size`.
// This will prevent unnecessary repeated grows at startup.
starting_size_pow2 = starting_size_pow2.max(self.anticipated_size.ilog2() as u8);
}
let mut m = Measure::start("grow_index");
//debug!("GROW_INDEX: {}", current_capacity_pow2);
let increment = 1;
for i in increment.. {
//increasing the capacity by ^4 reduces the
//likelihood of a re-index collision of 2^(max_search)^2
//1 in 2^32
let mut index = BucketStorage::new_with_capacity(
Arc::clone(&self.drives),
1,
std::mem::size_of::<IndexEntry<T>>() as u64,
// *2 causes rapid growth of index buckets
self.index.capacity_pow2 + i, // * 2,
// the subtle `+ i` here causes us to grow from the starting size by a power of 2 on each iteration of the for loop
starting_size_pow2 + i,
self.index.max_search,
Arc::clone(&self.stats.index),
Arc::clone(&self.index.count),
Expand Down
7 changes: 7 additions & 0 deletions bucket_map/src/bucket_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ impl<T: Clone + Copy> BucketApi<T> {
}
}

/// Hint that the index will soon need to hold approximately `count` entries.
/// The resizing algorithm uses this to size its next grow, preventing a series
/// of small incremental resizes.
pub fn set_anticipated_count(&self, count: u64) {
self.get_write_bucket()
.as_mut()
.unwrap()
.set_anticipated_count(count);
}

pub fn update<F>(&self, key: &Pubkey, updatefn: F)
where
F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
Expand Down
5 changes: 5 additions & 0 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,9 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
// merge all items into the disk index now
let disk = self.bucket.as_ref().unwrap();
let mut count = 0;
let current_len = disk.bucket_len();
let anticipated = insert.len();
disk.set_anticipated_count((anticipated as u64).saturating_add(current_len));
insert.into_iter().for_each(|(slot, k, v)| {
let entry = (slot, v);
let new_ref_count = u64::from(!v.is_cached());
Expand All @@ -1085,6 +1088,8 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
}
});
});
// remove the guidance for how many entries the bucket will eventually contain since we have added all we knew about
disk.set_anticipated_count(0);
self.stats().inc_insert_count(count);
}

Expand Down

0 comments on commit bc343a4

Please sign in to comment.