disk index: batch insert #31094

Merged (1 commit) on Apr 10, 2023
271 changes: 269 additions & 2 deletions bucket_map/src/bucket.rs
@@ -30,7 +30,6 @@ use {

pub struct ReallocatedItems<I: BucketOccupied, D: BucketOccupied> {
// Some if the index was reallocated
// u64 is random associated with the new index
pub index: Option<BucketStorage<I>>,
// Some for a data bucket reallocation
// u64 is data bucket index
@@ -104,6 +103,10 @@ pub struct Bucket<T: Copy + 'static> {
anticipated_size: u64,

pub reallocated: Reallocated<IndexBucket<T>, DataBucket>,

/// set to true once any entries have been deleted from the index.
/// Deletes indicate that there can be free slots and that the full search range must be searched for an entry.
at_least_one_entry_deleted: bool,
}

impl<'b, T: Clone + Copy + 'static> Bucket<T> {
@@ -131,6 +134,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
stats,
reallocated: Reallocated::default(),
anticipated_size: 0,
at_least_one_entry_deleted: false,
}
}

@@ -269,12 +273,124 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
Err(BucketMapError::IndexNoSpace(index.contents.capacity()))
}

pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
pub(crate) fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
//debug!("READ_VALUE: {:?}", key);
let (elem, _) = self.find_index_entry(key)?;
Some(elem.read_value(&self.index, &self.data))
}

/// for each item in `items`, get the hash value when the key is hashed with `random`.
/// Return a vec of tuples:
/// (hash_value, key, value)
fn index_entries(
items: impl Iterator<Item = (Pubkey, T)>,
count: usize,
random: u64,
) -> Vec<(u64, Pubkey, T)> {
let mut inserts = Vec::with_capacity(count);
items.for_each(|(key, v)| {
let ix = Self::bucket_index_ix(&key, random);
inserts.push((ix, key, v));
});
inserts
}

/// insert all of `items` into the index.
/// return duplicates
pub(crate) fn batch_insert_non_duplicates(
&mut self,
items: impl Iterator<Item = (Pubkey, T)>,
count: usize,
) -> Vec<(Pubkey, T, T)> {
assert!(
!self.at_least_one_entry_deleted,
"efficient batch insertion can only occur prior to any deletes"
);
let current_len = self.index.count.load(Ordering::Relaxed);
let anticipated = count as u64;
self.set_anticipated_count(anticipated.saturating_add(current_len));
let mut entries = Self::index_entries(items, count, self.random);
let mut duplicates = Vec::default();
// insert, but resizes may be necessary
loop {
let cap = self.index.capacity();
// sort entries by their index % cap, so nearby regions of the file are searched close together in time
// `reverse()` lets us pop efficiently off the end while still visiting index values in ascending order
// sorting here, rather than in the callee, makes `batch_insert_non_duplicates_internal` easier to test
entries.sort_unstable_by(|a, b| (a.0 % cap).cmp(&(b.0 % cap)).reverse());

let result = Self::batch_insert_non_duplicates_internal(
&mut self.index,
&self.data,
&mut entries,
&mut duplicates,
);
match result {
Ok(_result) => {
// everything added
self.set_anticipated_count(0);
return duplicates;
}
Err(error) => {
// resize and add more
// `entries` will have had items removed from it
self.grow(error);
self.handle_delayed_grows();
}
}
}
}
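
To see what the descending sort buys, here is a minimal standalone sketch (illustrative values, plain tuples in place of real entries) of how popping from the reverse-sorted vec yields ascending slot order:

```rust
// with capacity 8, raw hashes 10, 3, 17 map to slots 2, 3, 1
let cap = 8u64;
let mut entries = vec![(10u64, "a"), (3u64, "b"), (17u64, "c")];
entries.sort_unstable_by(|a, b| (a.0 % cap).cmp(&(b.0 % cap)).reverse());
// after the sort: [(3, "b"), (10, "a"), (17, "c")], i.e. descending slot order
assert_eq!(entries.pop(), Some((17, "c"))); // slot 1 pops first
assert_eq!(entries.pop(), Some((10, "a"))); // then slot 2
assert_eq!(entries.pop(), Some((3, "b"))); // then slot 3
```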

/// insert as much of `reverse_sorted_entries` as possible into `index`.
/// `reverse_sorted_entries` must be sorted in descending order of (hash value % index capacity) so that popping yields ascending slot order.
/// return an error if the index needs to resize.
/// for every entry that already exists in `index`, add it (and the value already in the index) to `duplicates`
pub fn batch_insert_non_duplicates_internal(
index: &mut BucketStorage<IndexBucket<T>>,
data_buckets: &[BucketStorage<DataBucket>],
reverse_sorted_entries: &mut Vec<(u64, Pubkey, T)>,
duplicates: &mut Vec<(Pubkey, T, T)>,
) -> Result<(), BucketMapError> {
let max_search = index.max_search();
let cap = index.capacity();
let search_end = max_search.min(cap);

// pop one entry at a time to insert
'outer: while let Some((ix_entry_raw, k, v)) = reverse_sorted_entries.pop() {
let ix_entry = ix_entry_raw % cap;
// search for an empty spot starting at `ix_entry`
for search in 0..search_end {
let ix_index = (ix_entry + search) % cap;
let elem = IndexEntryPlaceInBucket::new(ix_index);
if index.try_lock(ix_index) {
// found a free element and occupied it
// initialize the entry: this region of the mmapped file may have been used before, so it can contain garbage
elem.init(index, &k);

// the new slot list has len 1, so the value is stored inline in the IndexEntry and NOT in a data file
elem.set_slot_count_enum_value(index, OccupiedEnum::OneSlotInIndex(&v));
continue 'outer; // this 'insertion' is completed: inserted successfully
} else {
// occupied, see if the key already exists here
if elem.key(index) == &k {
let (v_existing, _ref_count_existing) =
elem.read_value(index, data_buckets);
duplicates.push((k, v, *v_existing.first().unwrap()));
continue 'outer; // this 'insertion' is completed: found a duplicate entry
}
}
}
// search loop ended without finding a spot to insert this key
// so, remember the item we were trying to insert for next time after resizing
reverse_sorted_entries.push((ix_entry_raw, k, v));
return Err(BucketMapError::IndexNoSpace(cap));
}

Ok(())
}
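
The probe loop above, reduced to a self-contained sketch that uses a plain slice of `Option` in place of the mmapped `BucketStorage` (the function and error type here are illustrative, not the crate's API):

```rust
/// Linear probe with wraparound, bounded by `max_search`, mirroring
/// the `(ix_entry + search) % cap` walk in `batch_insert_non_duplicates_internal`.
fn probe_insert<K: Eq + Copy>(
    slots: &mut [Option<K>],
    start: u64,
    key: K,
    max_search: u64,
) -> Result<usize, ()> {
    let cap = slots.len() as u64;
    for search in 0..max_search.min(cap) {
        let ix = ((start % cap + search) % cap) as usize;
        match slots[ix] {
            // free slot: occupy it
            None => {
                slots[ix] = Some(key);
                return Ok(ix);
            }
            // key already present: also terminal; the caller records a duplicate
            Some(existing) if existing == key => return Ok(ix),
            // occupied by a different key: keep probing
            _ => {}
        }
    }
    // `max_search` exhausted with no free slot: the caller must grow and retry
    Err(())
}
```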

pub fn try_write(
&mut self,
key: &Pubkey,
@@ -417,6 +533,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_index_entry(key) {
self.at_least_one_entry_deleted = true;
if let OccupiedEnum::MultipleSlots(multiple_slots) =
elem.get_slot_count_enum(&self.index)
{
@@ -637,3 +754,153 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
self.insert(key, (&new, refct));
}
}

#[cfg(test)]
mod tests {
use {super::*, tempfile::tempdir};

#[test]
fn test_index_entries() {
for v in 10..12u64 {
for random in 1..3 {
for len in 1..3 {
let raw = (0..len)
.map(|l| {
let k = Pubkey::from([l as u8; 32]);
(k, v + (l as u64))
})
.collect::<Vec<_>>();
let hashed = Bucket::index_entries(raw.clone().into_iter(), len, random);
assert_eq!(hashed.len(), len);
(0..len).for_each(|i| {
let raw = raw[i];
let hashed = hashed[i];
assert_eq!(Bucket::<u64>::bucket_index_ix(&raw.0, random), hashed.0);
assert_eq!(raw.0, hashed.1);
assert_eq!(raw.1, hashed.2);
});
}
}
}
}

fn create_test_index(max_search: Option<u8>) -> BucketStorage<IndexBucket<u64>> {
let tmpdir = tempdir().unwrap();
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
assert!(!paths.is_empty());
let max_search = max_search.unwrap_or(2);
BucketStorage::<IndexBucket<u64>>::new(
Arc::new(paths),
1,
std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64,
max_search,
Arc::default(),
Arc::default(),
)
}

#[test]
fn batch_insert_non_duplicates_internal_simple() {
solana_logger::setup();
// add 2 entries, make sure they are added in the buckets we expect
let random = 1;
let data_buckets = Vec::default();
for v in 10..12u64 {
for len in 1..3 {
let raw = (0..len)
.map(|l| {
let k = Pubkey::from([l as u8; 32]);
(k, v + (l as u64))
})
.collect::<Vec<_>>();
let mut hashed = Bucket::index_entries(raw.clone().into_iter(), len, random);
let hashed_raw = hashed.clone();

let mut index = create_test_index(None);

let mut duplicates = Vec::default();
assert!(Bucket::<u64>::batch_insert_non_duplicates_internal(
&mut index,
&Vec::default(),
&mut hashed,
&mut duplicates,
)
.is_ok());

assert_eq!(hashed.len(), 0);
(0..len).for_each(|i| {
let raw = hashed_raw[i];
let elem = IndexEntryPlaceInBucket::new(raw.0 % index.capacity());
let (value, ref_count) = elem.read_value(&index, &data_buckets);
assert_eq!(ref_count, 1);
assert_eq!(value, &[hashed_raw[i].2]);
});
}
}
}

#[test]
fn batch_insert_non_duplicates_internal_same_ix_exceeds_max_search() {
solana_logger::setup();
// add `len` entries that all hash to the same ix and make sure they land in subsequent slots.
// vary `max_search`: if inserting an entry would exceed `max_search`, assert that the insert fails with an error and
// the colliding item remains in `entries`
let random = 1;
let data_buckets = Vec::default();
for max_search in [2usize, 3] {
for v in 10..12u64 {
for len in 1..(max_search + 1) {
let raw = (0..len)
.map(|l| {
let k = Pubkey::from([l as u8; 32]);
(k, v + (l as u64))
})
.collect::<Vec<_>>();
let mut hashed = Bucket::index_entries(raw.clone().into_iter(), len, random);
let common_ix = 2; // put all entries at the same ix
hashed.iter_mut().for_each(|v| {
v.0 = common_ix;
});
let hashed_raw = hashed.clone();

let mut index = create_test_index(Some(max_search as u8));

let mut duplicates = Vec::default();
let result = Bucket::<u64>::batch_insert_non_duplicates_internal(
&mut index,
&Vec::default(),
&mut hashed,
&mut duplicates,
);

assert_eq!(
hashed.len(),
if len > max_search { 1 } else { 0 },
"len: {len}"
);
(0..len).for_each(|i| {
assert!(if len > max_search {
result.is_err()
} else {
result.is_ok()
});
let raw = hashed_raw[i];
if i == 0 && len > max_search {
// max search was exceeded and the first entry was unable to be inserted, so it remained in `hashed`
assert_eq!(hashed[0], hashed_raw[0]);
} else {
// when ix values collide, entries are inserted in reverse order, so item[len - 1] lands at its expected ix and item[i] ends up (len - i - 1) probes past it
let search_required = (len - i - 1) as u64;
let elem = IndexEntryPlaceInBucket::new(
(raw.0 + search_required) % index.capacity(),
);
let (value, ref_count) = elem.read_value(&index, &data_buckets);
assert_eq!(ref_count, 1);
assert_eq!(value, &[hashed_raw[i].2]);
}
});
}
}
}
}
}
14 changes: 14 additions & 0 deletions bucket_map/src/bucket_api.rs
@@ -118,6 +118,20 @@ impl<T: Clone + Copy> BucketApi<T> {
bucket.as_mut().unwrap().set_anticipated_count(count);
}

/// batch insert of `items`. Each entry is assumed to have a single slot list element and ref_count == 1.
/// For any pubkeys that already exist, the failed insert data and the existing data are returned.
pub fn batch_insert_non_duplicates(
&self,
items: impl Iterator<Item = (Pubkey, T)>,
count: usize,
) -> Vec<(Pubkey, T, T)> {
let mut bucket = self.get_write_bucket();
bucket
.as_mut()
.unwrap()
.batch_insert_non_duplicates(items, count)
}
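
A hedged usage sketch (assumes a `BucketApi<u64>` handle named `bucket`; the returned tuples are `(pubkey, attempted_value, existing_value)`):

```rust
let items: Vec<(Pubkey, u64)> = vec![(Pubkey::new_unique(), 42)];
let duplicates = bucket.batch_insert_non_duplicates(items.iter().copied(), items.len());
for (key, attempted, existing) in duplicates {
    // `attempted` was not inserted because `key` already mapped to `existing`
    log::info!("duplicate key {key}: attempted {attempted}, existing {existing}");
}
```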

pub fn update<F>(&self, key: &Pubkey, updatefn: F)
where
F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,