Skip to content

Commit

Permalink
Track purged keys
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin committed Oct 2, 2020
1 parent 3bf739a commit f58d192
Showing 1 changed file with 107 additions and 68 deletions.
175 changes: 107 additions & 68 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ pub type SnapshotStorages = Vec<SnapshotStorage>;
// Each slot has a set of storage entries.
pub(crate) type SlotStores = HashMap<usize, Arc<AccountStorageEntry>>;

type PurgedAccountSlots = HashMap<Pubkey, HashSet<Slot>>;
type ReclaimedAccounts = HashMap<AppendVecId, HashSet<usize>>;

trait Versioned {
fn version(&self) -> u64;
}
Expand Down Expand Up @@ -544,7 +547,7 @@ impl AccountsDB {
&self,
purges_in_root: Vec<Pubkey>,
max_clean_root: Option<Slot>,
) -> (HashSet<Slot>, HashMap<AppendVecId, HashSet<usize>>) {
) -> (PurgedAccountSlots, ReclaimedAccounts) {
// This number isn't carefully chosen; just guessed randomly such that
// the hot loop will be the order of ~Xms.
const INDEX_CLEAN_BULK_COUNT: usize = 4096;
Expand All @@ -566,11 +569,11 @@ impl AccountsDB {
inc_new_counter_info!("clean-old-root-par-clean-ms", clean_rooted.as_ms() as usize);

let mut measure = Measure::start("clean_old_root_reclaims");
let (dead_slots, removed_accounts) = self.handle_reclaims(&reclaims, None, false);
let (purged_account_slots, removed_accounts) = self.handle_reclaims(&reclaims, None, false);
measure.stop();
debug!("{} {}", clean_rooted, measure);
inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize);
(dead_slots, removed_accounts)
(purged_account_slots, removed_accounts)
}

