Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
disk index: batch insert
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Apr 7, 2023
1 parent 6dc18fe commit 01f9746
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 47 deletions.
129 changes: 115 additions & 14 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use {
pub struct ReallocatedItems<I: BucketOccupied, D: BucketOccupied> {
// Some if the index was reallocated
// u64 is random associated with the new index
pub index: Option<(u64, BucketStorage<I>)>,
pub index: Option<BucketStorage<I>>,
// Some for a data bucket reallocation
// u64 is data bucket index
pub data: Option<(u64, BucketStorage<D>)>,
Expand Down Expand Up @@ -104,6 +104,10 @@ pub struct Bucket<T: Copy + 'static> {
anticipated_size: u64,

pub reallocated: Reallocated<IndexBucket<T>, DataBucket>,

/// set to true once any entries have been deleted from the index.
/// Deletes indicate that there can be free slots and that the full search range must be searched for an entry.
at_least_one_entry_deleted: bool,
}

impl<'b, T: Clone + Copy + 'static> Bucket<T> {
Expand Down Expand Up @@ -131,6 +135,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
stats,
reallocated: Reallocated::default(),
anticipated_size: 0,
at_least_one_entry_deleted: false,
}
}

Expand Down Expand Up @@ -183,7 +188,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
key: &Pubkey,
random: u64,
) -> Result<(Option<IndexEntryPlaceInBucket<T>>, u64), BucketMapError> {
let ix = Self::bucket_index_ix(index, key, random);
let ix = Self::bucket_index_ix(key, random) % index.capacity();
let mut first_free = None;
let mut m = Measure::start("bucket_find_index_entry_mut");
let capacity = index.capacity();
Expand Down Expand Up @@ -222,7 +227,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
key: &Pubkey,
random: u64,
) -> Option<(IndexEntryPlaceInBucket<T>, u64)> {
let ix = Self::bucket_index_ix(index, key, random);
let ix = Self::bucket_index_ix(key, random) % index.capacity();
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if index.is_free(ii) {
Expand All @@ -243,7 +248,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
is_resizing: bool,
) -> Result<u64, BucketMapError> {
let mut m = Measure::start("bucket_create_key");
let ix = Self::bucket_index_ix(index, key, random);
let ix = Self::bucket_index_ix(key, random) % index.capacity();
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if !index.is_free(ii) {
Expand Down Expand Up @@ -275,6 +280,103 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
Some(elem.read_value(&self.index, &self.data))
}

fn index_entries(
items: impl Iterator<Item = (Pubkey, T)>,
count: usize,
random: u64,
) -> Vec<(u64, Pubkey, T)> {
let mut inserts = Vec::with_capacity(count);
items.for_each(|(key, v)| {
let ix = Self::bucket_index_ix(&key, random);
inserts.push((ix, key, v));
});
inserts
}

pub fn batch_insert_non_duplicates(
&mut self,
items: impl Iterator<Item = (Pubkey, T)>,
count: usize,
) -> Vec<(Pubkey, T, T)> {
assert!(
!self.at_least_one_entry_deleted,
"efficient batch insertion can only occur prior to any deletes"
);
let current_len = self.index.count.load(Ordering::Relaxed);
let anticipated = count as u64;
self.set_anticipated_count((anticipated).saturating_add(current_len));
let mut entries = Self::index_entries(items, count, self.random);
let mut duplicates = Vec::default();
// insert, but resizes may be necessary
loop {
let result = Self::batch_insert_non_duplicates_internal(
&mut self.index,
&self.data,
&mut entries,
&mut duplicates,
);
match result {
Ok(_result) => {
self.set_anticipated_count(0);
return duplicates;
}
Err(error) => {
self.grow(error);
self.handle_delayed_grows();
}
}
}
}

pub fn batch_insert_non_duplicates_internal(
index: &mut BucketStorage<IndexBucket<T>>,
data_buckets: &[BucketStorage<DataBucket>],
entries: &mut Vec<(u64, Pubkey, T)>,
duplicates: &mut Vec<(Pubkey, T, T)>,
) -> Result<(), BucketMapError> {
let max_search = index.max_search();
let is_resizing = false;
let cap = index.capacity();
// sort entries by their index % cap, so we'll search over the same spots in the file close to each other
entries.sort_by(|a, b| (a.0 % cap).cmp(&(b.0 % cap)).reverse());
let search_end = max_search.min(cap);
// walk through each entry to insert
'outer: while let Some((ix_entry_raw, k, v)) = entries.pop() {
let ix_entry = ix_entry_raw % cap;
// search for an empty spot starting at `ix_entry`
for search in 0..search_end {
let ix_index = (ix_entry + search) % cap;
let elem = IndexEntryPlaceInBucket::new(ix_index);
if index.is_free(ix_index) {
// found free element, store this entry
index.occupy(ix_index, is_resizing).unwrap();
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(index, &k);

// new data stored should be stored in IndexEntry and NOT in data file
// new data len is 1
elem.set_slot_count_enum_value(index, OccupiedEnum::OneSlotInIndex(&v));
continue 'outer; // this 'insertion' is completed: inserted successfully
} else {
// occupied, see if the key already exists here
if elem.key(index) == &k {
let (v_existing, _ref_count_existing) =
elem.read_value(index, data_buckets);
duplicates.push((k, v, *v_existing.first().unwrap()));
continue 'outer; // this 'insertion' is completed: found a duplicate entry
}
}
}
// search loop ended without finding a spot to insert this key
// so, remember the item we were trying to insert for next time after resizing
entries.push((ix_entry_raw, k, v));
return Err(BucketMapError::IndexNoSpace(cap));
}

Ok(())
}

pub fn try_write(
&mut self,
key: &Pubkey,
Expand Down Expand Up @@ -417,6 +519,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_index_entry(key) {
self.at_least_one_entry_deleted = true;
if let OccupiedEnum::MultipleSlots(multiple_slots) =
elem.get_slot_count_enum(&self.index)
{
Expand Down Expand Up @@ -460,12 +563,12 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
// index may have allocated something larger than we asked for,
// so, in case we fail to reindex into this larger size, grow from this size next iteration.
current_capacity = index.capacity();
let random = thread_rng().gen();
let mut valid = true;
for ix in 0..self.index.capacity() {
if !self.index.is_free(ix) {
let elem: &IndexEntry<T> = self.index.get(ix);
let new_ix = Self::bucket_create_key(&mut index, &elem.key, random, true);
let new_ix =
Self::bucket_create_key(&mut index, &elem.key, self.random, true);
if new_ix.is_err() {
valid = false;
break;
Expand All @@ -486,7 +589,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
if valid {
self.stats.index.update_max_size(index.capacity());
let mut items = self.reallocated.items.lock().unwrap();
items.index = Some((random, index));
items.index = Some(index);
self.reallocated.add_reallocation();
break;
}
Expand All @@ -506,12 +609,11 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
}

pub fn apply_grow_index(&mut self, random: u64, index: BucketStorage<IndexBucket<T>>) {
pub fn apply_grow_index(&mut self, index: BucketStorage<IndexBucket<T>>) {
self.stats
.index
.resize_grow(self.index.capacity_bytes(), index.capacity_bytes());

self.random = random;
self.index = index;
}

Expand Down Expand Up @@ -568,15 +670,14 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
items.data = Some((data_index, new_bucket));
}

fn bucket_index_ix(index: &BucketStorage<IndexBucket<T>>, key: &Pubkey, random: u64) -> u64 {
fn bucket_index_ix(key: &Pubkey, random: u64) -> u64 {
let mut s = DefaultHasher::new();
key.hash(&mut s);
//the locally generated random will make it hard for an attacker
//to deterministically cause all the pubkeys to land in the same
//location in any bucket on all validators
random.hash(&mut s);
let ix = s.finish();
ix % index.capacity()
s.finish()
//debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.capacity() );
}

Expand All @@ -601,8 +702,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
// swap out the bucket that was resized previously with a read lock
let mut items = std::mem::take(&mut *self.reallocated.items.lock().unwrap());

if let Some((random, bucket)) = items.index.take() {
self.apply_grow_index(random, bucket);
if let Some(bucket) = items.index.take() {
self.apply_grow_index(bucket);
} else {
// data bucket
let (i, new_bucket) = items.data.take().unwrap();
Expand Down
14 changes: 14 additions & 0 deletions bucket_map/src/bucket_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ impl<T: Clone + Copy> BucketApi<T> {
bucket.as_mut().unwrap().set_anticipated_count(count);
}

/// batch insert of `items`. Assumption is a single slot list element and ref_count == 1.
/// For any pubkeys that already exist, the failed insertion data and the existing data are returned.
pub fn batch_insert_non_duplicates(
&self,
items: impl Iterator<Item = (Pubkey, T)>,
count: usize,
) -> Vec<(Pubkey, T, T)> {
let mut bucket = self.get_write_bucket();
bucket
.as_mut()
.unwrap()
.batch_insert_non_duplicates(items, count)
}

pub fn update<F>(&self, key: &Pubkey, updatefn: F)
where
F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
Expand Down
6 changes: 3 additions & 3 deletions runtime/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2491,8 +2491,8 @@ pub mod tests {
assert_eq!(num, 1);

// not zero lamports
let index = AccountsIndex::<AccountInfoTest, AccountInfoTest>::default_for_tests();
let account_info: AccountInfoTest = 0 as AccountInfoTest;
let index = AccountsIndex::<bool, bool>::default_for_tests();
let account_info = false;
let items = vec![(*pubkey, account_info)];
index.set_startup(Startup::Startup);
index.insert_new_if_missing_into_primary_index(slot, items.len(), items.into_iter());
Expand All @@ -2516,7 +2516,7 @@ pub mod tests {
assert!(index
.get_for_tests(pubkey, Some(&ancestors), None)
.is_some());
assert_eq!(index.ref_count_from_storage(pubkey), 0); // cached, so 0
assert_eq!(index.ref_count_from_storage(pubkey), 1);
index.unchecked_scan_accounts(
"",
&ancestors,
Expand Down
45 changes: 15 additions & 30 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1060,36 +1060,21 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,

// merge all items into the disk index now
let disk = self.bucket.as_ref().unwrap();
let mut count = 0;
let current_len = disk.bucket_len();
let anticipated = insert.len();
disk.set_anticipated_count((anticipated as u64).saturating_add(current_len));
insert.into_iter().for_each(|(slot, k, v)| {
let entry = (slot, v);
let new_ref_count = u64::from(!v.is_cached());
disk.update(&k, |current| {
match current {
Some((current_slot_list, ref_count)) => {
// already on disk, so remember the new (slot, info) for later
duplicates.duplicates.push((slot, k, entry.1));
if let Some((slot, _)) = current_slot_list.first() {
// accurately account for there being a duplicate for the first entry that was previously added to the disk index.
// That entry could not have known yet that it was a duplicate.
// It is important to capture each slot with a duplicate because of slot limits applied to clean.
duplicates.duplicates_put_on_disk.insert((*slot, k));
}
Some((current_slot_list.to_vec(), ref_count))
}
None => {
count += 1;
// not on disk, insert it
Some((vec![(entry.0, entry.1.into())], new_ref_count))
}
}
});
});
// remove the guidance for how many entries the bucket will eventually contain since we have added all we knew about
disk.set_anticipated_count(0);
let mut count = insert.len() as u64;
for (k, entry, duplicate_entry) in disk.batch_insert_non_duplicates(
insert.into_iter().map(|(slot, k, v)| (k, (slot, v.into()))),
count as usize,
) {
duplicates.duplicates.push((entry.0, k, entry.1.into()));
// accurately account for there being a duplicate for the first entry that was previously added to the disk index.
// That entry could not have known yet that it was a duplicate.
// It is important to capture each slot with a duplicate because of slot limits applied to clean.
duplicates
.duplicates_put_on_disk
.insert((duplicate_entry.0, k));
count -= 1;
}

self.stats().inc_insert_count(count);
}

Expand Down

0 comments on commit 01f9746

Please sign in to comment.