wip: improve get_ancient_slots
jeffwashington committed Oct 9, 2023
1 parent 05586e8 commit a89dffc
Showing 1 changed file with 64 additions and 40 deletions.
104 changes: 64 additions & 40 deletions accounts-db/src/accounts_db.rs
@@ -1704,10 +1704,6 @@ impl PurgeStats {
/// results from 'split_storages_ancient'
#[derive(Debug, Default, PartialEq)]
struct SplitAncientStorages {
/// # ancient slots
ancient_slot_count: usize,
/// the specific ancient slots
ancient_slots: Vec<Slot>,
/// lowest slot that is not an ancient append vec
first_non_ancient_slot: Slot,
/// slot # of beginning of first aligned chunk starting from the first non ancient slot
@@ -1719,6 +1715,8 @@ struct SplitAncientStorages {
chunk_count: usize,
/// start and end(exclusive) of normal (non-ancient) slots to be scanned
normal_slot_range: Range<Slot>,
/// ancient chunks: each large storage becomes its own chunk (and cache file); runs of small slots are grouped into shared chunks
ancient_chunks: Vec<Range<Slot>>,
}
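
For orientation, here is a minimal sketch of what the new `ancient_chunks` field might hold once populated; the slot numbers are invented for illustration and are not taken from the patch. Each large ancient storage occupies its own one-slot range, while a run of small slots in the ancient region shares a single range.

```rust
use std::ops::Range;

fn main() {
    // Invented example: 30 small slots grouped into one shared chunk,
    // followed by one large ancient append vec at slot 130 in its own chunk.
    let ancient_chunks: Vec<Range<u64>> = vec![100..130, 130..131];

    // Each entry is later scanned as its own chunk during hash calculation.
    assert_eq!(ancient_chunks.len(), 2);
    assert_eq!(
        ancient_chunks.iter().map(|r| r.end - r.start).sum::<u64>(),
        31
    );
}
```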

impl SplitAncientStorages {
@@ -1745,7 +1743,7 @@ impl SplitAncientStorages {

let first_non_ancient_slot = ancient_slots
.last()
.map(|last_ancient_slot| last_ancient_slot.saturating_add(1))
.map(|range| range.end)
.unwrap_or(range.start);
Self::new_with_ancient_info(range, ancient_slots, first_non_ancient_slot)
}
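
Because ancient slots are now tracked as half-open ranges, the first non-ancient slot falls out of the last range's exclusive `end` rather than `last_slot + 1`. A small self-contained sketch of that derivation, with invented slot values:

```rust
use std::ops::Range;

fn main() {
    // Invented inputs: two ancient chunks plus the start of the overall range.
    let ancient_chunks: Vec<Range<u64>> = vec![10..12, 15..16];
    let range_start: u64 = 0;

    // Mirrors the `.map(|range| range.end)` logic above: the slot right after
    // the last ancient chunk, or the start of the range if there are none.
    let first_non_ancient_slot = ancient_chunks
        .last()
        .map(|range| range.end)
        .unwrap_or(range_start);

    assert_eq!(first_non_ancient_slot, 16);
}
```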
@@ -1757,34 +1755,64 @@ impl SplitAncientStorages {
oldest_non_ancient_slot: Slot,
snapshot_storages: &SortedStorages,
treat_as_ancient: impl Fn(&AccountStorageEntry) -> bool,
) -> Vec<Slot> {
) -> Vec<Range<Slot>> {
let range = snapshot_storages.range();
let mut i = 0;
let mut len_trucate = 0;
let mut possible_ancient_slots = snapshot_storages
let mut ranges = Vec::default();
let mut first_slot_in_current_range = None;
let mut last_slot_in_current_range = None::<Slot>;
let mut current_range_count = 0;
let mut ancient_range_with_last_ancient_packed = None;
for (slot, storage) in snapshot_storages
.iter_range(&(range.start..oldest_non_ancient_slot))
.filter_map(|(slot, storage)| {
storage.map(|storage| {
i += 1;
if treat_as_ancient(storage) {
// even though the slot is in range of being an ancient append vec, if it isn't actually a large append vec,
// then we are better off treating all these slots as normally cachable to reduce work in dedup.
// Since this one is large, for the moment, this one becomes the highest slot where we want to individually cache files.
len_trucate = i;
}
slot
})
})
.collect::<Vec<_>>();
possible_ancient_slots.truncate(len_trucate);
possible_ancient_slots
.filter_map(|(slot, storage)| storage.map(|storage| (slot, storage)))
{
let is_ancient = treat_as_ancient(storage);
if is_ancient || current_range_count >= MAX_ITEMS_PER_CHUNK {
if let Some(last_slot_in_current_range) =
std::mem::take(&mut last_slot_in_current_range)
{
// There was a current range of small slots active, so complete it. It has at least 1 slot in it.
// +1 because range is exclusive
ranges.push(
std::mem::take(&mut first_slot_in_current_range).unwrap()
..last_slot_in_current_range + 1,
);
current_range_count = 0;
// first and last slot in current range are both None now.
}
}
if is_ancient {
// remember the highest slot where a large ancient append vec was found
ancient_range_with_last_ancient_packed = Some(ranges.len());
// put this ancient append vec in its own chunk
// +1 because range is exclusive
ranges.push(slot..slot + 1);
// no 'current' range active now
} else {
// non-ancient (i.e. small slot), so collect it together with other non-ancient slots
if current_range_count == 0 {
first_slot_in_current_range = Some(slot);
}
current_range_count += 1;
last_slot_in_current_range = Some(slot);
}
}

// If there was an incomplete set of slots that we didn't produce a chunk from, just ignore those. Those will all be treated as non-ancient.
// They are almost certainly all intact as a set that has been hashed on prior runs and will continue to use their hashed files until ancient packing puts them together.
if let Some(ancient_range_with_last_ancient_packed) = ancient_range_with_last_ancient_packed
{
ranges.truncate(ancient_range_with_last_ancient_packed);
}

ranges
}
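
To make the new control flow easier to follow, here is a standalone sketch of the grouping idea, separate from AccountsDb's types: each large slot gets its own one-slot range, consecutive small slots are batched into shared ranges capped at a maximum size, and anything after the last large slot is dropped so the caller treats those slots as normal, non-ancient storages. The names, the `(slot, is_large)` input shape, and the assumption that the final large slot's own range is kept are illustrative, not a line-for-line copy of the patch.

```rust
use std::ops::Range;

type Slot = u64;

/// Sketch only: group sorted slots into scan ranges the way the new
/// get_ancient_slots does. `is_large` marks storages treated as ancient.
fn group_into_ranges(slots: &[(Slot, bool)], max_items_per_chunk: usize) -> Vec<Range<Slot>> {
    let mut ranges: Vec<Range<Slot>> = Vec::new();
    let mut current: Option<Range<Slot>> = None;
    let mut current_count = 0usize;
    let mut last_large_idx: Option<usize> = None;

    for &(slot, is_large) in slots {
        if is_large || current_count >= max_items_per_chunk {
            // Close out any in-progress run of small slots.
            if let Some(range) = current.take() {
                ranges.push(range);
                current_count = 0;
            }
        }
        if is_large {
            // A large storage becomes its own single-slot chunk.
            last_large_idx = Some(ranges.len());
            ranges.push(slot..slot + 1);
        } else {
            // Start a new run at this slot if none is active, then extend the
            // run to include this slot (range ends are exclusive).
            let range = current.get_or_insert(slot..slot + 1);
            range.end = slot + 1;
            current_count += 1;
        }
    }

    // Drop everything after the last large slot; trailing small slots fall
    // back to normal, non-ancient scanning. Assumption in this sketch: the
    // last large slot's own range is kept. No large slot means no ancient chunks.
    match last_large_idx {
        Some(idx) => ranges.truncate(idx + 1),
        None => ranges.clear(),
    }
    ranges
}

fn main() {
    // Mirrors the test scenario below: slot 2 is the only large storage.
    let slots: [(Slot, bool); 5] = [(0, false), (1, false), (2, true), (3, false), (4, false)];
    let expected: Vec<Range<Slot>> = vec![0..2, 2..3];
    assert_eq!(group_into_ranges(&slots, 2_500), expected);
}
```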

/// create once ancient slots have been identified
/// This is easier to test, removing SortedStorages as a type to deal with here.
fn new_with_ancient_info(
range: &Range<Slot>,
ancient_slots: Vec<Slot>,
ancient_chunks: Vec<Range<Slot>>,
first_non_ancient_slot: Slot,
) -> Self {
if range.is_empty() {
@@ -1794,7 +1822,6 @@ impl SplitAncientStorages {
}

let max_slot_inclusive = range.end.saturating_sub(1);
let ancient_slot_count = ancient_slots.len();
let first_chunk_start = ((first_non_ancient_slot + MAX_ITEMS_PER_CHUNK)
/ MAX_ITEMS_PER_CHUNK)
* MAX_ITEMS_PER_CHUNK;
@@ -1808,16 +1835,15 @@ impl SplitAncientStorages {

// 2 is for 2 special chunks - unaligned slots at the beginning and end
let chunk_count =
ancient_slot_count + 2 + non_ancient_slot_count / (MAX_ITEMS_PER_CHUNK as usize);
ancient_chunks.len() + 2 + non_ancient_slot_count / (MAX_ITEMS_PER_CHUNK as usize);

SplitAncientStorages {
ancient_slot_count,
ancient_slots,
first_non_ancient_slot,
first_chunk_start,
non_ancient_slot_count,
chunk_count,
normal_slot_range,
ancient_chunks,
}
}
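
A quick worked example of the chunk_count arithmetic above, with invented figures; the MAX_ITEMS_PER_CHUNK value of 2,500 is assumed here for illustration rather than quoted from the file.

```rust
// Invented figures to exercise the formula above:
// chunk_count = ancient_chunks.len() + 2 + non_ancient_slot_count / MAX_ITEMS_PER_CHUNK
const MAX_ITEMS_PER_CHUNK: u64 = 2_500; // assumed value, for illustration only

fn main() {
    let ancient_chunk_count: u64 = 3; // ranges returned by get_ancient_slots
    let non_ancient_slot_count: u64 = 10_000; // width of the normal slot range
    // The +2 covers the unaligned chunks at the start and end of the normal range.
    let chunk_count =
        ancient_chunk_count + 2 + non_ancient_slot_count / MAX_ITEMS_PER_CHUNK;
    assert_eq!(chunk_count, 9); // 3 ancient + 2 edge + 4 full aligned chunks
}
```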

@@ -1829,7 +1855,7 @@ impl SplitAncientStorages {
self.normal_slot_range.start
} else {
assert!(
normal_chunk.saturating_add(self.ancient_slot_count) < self.chunk_count,
normal_chunk.saturating_add(self.ancient_chunks.len()) < self.chunk_count,
"out of bounds: {}, {}",
normal_chunk,
self.chunk_count
@@ -1843,7 +1869,7 @@ impl SplitAncientStorages {

/// ancient slots are the first chunks
fn is_chunk_ancient(&self, chunk: usize) -> bool {
chunk < self.ancient_slot_count
chunk < self.ancient_chunks.len()
}

/// given chunk in 0<=chunk<self.chunk_count
@@ -1852,14 +1878,10 @@ impl SplitAncientStorages {
fn get_slot_range(&self, chunk: usize) -> Option<Range<Slot>> {
let range = if self.is_chunk_ancient(chunk) {
// ancient append vecs are handled individually
let slot = self.ancient_slots[chunk];
Range {
start: slot,
end: slot + 1,
}
self.ancient_chunks[chunk].clone()
} else {
// normal chunks are after ancient chunks
let normal_chunk = chunk - self.ancient_slot_count;
let normal_chunk = chunk - self.ancient_chunks.len();
if normal_chunk == 0 {
// first slot
Range {
@@ -7998,8 +8020,10 @@ impl AccountsDb {
&self,
slot: Slot,
ignore: Option<Pubkey>,
list_of_things_to_hash: Vec<Hash>,
) -> AccountsDeltaHash {
let (mut hashes, scan_us, mut accumulate) = self.get_pubkey_hash_for_slot(slot);
let (mut hashes, scan_us, mut accumulate) =
self.get_pubkey_hash_for_slot(slot, list_of_things_to_hash);
let dirty_keys = hashes.iter().map(|(pubkey, _hash)| *pubkey).collect();
if let Some(ignore) = ignore {
hashes.retain(|k| k.0 != ignore);
@@ -16860,7 +16884,7 @@ pub mod tests {
// 4 = ancient slots: 1, 2, 3 (except 2 is large, 3 is not, so treat 3 as non-ancient)
// 5 = ...
for oldest_non_ancient_slot in 0..6 {
let ancient_slots = SplitAncientStorages::get_ancient_slots(
let ancient_ranges = SplitAncientStorages::get_ancient_slots(
oldest_non_ancient_slot,
&snapshot_storages,
|storage| storage.slot() == 2,
@@ -16880,7 +16904,7 @@ pub mod tests {
// we're not asking about the big append vec at 2, so nothing
expected.clear();
}
assert_eq!(expected, ancient_slots, "count: {count}");
assert_eq!(expected, ancient_ranges, "count: {count}");
}
}
}