Skip to content

Commit

Permalink
Do periodic inbound cleaning for rooted slots (#8436)
Browse files Browse the repository at this point in the history
* Do periodic inbound compaction for rooted slots

* Add comment

* nits

* Consider not_compacted_roots in cleanup_dead_slot

* Renames in AccountsIndex

* Rename to reflect expansion of removed accounts

* Fix a comment

* rename

* Parallelize clean over AccountsIndex

* Some niceties

* Reduce locks and real chunked parallelism

* Measure each step for sampling opportunities

* Just noticed par iter is maybe lazy

* Replace storage scan with optimized index scan

* Various clean-ups

* Clear uncleared_roots even if no updates
  • Loading branch information
ryoqun authored Mar 3, 2020
1 parent 1265afe commit d861033
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 50 deletions.
2 changes: 1 addition & 1 deletion ledger/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ pub fn add_snapshot<P: AsRef<Path>>(
bank: &Bank,
snapshot_storages: &[SnapshotStorage],
) -> Result<SlotSnapshotPaths> {
bank.purge_zero_lamport_accounts();
bank.clean_accounts();
bank.update_accounts_hash();
let slot = bank.slot();
// snapshot_path/slot
Expand Down
228 changes: 207 additions & 21 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,18 +638,67 @@ impl AccountsDB {
false
}

// Purge zero lamport accounts for garbage collection purposes
// Reclaim older states of rooted non-zero lamport accounts as a general
// AccountsDB bloat mitigation and preprocess for better zero-lamport purging.
fn clean_old_rooted_accounts(&self, purges_in_root: Vec<Pubkey>) {
// This number isn't carefully chosen; just guessed randomly such that
// the hot loop will be the order of ~Xms.
const INDEX_CLEAN_BULK_COUNT: usize = 4096;

let mut measure = Measure::start("clean_old_root-ms");
let reclaim_vecs =
purges_in_root
.par_chunks(INDEX_CLEAN_BULK_COUNT)
.map(|pubkeys: &[Pubkey]| {
let mut reclaims = Vec::new();
let accounts_index = self.accounts_index.read().unwrap();
for pubkey in pubkeys {
accounts_index.clean_rooted_entries(&pubkey, &mut reclaims);
}
reclaims
});
let reclaims: Vec<_> = reclaim_vecs.flatten().collect();
measure.stop();
inc_new_counter_info!("clean-old-root-par-clean-ms", measure.as_ms() as usize);

let mut measure = Measure::start("clean_old_root-ms");
self.handle_reclaims(&reclaims);
measure.stop();
inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize);
}

fn clear_uncleaned_roots(&self) {
let mut accounts_index = self.accounts_index.write().unwrap();
accounts_index.uncleaned_roots.clear();
drop(accounts_index);
}

// Purge zero lamport accounts and older rooted account states as garbage
// collection
// Only remove those accounts where the entire rooted history of the account
// can be purged because there are no live append vecs in the ancestors
pub fn purge_zero_lamport_accounts(&self, ancestors: &HashMap<u64, usize>) {
pub fn clean_accounts(&self) {
self.report_store_stats();
let mut purges = HashMap::new();
let mut purges_in_root = Vec::new();
let no_ancestors = HashMap::new();
let accounts_index = self.accounts_index.read().unwrap();
accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| {
if account_info.lamports == 0 && accounts_index.is_root(slot) {

accounts_index.scan_accounts(&no_ancestors, |pubkey, (account_info, slot)| {
if account_info.lamports == 0 {
purges.insert(*pubkey, accounts_index.would_purge(pubkey));
} else if accounts_index.uncleaned_roots.contains(&slot) {
purges_in_root.push(*pubkey);
}
});
drop(accounts_index);

if !purges_in_root.is_empty() {
self.clean_old_rooted_accounts(purges_in_root);
}
self.clear_uncleaned_roots();

let accounts_index = self.accounts_index.read().unwrap();

// Calculate store counts as if everything was purged
// Then purge if we can
Expand Down Expand Up @@ -725,9 +774,9 @@ impl AccountsDB {
let mut dead_slots = self.remove_dead_accounts(reclaims);
dead_accounts.stop();

let mut cleanup_dead_slots = Measure::start("reclaims::purge_slots");
self.cleanup_dead_slots(&mut dead_slots);
cleanup_dead_slots.stop();
let mut clean_dead_slots = Measure::start("reclaims::purge_slots");
self.clean_dead_slots(&mut dead_slots);
clean_dead_slots.stop();

let mut purge_slots = Measure::start("reclaims::purge_slots");
for slot in dead_slots {
Expand Down Expand Up @@ -1298,7 +1347,7 @@ impl AccountsDB {
dead_slots
}

fn cleanup_dead_slots(&self, dead_slots: &mut HashSet<Slot>) {
fn clean_dead_slots(&self, dead_slots: &mut HashSet<Slot>) {
if self.dont_cleanup_dead_slots.load(Ordering::Relaxed) {
return;
}
Expand All @@ -1307,7 +1356,7 @@ impl AccountsDB {
{
let mut index = self.accounts_index.write().unwrap();
for slot in dead_slots.iter() {
index.cleanup_dead_slot(*slot);
index.clean_dead_slot(*slot);
}
}
{
Expand Down Expand Up @@ -1928,7 +1977,7 @@ pub mod tests {
//slot is still there, since gc is lazy
assert!(accounts.storage.read().unwrap().0[&0].get(&id).is_some());

//store causes cleanup
//store causes clean
accounts.store(1, &[(&pubkey, &account)]);

//slot is gone
Expand All @@ -1939,6 +1988,149 @@ pub mod tests {
assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1)));
}

impl AccountsDB {
fn store_count_for_slot(&self, slot: Slot) -> usize {
let storage = self.storage.read().unwrap();

let slot_storage = storage.0.get(&slot);
if let Some(slot_storage) = slot_storage {
slot_storage.values().nth(0).unwrap().count()
} else {
0
}
}

fn uncleaned_root_count(&self) -> usize {
self.accounts_index.read().unwrap().uncleaned_roots.len()
}
}

#[test]
fn test_clean_old_with_normal_account() {
solana_logger::setup();

let accounts = AccountsDB::new(Vec::new());
let pubkey = Pubkey::new_rand();
let account = Account::new(1, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey, &account)]);
accounts.store(1, &[(&pubkey, &account)]);

// simulate slots are rooted after while
accounts.add_root(0);
accounts.add_root(1);

//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 1);
assert_eq!(accounts.store_count_for_slot(1), 1);

accounts.clean_accounts();

//now old state is cleaned up
assert_eq!(accounts.store_count_for_slot(0), 0);
assert_eq!(accounts.store_count_for_slot(1), 1);
}

#[test]
fn test_clean_old_with_zero_lamport_account() {
solana_logger::setup();

let accounts = AccountsDB::new(Vec::new());
let pubkey1 = Pubkey::new_rand();
let pubkey2 = Pubkey::new_rand();
let normal_account = Account::new(1, 0, &Account::default().owner);
let zero_account = Account::new(0, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey1, &normal_account)]);
accounts.store(1, &[(&pubkey1, &zero_account)]);
accounts.store(0, &[(&pubkey2, &normal_account)]);
accounts.store(1, &[(&pubkey2, &normal_account)]);

//simulate slots are rooted after while
accounts.add_root(0);
accounts.add_root(1);

//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 2);
assert_eq!(accounts.store_count_for_slot(1), 2);

accounts.clean_accounts();

//still old state behind zero-lamport account isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 1);
assert_eq!(accounts.store_count_for_slot(1), 2);
}

#[test]
fn test_clean_old_with_both_normal_and_zero_lamport_accounts() {
solana_logger::setup();

let accounts = AccountsDB::new(Vec::new());
let pubkey1 = Pubkey::new_rand();
let pubkey2 = Pubkey::new_rand();
let normal_account = Account::new(1, 0, &Account::default().owner);
let zero_account = Account::new(0, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey1, &normal_account)]);
accounts.store(1, &[(&pubkey1, &zero_account)]);
accounts.store(0, &[(&pubkey2, &normal_account)]);
accounts.store(2, &[(&pubkey2, &normal_account)]);

//simulate slots are rooted after while
accounts.add_root(0);
accounts.add_root(1);
accounts.add_root(2);

//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 2);
assert_eq!(accounts.store_count_for_slot(1), 1);
assert_eq!(accounts.store_count_for_slot(2), 1);

accounts.clean_accounts();

//both zero lamport and normal accounts are cleaned up
assert_eq!(accounts.store_count_for_slot(0), 0);
assert_eq!(accounts.store_count_for_slot(1), 0);
assert_eq!(accounts.store_count_for_slot(2), 1);
}

#[test]
fn test_uncleaned_roots_with_account() {
solana_logger::setup();

let accounts = AccountsDB::new(Vec::new());
let pubkey = Pubkey::new_rand();
let account = Account::new(1, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey, &account)]);
assert_eq!(accounts.uncleaned_root_count(), 0);

// simulate slots are rooted after while
accounts.add_root(0);
assert_eq!(accounts.uncleaned_root_count(), 1);

//now uncleaned roots are cleaned up
accounts.clean_accounts();
assert_eq!(accounts.uncleaned_root_count(), 0);
}

#[test]
fn test_uncleaned_roots_with_no_account() {
solana_logger::setup();

let accounts = AccountsDB::new(Vec::new());

assert_eq!(accounts.uncleaned_root_count(), 0);

// simulate slots are rooted after while
accounts.add_root(0);
assert_eq!(accounts.uncleaned_root_count(), 1);

//now uncleaned roots are cleaned up
accounts.clean_accounts();
assert_eq!(accounts.uncleaned_root_count(), 0);
}

fn print_accounts(label: &'static str, accounts: &AccountsDB) {
print_index(label, accounts);
print_count_and_status(label, accounts);
Expand Down Expand Up @@ -2103,12 +2295,6 @@ pub mod tests {
daccounts
}

fn purge_zero_lamport_accounts(accounts: &AccountsDB, slot: Slot) {
let ancestors = vec![(slot as Slot, 0)].into_iter().collect();
info!("ancestors: {:?}", ancestors);
accounts.purge_zero_lamport_accounts(&ancestors);
}

fn assert_no_stores(accounts: &AccountsDB, slot: Slot) {
let stores = accounts.storage.read().unwrap();
info!("{:?}", stores.0.get(&slot));
Expand Down Expand Up @@ -2153,7 +2339,7 @@ pub mod tests {

print_accounts("pre_purge", &accounts);

purge_zero_lamport_accounts(&accounts, current_slot);
accounts.clean_accounts();

print_accounts("post_purge", &accounts);

Expand Down Expand Up @@ -2216,7 +2402,7 @@ pub mod tests {
info!("ancestors: {:?}", ancestors);
let hash = accounts.update_accounts_hash(current_slot, &ancestors);

purge_zero_lamport_accounts(&accounts, current_slot);
accounts.clean_accounts();

assert_eq!(
accounts.update_accounts_hash(current_slot, &ancestors),
Expand Down Expand Up @@ -2287,7 +2473,7 @@ pub mod tests {

print_accounts("accounts", &accounts);

purge_zero_lamport_accounts(&accounts, current_slot);
accounts.clean_accounts();

print_accounts("accounts_post_purge", &accounts);
let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
Expand Down Expand Up @@ -2356,7 +2542,7 @@ pub mod tests {
fn test_accounts_purge_chained_purge_before_snapshot_restore() {
solana_logger::setup();
with_chained_zero_lamport_accounts(|accounts, current_slot| {
purge_zero_lamport_accounts(&accounts, current_slot);
accounts.clean_accounts();
let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
(accounts, false)
});
Expand All @@ -2370,7 +2556,7 @@ pub mod tests {
accounts
.dont_cleanup_dead_slots
.store(true, Ordering::Relaxed);
purge_zero_lamport_accounts(&accounts, current_slot);
accounts.clean_accounts();
(accounts, true)
});
}
Expand Down
Loading

0 comments on commit d861033

Please sign in to comment.