disk index: pre-flight disk index grow #31052

Closed
wants to merge 1 commit into from
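The change: before `grow_index` pays for a larger index file, it now re-hashes every occupied entry into the candidate capacity, tracking placements in a `bv::BitVec`, and only allocates once every key fits within `max_search` linear probes; the same `random` seed is then reused for the real copy, so the verified layout is the one actually built. A minimal standalone sketch of that pre-flight check, assuming `[u8; 32]` keys and a plain `Vec<bool>` in place of `Pubkey` and `bv::BitVec` (`pre_flight_fits` is a hypothetical name; `bucket_index_ix` mirrors the helper in the diff):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash a key plus a per-attempt random seed into a slot index for a given
/// capacity (mirrors `bucket_index_ix` after this PR's signature change).
fn bucket_index_ix(capacity: u64, key: &[u8; 32], random: u64) -> u64 {
    let mut s = DefaultHasher::new();
    key.hash(&mut s);
    random.hash(&mut s);
    s.finish() % capacity
}

/// Dry-run the re-hash of every key into `new_capacity` slots.
/// Returns true only if each key can be placed within `max_search`
/// linear probes, without allocating any backing storage.
fn pre_flight_fits(keys: &[[u8; 32]], new_capacity: u64, max_search: u64, random: u64) -> bool {
    let mut occupied = vec![false; new_capacity as usize];
    for key in keys {
        let ix = bucket_index_ix(new_capacity, key, random);
        let mut placed = false;
        for look in 0..max_search {
            let slot = ((ix + look) % new_capacity) as usize;
            if !occupied[slot] {
                occupied[slot] = true;
                placed = true;
                break;
            }
        }
        if !placed {
            // this capacity would overflow max_search; caller tries the next pow2
            return false;
        }
    }
    true
}

fn main() {
    // 1,000 synthetic 32-byte keys standing in for pubkeys
    let keys: Vec<[u8; 32]> = (0..1000u32)
        .map(|i| {
            let mut k = [0u8; 32];
            k[..4].copy_from_slice(&i.to_le_bytes());
            k
        })
        .collect();
    let random = 42u64;
    // keep doubling the candidate capacity until the pre-flight check passes;
    // only then would the real code create the new index file and copy into it
    let mut pow2 = 10u8;
    while !pre_flight_fits(&keys, 1 << pow2, 8, random) {
        pow2 += 1;
    }
    println!("grow index to capacity {}", 1u64 << pow2);
}
```

A failed attempt previously cost a full file allocation plus a partial copy before the overflow was discovered; with the pre-flight pass it costs only this hashing loop.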
71 changes: 64 additions & 7 deletions bucket_map/src/bucket.rs
@@ -13,6 +13,7 @@ use {
},
MaxSearch, RefCount,
},
bv::BitVec,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_sdk::pubkey::Pubkey,
@@ -183,7 +184,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
key: &Pubkey,
random: u64,
) -> Result<(Option<IndexEntryPlaceInBucket<T>>, u64), BucketMapError> {
let ix = Self::bucket_index_ix(index, key, random);
let ix = Self::bucket_index_ix(index.capacity(), key, random);
let mut first_free = None;
let mut m = Measure::start("bucket_find_index_entry_mut");
let capacity = index.capacity();
@@ -222,7 +223,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
key: &Pubkey,
random: u64,
) -> Option<(IndexEntryPlaceInBucket<T>, u64)> {
let ix = Self::bucket_index_ix(index, key, random);
let ix = Self::bucket_index_ix(index.capacity(), key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if index.is_free(ii) {
@@ -243,7 +244,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
is_resizing: bool,
) -> Result<u64, BucketMapError> {
let mut m = Measure::start("bucket_create_key");
let ix = Self::bucket_index_ix(index, key, random);
let ix = Self::bucket_index_ix(index.capacity(), key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if !index.is_free(ii) {
@@ -435,6 +436,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

pub fn grow_index(&self, current_capacity_pow2: u8) {
if self.index.contents.capacity_pow2() == current_capacity_pow2 {
let mut resize_verify_us = 0;
let mut starting_size_pow2 = self.index.contents.capacity_pow2();
if self.anticipated_size > 0 {
// start the growth at the next pow2 larger than what would be required to hold `anticipated_size`.
@@ -446,6 +448,59 @@
let mut count = 0;
loop {
count += 1;
// grow relative to the current capacity
// todo: this needs to be page aligned if we do that below, so this capacity exactly matches the one allocated
let random = thread_rng().gen();

let mut verify_us = Measure::start("resize_verify_us");
let new_capacity = 1 << (starting_size_pow2 + count);
if new_capacity > 1_000_000 {
use log::*;
error!("allocating new fill: {}, count: {}, max search: {}", new_capacity, count, self.index.max_search);
}
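// dry-run: mark the slot each occupied key would land in (within max_search probes) in a bit vector, before paying for any allocation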
let mut occupied = BitVec::<u64>::new_fill(false, new_capacity);
let mut last_ones = Vec::default();
let mut valid = true;
for ix in 0..self.index.capacity() {
if !self.index.is_free(ix) {
let elem: &IndexEntry<T> = self.index.get(ix);
let ix_new = Self::bucket_index_ix(new_capacity, &elem.key, random);
valid = false;
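// probe up to max_search consecutive slots, the same bounded linear probe used by find_index_entry and create_key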
for look in 0..(self.index.max_search as u64) {
let look = (ix_new + look) % new_capacity;
if !occupied.get(look) {
if new_capacity > 1_000_000 {
last_ones.push((&elem.key, ix_new, look));
}

occupied.set(look, true);
valid = true;
break;
}
}
if !valid {
if new_capacity > 1_000_000 {
use log::*;
error!("invalid: allocating new fill: {}, count: {}, max search: {}, ix_new: {ix_new}, any false: {:?}, old capacity: {}, last ones: {:?}", new_capacity, count, self.index.max_search,
(0..(self.index.max_search as u64)).map(|i| occupied.get((ix_new + i) % new_capacity)).any(|v| !v),
self.index.capacity(),
last_ones
);
}

break;
}
}
}
verify_us.stop();
resize_verify_us += verify_us.as_us();
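// a key that could not be placed within max_search probes means this capacity is unusable; retry with the next power of two without ever allocating a file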
if !valid {
continue;
}

let nc = Capacity::Pow2(starting_size_pow2 + count);
if nc.capacity() > 1_000_000 {
use log::*;
error!("{}, size: {}", line!(), nc.capacity());
}

let mut index = BucketStorage::new_with_capacity(
Arc::clone(&self.drives),
1,
Expand All @@ -456,8 +511,6 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
Arc::clone(&self.stats.index),
Arc::clone(&self.index.count),
);
let random = thread_rng().gen();
let mut valid = true;
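// the copy below reuses the `random` and `valid` from the pre-flight pass above, so the layout that was verified is exactly the layout being built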
for ix in 0..self.index.capacity() {
if !self.index.is_free(ix) {
let elem: &IndexEntry<T> = self.index.get(ix);
@@ -499,6 +552,10 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
.index
.resize_us
.fetch_add(m.as_us(), Ordering::Relaxed);
self.stats
.index
.resize_verify_us
.fetch_add(resize_verify_us, Ordering::Relaxed);
}
}

@@ -564,15 +621,15 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
items.data = Some((data_index, new_bucket));
}

fn bucket_index_ix(index: &BucketStorage<IndexBucket<T>>, key: &Pubkey, random: u64) -> u64 {
fn bucket_index_ix(capacity: u64, key: &Pubkey, random: u64) -> u64 {
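// taking a raw capacity instead of &BucketStorage lets grow_index hash against a candidate capacity that has no storage allocated yet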
let mut s = DefaultHasher::new();
key.hash(&mut s);
//the locally generated random will make it hard for an attacker
//to deterministically cause all the pubkeys to land in the same
//location in any bucket on all validators
random.hash(&mut s);
let ix = s.finish();
ix % index.capacity()
ix % capacity
//debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.capacity() );
}

1 change: 1 addition & 0 deletions bucket_map/src/bucket_stats.rs
@@ -9,6 +9,7 @@ pub struct BucketStats {
pub failed_resizes: AtomicU64,
pub max_size: AtomicU64,
pub resize_us: AtomicU64,
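// time spent verifying a candidate capacity in grow_index before allocating it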
pub resize_verify_us: AtomicU64,
pub new_file_us: AtomicU64,
pub flush_file_us: AtomicU64,
pub mmap_us: AtomicU64,
1 change: 1 addition & 0 deletions bucket_map/src/bucket_storage.rs
@@ -353,6 +353,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
// the file so that we won't have to resize it later, which may be
// expensive.
//debug!("GROWING file {}", capacity * cell_size as u64);
//use log::*;error!("growing: {}", bytes);
data.seek(SeekFrom::Start(bytes - 1)).unwrap();
data.write_all(&[0]).unwrap();
data.rewind().unwrap();
6 changes: 6 additions & 0 deletions runtime/src/bucket_map_holder_stats.rs
@@ -452,6 +452,12 @@ impl BucketMapHolderStats {
.unwrap_or_default(),
i64
),
(
"resize_verify_us",
disk.map(|disk| disk.stats.index.resize_verify_us.swap(0, Ordering::Relaxed))
.unwrap_or_default(),
i64
),
(
"disk_index_find_index_entry_mut_us",
disk.map(|disk| disk