fn do_reset_uncleaned_roots(
Expand Down Expand Up @@ -599,12 +602,30 @@ impl AccountsDB {
// do not match the criteria of deleting all appendvecs which contain them
// then increment their storage count.
let mut already_counted = HashSet::new();
for (_pubkey, (account_infos, ref_count_from_storage)) in purges.iter() {
for (pubkey, (account_infos, ref_count_from_storage)) in purges.iter() {
let no_delete = if account_infos.len() as u64 != *ref_count_from_storage {
debug!(
"calc_delete_dependencies(),
pubkey: {},
account_infos: {:?},
account_infos_len: {},
ref_count_from_storage: {}",
pubkey,
account_infos,
account_infos.len(),
ref_count_from_storage,
);
true
} else {
let mut no_delete = false;
for (_slot, account_info) in account_infos {
debug!(
"calc_delete_dependencies()
storage id: {},
count len: {}",
account_info.store_id,
store_counts.get(&account_info.store_id).unwrap().0,
);
if store_counts.get(&account_info.store_id).unwrap().0 != 0 {
no_delete = true;
break;
Expand Down Expand Up @@ -685,6 +706,7 @@ impl AccountsDB {
if let Some((list, index)) = accounts_index.get(pubkey, None, max_clean_root) {
let (slot, account_info) = &list[index];
if account_info.lamports == 0 {
debug!("purging zero lamport {}, slot: {}", pubkey, slot);
purges.insert(*pubkey, accounts_index.would_purge(pubkey));
}
if accounts_index.uncleaned_roots.contains(slot) {
Expand All @@ -693,6 +715,7 @@ impl AccountsDB {
if let Some(max_clean_root) = max_clean_root {
assert!(*slot <= max_clean_root);
}
debug!("purging uncleaned {}, slot: {}", pubkey, slot);
purges_in_root.push(*pubkey);
}
}
Expand All @@ -713,11 +736,11 @@ impl AccountsDB {
accounts_scan.stop();

let mut clean_old_rooted = Measure::start("clean_old_roots");
let (old_rooted_dead_slots, removed_accounts) = {
let (purged_account_slots, removed_accounts) = {
if !purges_in_root.is_empty() {
self.clean_old_rooted_accounts(purges_in_root, max_clean_root)
} else {
(HashSet::new(), HashMap::new())
(HashMap::new(), HashMap::new())
}
};
self.do_reset_uncleaned_roots(&mut candidates, max_clean_root);
Expand All @@ -729,15 +752,24 @@ impl AccountsDB {
// Then purge if we can
let mut store_counts: HashMap<AppendVecId, (usize, HashSet<Pubkey>)> = HashMap::new();
for (key, (account_infos, ref_count)) in purges.iter_mut() {
let mut ref_count_needs_update = false;
if purged_account_slots.contains_key(&key) {
*ref_count = self
.accounts_index
.read()
.unwrap()
.ref_count_from_storage(&key);
}
account_infos.retain(|(slot, account_info)| {
if old_rooted_dead_slots.contains(slot) {
// ref count is only updated when a slot is dead in
// `clean_dead_slots()`
ref_count_needs_update = true;
let was_slot_purged = purged_account_slots
.get(&key)
.map(|slots_removed| slots_removed.contains(slot))
.unwrap_or(false);
if was_slot_purged {
// No need to look up the slot storage below if the entire
// slot was purged
return false;
}
// Check if this update in `slot`to the account with `key` was reclaimed earlier by
// Check if this update in `slot` to the account with `key` was reclaimed earlier by
// `clean_old_rooted_accounts()`
let was_reclaimed = removed_accounts
.get(&account_info.store_id)
Expand All @@ -759,18 +791,14 @@ impl AccountsDB {
let count = store.count_and_status.read().unwrap().0;
count - 1
};
debug!(
"store_counts, inserting slot: {}, store id: {}, count: {}",
slot, account_info.store_id, count
);
store_counts.insert(account_info.store_id, (count, key_set));
}
true
});

if ref_count_needs_update {
*ref_count = self
.accounts_index
.read()
.unwrap()
.ref_count_from_storage(&key);
}
}
store_counts_time.stop();

Expand Down Expand Up @@ -856,9 +884,9 @@ impl AccountsDB {
reclaims: SlotSlice<AccountInfo>,
expected_single_dead_slot: Option<Slot>,
no_dead_slot: bool,
) -> (HashSet<Slot>, HashMap<AppendVecId, HashSet<usize>>) {
) -> (PurgedAccountSlots, ReclaimedAccounts) {
if reclaims.is_empty() {
return (HashSet::new(), HashMap::new());
return (HashMap::new(), HashMap::new());
}

let (dead_slots, removed_accounts) =
Expand All @@ -871,29 +899,33 @@ impl AccountsDB {
assert!(dead_slots.contains(&expected_single_dead_slot));
}
}
if !dead_slots.is_empty() {
self.process_dead_slots(&dead_slots);
}
(dead_slots, removed_accounts)
let purged_account_slots = if !dead_slots.is_empty() {
self.process_dead_slots(&dead_slots)
} else {
HashMap::new()
};
(purged_account_slots, removed_accounts)
}

// Must be kept private!, does sensitive cleanup that should only be called from
// supported pipelines in AccountsDb
fn process_dead_slots(&self, dead_slots: &HashSet<Slot>) {
fn process_dead_slots(&self, dead_slots: &HashSet<Slot>) -> PurgedAccountSlots {
let mut clean_dead_slots = Measure::start("reclaims::purge_slots");
self.clean_dead_slots(&dead_slots);
let purged_account_slots = self.clean_dead_slots(&dead_slots);
clean_dead_slots.stop();

let mut purge_slots = Measure::start("reclaims::purge_slots");
self.purge_slots(&dead_slots);
purge_slots.stop();

debug!(
"process_dead_slots({}): {} {}",
"process_dead_slots({}): {} {} {:?}",
dead_slots.len(),
clean_dead_slots,
purge_slots
purge_slots,
dead_slots,
);
purged_account_slots
}

fn do_shrink_stale_slot(&self, slot: Slot) -> usize {
Expand Down Expand Up @@ -2081,10 +2113,10 @@ impl AccountsDB {
&self,
reclaims: SlotSlice<AccountInfo>,
expected_slot: Option<Slot>,
) -> (HashSet<Slot>, HashMap<AppendVecId, HashSet<usize>>) {
) -> (HashSet<Slot>, ReclaimedAccounts) {
let storage = self.storage.read().unwrap();
let mut dead_slots = HashSet::new();
let mut removed_accounts: HashMap<AppendVecId, HashSet<usize>> = HashMap::new();
let mut removed_accounts: ReclaimedAccounts = HashMap::new();
for (slot, account_info) in reclaims {
removed_accounts
.entry(account_info.store_id)
Expand Down Expand Up @@ -2121,7 +2153,8 @@ impl AccountsDB {
(dead_slots, removed_accounts)
}

fn clean_dead_slots(&self, dead_slots: &HashSet<Slot>) {
fn clean_dead_slots(&self, dead_slots: &HashSet<Slot>) -> PurgedAccountSlots {
let mut purged_account_slots: PurgedAccountSlots = HashMap::new();
{
let mut measure = Measure::start("clean_dead_slots-ms");
let storage = self.storage.read().unwrap();
Expand Down Expand Up @@ -2153,7 +2186,8 @@ impl AccountsDB {
})
};
let index = self.accounts_index.read().unwrap();
for (_slot, pubkey) in slot_pubkeys {
for (slot, pubkey) in slot_pubkeys {
purged_account_slots.entry(pubkey).or_default().insert(slot);
index.unref_from_storage(&pubkey);
}
drop(index);
Expand All @@ -2171,6 +2205,7 @@ impl AccountsDB {
bank_hashes.remove(slot);
}
}
purged_account_slots
}

fn hash_accounts(
Expand Down Expand Up @@ -3394,41 +3429,44 @@ pub mod tests {
// CREATE SLOT 1
let latest_slot = 1;

// Modify the first 10 of the slot 0 accounts as updates in slot 1
// Modify the first 10 of the accounts from slot 0 in slot 1
modify_accounts(&accounts, &pubkeys, latest_slot, 10, 3);
// Overwrite account 30 from slot 0 with lamports=0 into slot 1.
// Slot 1 should now have 10 + 1 = 11 accounts
let account = Account::new(0, 0, &Account::default().owner);
accounts.store(latest_slot, &[(&pubkeys[30], &account)]);

// Create 10 new accounts in slot 1
// Create 10 new accounts in slot 1, should now have 11 = 10 = 21
// accounts
create_account(&accounts, &mut pubkeys1, latest_slot, 10, 0, 0);

// Store a lamports=0 account in slot 1. Slot 1 should now have
// 10 + 10 + 1 = 21 accounts
let account = Account::new(0, 0, &Account::default().owner);
accounts.store(latest_slot, &[(&pubkeys[30], &account)]);
accounts.add_root(latest_slot);
assert!(check_storage(&accounts, 1, 21));

// CREATE SLOT 2
let latest_slot = 2;
let mut pubkeys2: Vec<Pubkey> = vec![];

// Modify original slot 0 accounts in slot 2
// Modify first 20 of the accounts from slot 0 in slot 2
modify_accounts(&accounts, &pubkeys, latest_slot, 20, 4);
accounts.clean_accounts(None);
// Overwrite account 31 from slot 0 with lamports=0 into slot 2.
// Slot 2 should now have 20 + 1 = 21 accounts
let account = Account::new(0, 0, &Account::default().owner);
accounts.store(latest_slot, &[(&pubkeys[31], &account)]);

// Create 10 new accounts in slot 2
// Create 10 new accounts in slot 2. Slot 2 should now have
// 21 + 10 = 31 accounts
create_account(&accounts, &mut pubkeys2, latest_slot, 10, 0, 0);

// Store a lamports=0 account in slot 2. Slot 2 should now have
// 10 + 10 + 10 + 1 = 31 accounts
let account = Account::new(0, 0, &Account::default().owner);
accounts.store(latest_slot, &[(&pubkeys[31], &account)]);
accounts.add_root(latest_slot);
assert!(check_storage(&accounts, 2, 31));

accounts.clean_accounts(None);
// The first 20 accounts have been modified in slot 2, so only
// 80 accounts left in slot 0.
assert!(check_storage(&accounts, 0, 80));
// The first 20 accounts of slot 0 have been updated in slot 2, as well as
// accounts 30 and 31 (overwritten with zero-lamport accounts in slot 1 and
// slot 2 respectively), so only 78 accounts are left in slot 0's storage entries.
assert!(check_storage(&accounts, 0, 78));
// 10 of the 21 accounts have been modified in slot 2, so only 11
// accounts left in slot 1.
assert!(check_storage(&accounts, 1, 11));
Expand Down Expand Up @@ -3518,6 +3556,7 @@ pub mod tests {
let accounts = AccountsDB::new_single();
accounts.add_root(0);

// Step A
let mut current_slot = 1;
accounts.store(current_slot, &[(&pubkey, &account)]);
// Store another live account to slot 1 which will prevent any purge
Expand All @@ -3541,9 +3580,10 @@ pub mod tests {
assert_eq!(slot1, current_slot);
assert_eq!(slot1, slot2);
assert_eq!(account_info1.store_id, account_info2.store_id);
println!("stored into {}", account_info1.store_id);

// Step B
current_slot += 1;
let zero_lamport_slot = current_slot;
accounts.store(current_slot, &[(&pubkey, &zero_lamport_account)]);
accounts.add_root(current_slot);

Expand All @@ -3558,24 +3598,23 @@ pub mod tests {

accounts.print_accounts_stats("post_purge");

// Make sure the index is not touched
assert_eq!(
accounts
.accounts_index
.read()
.unwrap()
.account_maps
.get(&pubkey)
.unwrap()
.1
.read()
.unwrap()
.len(),
2
);

// slot 1 & 2 should have stores
check_storage(&accounts, 1, 2);
// The earlier entry for pubkey in the account index is purged,
let (slot_list_len, index_slot) = {
let index = accounts.accounts_index.read().unwrap();
let account_entry = index.account_maps.get(&pubkey).unwrap();
let slot_list = account_entry.1.read().unwrap();
(slot_list.len(), slot_list[0].0)
};
assert_eq!(slot_list_len, 1);
// Zero lamport entry was not the one purged
assert_eq!(index_slot, zero_lamport_slot);
// The ref count should still be 2 because no slots were purged
assert_eq!(accounts.ref_count_for_pubkey(&pubkey), 2);

// storage for slot 1 had 2 accounts, now has 1 after pubkey 1
// was reclaimed
check_storage(&accounts, 1, 1);
// storage for slot 2 had 1 accounts, now has 1
check_storage(&accounts, 2, 1);
}

Expand Down

0 comments on commit f58d192

Please sign in to comment.