Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

better duplicate key stats during index generation #30829

Merged
merged 1 commit into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 40 additions & 19 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,10 @@ struct GenerateIndexTimings {
pub index_flush_us: u64,
pub rent_paying: AtomicUsize,
pub amount_to_top_off_rent: AtomicU64,
pub total_duplicates: u64,
pub total_including_duplicates: u64,
pub accounts_data_len_dedup_time_us: u64,
pub total_duplicate_slot_keys: u64,
pub populate_duplicate_keys_us: u64,
}

#[derive(Default, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -668,8 +670,8 @@ impl GenerateIndexTimings {
i64
),
(
"total_items_with_duplicates",
self.total_duplicates as i64,
"total_items_including_duplicates",
self.total_including_duplicates as i64,
i64
),
("total_items", self.total_items as i64, i64),
Expand All @@ -678,6 +680,16 @@ impl GenerateIndexTimings {
self.accounts_data_len_dedup_time_us as i64,
i64
),
(
"total_duplicate_slot_keys",
self.total_duplicate_slot_keys as i64,
i64
),
(
"populate_duplicate_keys_us",
self.populate_duplicate_keys_us as i64,
i64
),
);
}
}
Expand Down Expand Up @@ -8982,7 +8994,7 @@ impl AccountsDb {
let insertion_time_us = AtomicU64::new(0);
let rent_paying = AtomicUsize::new(0);
let amount_to_top_off_rent = AtomicU64::new(0);
let total_duplicates = AtomicU64::new(0);
let total_including_duplicates = AtomicU64::new(0);
let storage_info_timings = Mutex::new(GenerateIndexTimings::default());
let scan_time: u64 = slots
.par_chunks(chunk_size)
Expand Down Expand Up @@ -9025,7 +9037,8 @@ impl AccountsDb {
rent_paying.fetch_add(rent_paying_this_slot, Ordering::Relaxed);
amount_to_top_off_rent
.fetch_add(amount_to_top_off_rent_this_slot, Ordering::Relaxed);
total_duplicates.fetch_add(total_this_slot, Ordering::Relaxed);
total_including_duplicates
.fetch_add(total_this_slot, Ordering::Relaxed);
accounts_data_len
.fetch_add(accounts_data_len_this_slot, Ordering::Relaxed);
let mut rent_paying_accounts_by_partition =
Expand Down Expand Up @@ -9088,28 +9101,34 @@ impl AccountsDb {
.sum();

let mut index_flush_us = 0;
let mut total_duplicate_slot_keys = 0;
let mut populate_duplicate_keys_us = 0;
if pass == 0 {
// tell accounts index we are done adding the initial accounts at startup
let mut m = Measure::start("accounts_index_idle_us");
self.accounts_index.set_startup(Startup::Normal);
m.stop();
index_flush_us = m.as_us();

// this has to happen before visit_duplicate_pubkeys_during_startup below
// get duplicate keys from acct idx. We have to wait until we've finished flushing.
for (slot, key) in self
.accounts_index
.retrieve_duplicate_keys_from_startup()
.into_iter()
.flatten()
{
match self.uncleaned_pubkeys.entry(slot) {
Occupied(mut occupied) => occupied.get_mut().push(key),
Vacant(vacant) => {
vacant.insert(vec![key]);
populate_duplicate_keys_us = measure_us!({
// This must happen before visit_duplicate_pubkeys_during_startup below.
// Retrieve the duplicate keys from the accounts index; this has to wait until the index has finished flushing.
for (slot, key) in self
.accounts_index
.retrieve_duplicate_keys_from_startup()
.into_iter()
.flatten()
{
total_duplicate_slot_keys += 1;
match self.uncleaned_pubkeys.entry(slot) {
Occupied(mut occupied) => occupied.get_mut().push(key),
Vacant(vacant) => {
vacant.insert(vec![key]);
}
}
}
}
})
.1;
}

let storage_info_timings = storage_info_timings.into_inner().unwrap();
Expand All @@ -9123,7 +9142,9 @@ impl AccountsDb {
total_items,
rent_paying,
amount_to_top_off_rent,
total_duplicates: total_duplicates.load(Ordering::Relaxed),
total_duplicate_slot_keys,
populate_duplicate_keys_us,
total_including_duplicates: total_including_duplicates.load(Ordering::Relaxed),
storage_size_accounts_map_us: storage_info_timings.storage_size_accounts_map_us,
storage_size_accounts_map_flatten_us: storage_info_timings
.storage_size_accounts_map_flatten_us,
Expand Down
11 changes: 9 additions & 2 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1051,12 +1051,19 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
disk.update(&k, |current| {
match current {
Some((current_slot_list, mut ref_count)) => {
// merge this in, mark as conflict
// merge this in, mark as duplicate
duplicates.push((slot, k));
if current_slot_list.len() == 1 {
// Also record a duplicate for the first entry, which was previously added to the disk index.
// When that entry was added, it could not yet be known to be a duplicate.
// Every slot that holds a duplicate must be captured because clean applies slot limits.
let first_entry_slot = current_slot_list[0].0;
duplicates.push((first_entry_slot, k));
}
let mut slot_list = Vec::with_capacity(current_slot_list.len() + 1);
slot_list.extend_from_slice(current_slot_list);
slot_list.push((entry.0, entry.1.into())); // will never be from the same slot that already exists in the list
ref_count += new_ref_count;
duplicates.push((slot, k));
Some((slot_list, ref_count))
}
None => {
Expand Down