diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 81f0b3a8fa42ad..f1d0324128c148 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -89,7 +89,6 @@ use { hash::Hash, pubkey::Pubkey, rent_collector::RentCollector, - saturating_add_assign, timing::AtomicInterval, transaction::SanitizedTransaction, }, @@ -100,6 +99,7 @@ use { fs, hash::{Hash as StdHash, Hasher as StdHasher}, io::Result as IoResult, + num::Saturating, ops::{Range, RangeBounds}, path::{Path, PathBuf}, sync::{ @@ -1742,21 +1742,21 @@ impl SplitAncientStorages { #[derive(Debug, Default)] struct FlushStats { - num_flushed: usize, - num_purged: usize, - total_size: u64, + num_flushed: Saturating<usize>, + num_purged: Saturating<usize>, + total_size: Saturating<u64>, store_accounts_timing: StoreAccountsTiming, - store_accounts_total_us: u64, + store_accounts_total_us: Saturating<u64>, } impl FlushStats { fn accumulate(&mut self, other: &Self) { - saturating_add_assign!(self.num_flushed, other.num_flushed); - saturating_add_assign!(self.num_purged, other.num_purged); - saturating_add_assign!(self.total_size, other.total_size); + self.num_flushed += other.num_flushed; + self.num_purged += other.num_purged; + self.total_size += other.total_size; self.store_accounts_timing .accumulate(&other.store_accounts_timing); - saturating_add_assign!(self.store_accounts_total_us, other.store_accounts_total_us); + self.store_accounts_total_us += other.store_accounts_total_us; } } @@ -1884,26 +1884,20 @@ pub(crate) struct ShrinkAncientStats { #[derive(Debug, Default)] pub(crate) struct ShrinkStatsSub { pub(crate) store_accounts_timing: StoreAccountsTiming, - pub(crate) rewrite_elapsed_us: u64, - pub(crate) create_and_insert_store_elapsed_us: u64, - pub(crate) unpackable_slots_count: usize, - pub(crate) newest_alive_packed_count: usize, + pub(crate) rewrite_elapsed_us: Saturating<u64>, + pub(crate) create_and_insert_store_elapsed_us: Saturating<u64>, + pub(crate) unpackable_slots_count: 
Saturating<usize>, + pub(crate) newest_alive_packed_count: Saturating<usize>, } impl ShrinkStatsSub { pub(crate) fn accumulate(&mut self, other: &Self) { self.store_accounts_timing .accumulate(&other.store_accounts_timing); - saturating_add_assign!(self.rewrite_elapsed_us, other.rewrite_elapsed_us); - saturating_add_assign!( - self.create_and_insert_store_elapsed_us, - other.create_and_insert_store_elapsed_us - ); - saturating_add_assign!(self.unpackable_slots_count, other.unpackable_slots_count); - saturating_add_assign!( - self.newest_alive_packed_count, - other.newest_alive_packed_count - ); + self.rewrite_elapsed_us += other.rewrite_elapsed_us; + self.create_and_insert_store_elapsed_us += other.create_and_insert_store_elapsed_us; + self.unpackable_slots_count += other.unpackable_slots_count; + self.newest_alive_packed_count += other.newest_alive_packed_count; } } #[derive(Debug, Default)] @@ -3979,7 +3973,7 @@ impl AccountsDb { let (shrink_in_progress, time_us) = measure_us!( self.get_store_for_shrink(slot, shrink_collect.alive_total_bytes as u64) ); - stats_sub.create_and_insert_store_elapsed_us = time_us; + stats_sub.create_and_insert_store_elapsed_us = Saturating(time_us); // here, we're writing back alive_accounts. 
That should be an atomic operation // without use of rather wide locks in this whole function, because we're @@ -3992,7 +3986,7 @@ impl AccountsDb { ); rewrite_elapsed.stop(); - stats_sub.rewrite_elapsed_us = rewrite_elapsed.as_us(); + stats_sub.rewrite_elapsed_us = Saturating(rewrite_elapsed.as_us()); // `store_accounts_frozen()` above may have purged accounts from some // other storage entries (the ones that were just overwritten by this @@ -4024,7 +4018,7 @@ impl AccountsDb { .fetch_add(1, Ordering::Relaxed); } shrink_stats.create_and_insert_store_elapsed.fetch_add( - stats_sub.create_and_insert_store_elapsed_us, + stats_sub.create_and_insert_store_elapsed_us.0, Ordering::Relaxed, ); shrink_stats.store_accounts_elapsed.fetch_add( @@ -4041,12 +4035,12 @@ impl AccountsDb { ); shrink_stats .rewrite_elapsed - .fetch_add(stats_sub.rewrite_elapsed_us, Ordering::Relaxed); + .fetch_add(stats_sub.rewrite_elapsed_us.0, Ordering::Relaxed); shrink_stats .unpackable_slots_count - .fetch_add(stats_sub.unpackable_slots_count as u64, Ordering::Relaxed); + .fetch_add(stats_sub.unpackable_slots_count.0 as u64, Ordering::Relaxed); shrink_stats.newest_alive_packed_count.fetch_add( - stats_sub.newest_alive_packed_count as u64, + stats_sub.newest_alive_packed_count.0 as u64, Ordering::Relaxed, ); } @@ -4454,7 +4448,8 @@ impl AccountsDb { let (mut shrink_in_progress, create_and_insert_store_elapsed_us) = measure_us!( current_ancient.create_if_necessary(slot, self, shrink_collect.alive_total_bytes) ); - stats_sub.create_and_insert_store_elapsed_us = create_and_insert_store_elapsed_us; + stats_sub.create_and_insert_store_elapsed_us = + Saturating(create_and_insert_store_elapsed_us); let available_bytes = current_ancient.accounts_file().accounts.remaining_bytes(); // split accounts in 'slot' into: // 'Primary', which can fit in 'current_ancient' @@ -4516,7 +4511,7 @@ impl AccountsDb { } assert_eq!(bytes_remaining_to_write, 0); rewrite_elapsed.stop(); - stats_sub.rewrite_elapsed_us = 
rewrite_elapsed.as_us(); + stats_sub.rewrite_elapsed_us = Saturating(rewrite_elapsed.as_us()); if slot != current_ancient.slot() { // all append vecs in this slot have been combined into an ancient append vec @@ -6116,9 +6111,9 @@ impl AccountsDb { }); datapoint_info!( "accounts_db-flush_accounts_cache_aggressively", - ("num_flushed", flush_stats.num_flushed, i64), - ("num_purged", flush_stats.num_purged, i64), - ("total_flush_size", flush_stats.total_size, i64), + ("num_flushed", flush_stats.num_flushed.0, i64), + ("num_purged", flush_stats.num_purged.0, i64), + ("total_flush_size", flush_stats.total_size.0, i64), ("total_cache_size", self.accounts_cache.size(), i64), ("total_frozen_slots", excess_slot_count, i64), ("total_slots", self.accounts_cache.num_slots(), i64), @@ -6146,7 +6141,7 @@ impl AccountsDb { ("num_accounts_saved", num_accounts_saved, i64), ( "store_accounts_total_us", - flush_stats.store_accounts_total_us, + flush_stats.store_accounts_total_us.0, i64 ), ( @@ -6235,9 +6230,7 @@ impl AccountsDb { mut should_flush_f: Option<&mut impl FnMut(&Pubkey, &AccountSharedData) -> bool>, max_clean_root: Option<Slot>, ) -> FlushStats { - let mut num_purged = 0; - let mut total_size = 0; - let mut num_flushed = 0; + let mut flush_stats = FlushStats::default(); let iter_items: Vec<_> = slot_cache.iter().collect(); let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new(); let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![]; @@ -6263,15 +6256,15 @@ impl AccountsDb { .unwrap_or(true); if should_flush { let hash = iter_item.value().hash(); - total_size += aligned_stored_size(account.data().len()) as u64; - num_flushed += 1; + flush_stats.total_size += aligned_stored_size(account.data().len()) as u64; + flush_stats.num_flushed += 1; Some(((key, account), hash)) } else { // If we don't flush, we have to remove the entry from the // index, since it's equivalent to purging purged_slot_pubkeys.insert((slot, *key)); pubkey_to_slot_set.push((*key, slot)); - 
num_purged += 1; + flush_stats.num_purged += 1; None } }) @@ -6288,13 +6281,12 @@ impl AccountsDb { &HashSet::default(), ); - let mut store_accounts_timing = StoreAccountsTiming::default(); - let mut store_accounts_total_us = 0; if !is_dead_slot { // This ensures that all updates are written to an AppendVec, before any // updates to the index happen, so anybody that sees a real entry in the index, // will be able to find the account in storage - let flushed_store = self.create_and_insert_store(slot, total_size, "flush_slot_cache"); + let flushed_store = + self.create_and_insert_store(slot, flush_stats.total_size.0, "flush_slot_cache"); let (store_accounts_timing_inner, store_accounts_total_inner_us) = measure_us!(self .store_accounts_frozen( (slot, &accounts[..]), @@ -6302,8 +6294,8 @@ impl AccountsDb { &flushed_store, StoreReclaims::Default, )); - store_accounts_timing = store_accounts_timing_inner; - store_accounts_total_us = store_accounts_total_inner_us; + flush_stats.store_accounts_timing = store_accounts_timing_inner; + flush_stats.store_accounts_total_us = Saturating(store_accounts_total_inner_us); // If the above sizing function is correct, just one AppendVec is enough to hold // all the data for the slot @@ -6315,13 +6307,8 @@ impl AccountsDb { // There is some racy condition for existing readers who just has read exactly while // flushing. 
That case is handled by retry_to_get_account_accessor() assert!(self.accounts_cache.remove_slot(slot).is_some()); - FlushStats { - num_flushed, - num_purged, - total_size, - store_accounts_timing, - store_accounts_total_us, - } + + flush_stats } /// flush all accounts in this slot diff --git a/accounts-db/src/ancient_append_vecs.rs b/accounts-db/src/ancient_append_vecs.rs index a7991652e63b44..09fe17a39e1078 100644 --- a/accounts-db/src/ancient_append_vecs.rs +++ b/accounts-db/src/ancient_append_vecs.rs @@ -20,10 +20,10 @@ use { rand::{thread_rng, Rng}, rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}, solana_measure::measure_us, - solana_sdk::{account::ReadableAccount, clock::Slot, saturating_add_assign}, + solana_sdk::{account::ReadableAccount, clock::Slot}, std::{ collections::HashMap, - num::NonZeroU64, + num::{NonZeroU64, Saturating}, sync::{atomic::Ordering, Arc, Mutex}, }, }; @@ -68,9 +68,9 @@ struct AncientSlotInfos { /// subset of all_infos shrink_indexes: Vec<usize>, /// total alive bytes across contents of 'shrink_indexes' - total_alive_bytes_shrink: u64, + total_alive_bytes_shrink: Saturating<u64>, /// total alive bytes across all slots - total_alive_bytes: u64, + total_alive_bytes: Saturating<u64>, } impl AncientSlotInfos { @@ -106,7 +106,7 @@ impl AncientSlotInfos { if should_shrink { // alive ratio is too low, so prioritize combining this slot with others // to reduce disk space used - saturating_add_assign!(self.total_alive_bytes_shrink, alive_bytes); + self.total_alive_bytes_shrink += alive_bytes; self.shrink_indexes.push(self.all_infos.len()); } else { let already_ideal_size = u64::from(ideal_size) * 80 / 100; @@ -122,7 +122,7 @@ impl AncientSlotInfos { alive_bytes, should_shrink, }); - saturating_add_assign!(self.total_alive_bytes, alive_bytes); + self.total_alive_bytes += alive_bytes; } was_randomly_shrunk } @@ -150,20 +150,20 @@ impl AncientSlotInfos { /// clear 'should_shrink' for storages after a cutoff to limit how many 
storages we shrink fn clear_should_shrink_after_cutoff(&mut self, percent_of_alive_shrunk_data: u64) { - let mut bytes_to_shrink_due_to_ratio = 0; + let mut bytes_to_shrink_due_to_ratio = Saturating(0); // shrink enough slots to write 'percent_of_alive_shrunk_data'% of the total alive data // from slots that exceeded the shrink threshold. // The goal is to limit overall i/o in this pass while making progress. - let threshold_bytes = self.total_alive_bytes_shrink * percent_of_alive_shrunk_data / 100; + let threshold_bytes = self.total_alive_bytes_shrink.0 * percent_of_alive_shrunk_data / 100; for info_index in &self.shrink_indexes { let info = &mut self.all_infos[*info_index]; - if bytes_to_shrink_due_to_ratio >= threshold_bytes { + if bytes_to_shrink_due_to_ratio.0 >= threshold_bytes { // we exceeded the amount to shrink due to alive ratio, so don't shrink this one just due to 'should_shrink' // It MAY be shrunk based on total capacity still. // Mark it as false for 'should_shrink' so it gets evaluated solely based on # of files. 
info.should_shrink = false; } else { - saturating_add_assign!(bytes_to_shrink_due_to_ratio, info.alive_bytes); + bytes_to_shrink_due_to_ratio += info.alive_bytes; } } } @@ -187,11 +187,11 @@ impl AncientSlotInfos { // these indexes into 'all_infos' are useless once we truncate 'all_infos', so make sure they're cleared out to avoid any issues self.shrink_indexes.clear(); let total_storages = self.all_infos.len(); - let mut cumulative_bytes = 0u64; + let mut cumulative_bytes = Saturating(0u64); let low_threshold = max_storages * 50 / 100; for (i, info) in self.all_infos.iter().enumerate() { - saturating_add_assign!(cumulative_bytes, info.alive_bytes); - let ancient_storages_required = (cumulative_bytes / ideal_storage_size + 1) as usize; + cumulative_bytes += info.alive_bytes; + let ancient_storages_required = (cumulative_bytes.0 / ideal_storage_size + 1) as usize; let storages_remaining = total_storages - i - 1; // if the remaining uncombined storages and the # of resulting @@ -459,11 +459,11 @@ impl AccountsDb { write_ancient_accounts.metrics.accumulate(&ShrinkStatsSub { store_accounts_timing, - rewrite_elapsed_us, - create_and_insert_store_elapsed_us, - unpackable_slots_count: 0, - newest_alive_packed_count: 0, + rewrite_elapsed_us: Saturating(rewrite_elapsed_us), + create_and_insert_store_elapsed_us: Saturating(create_and_insert_store_elapsed_us), + ..ShrinkStatsSub::default() }); + write_ancient_accounts .shrinks_in_progress .insert(target_slot, shrink_in_progress); @@ -791,7 +791,7 @@ impl<'a> PackedAncientStorage<'a> { // starting at first entry in current_alive_accounts let mut partial_inner_index = 0; // 0 bytes written so far from the current set of accounts - let mut partial_bytes_written = 0; + let mut partial_bytes_written = Saturating(0); // pack a new storage each iteration of this outer loop loop { let mut bytes_total = 0usize; @@ -806,11 +806,11 @@ impl<'a> PackedAncientStorage<'a> { current_alive_accounts = accounts_to_combine.next(); // reset 
partial progress since we're starting over with a new set of alive accounts partial_inner_index = 0; - partial_bytes_written = 0; + partial_bytes_written = Saturating(0); continue; } let bytes_remaining_this_slot = - alive_accounts.bytes.saturating_sub(partial_bytes_written); + alive_accounts.bytes.saturating_sub(partial_bytes_written.0); let bytes_total_with_this_slot = bytes_total.saturating_add(bytes_remaining_this_slot); let mut partial_inner_index_max_exclusive; @@ -832,7 +832,7 @@ impl<'a> PackedAncientStorage<'a> { break; } // this account fits - saturating_add_assign!(partial_bytes_written, account_size); + partial_bytes_written += account_size; bytes_total = new_size; partial_inner_index_max_exclusive += 1; } @@ -2203,12 +2203,15 @@ pub mod tests { Vec::default() } ); - assert_eq!(infos.total_alive_bytes, alive_bytes_expected as u64); - assert_eq!(infos.total_alive_bytes_shrink, alive_bytes_expected as u64); + assert_eq!(infos.total_alive_bytes.0, alive_bytes_expected as u64); + assert_eq!( + infos.total_alive_bytes_shrink.0, + alive_bytes_expected as u64 + ); } else { assert!(infos.shrink_indexes.is_empty()); - assert_eq!(infos.total_alive_bytes, alive_bytes_expected as u64); - assert_eq!(infos.total_alive_bytes_shrink, 0); + assert_eq!(infos.total_alive_bytes.0, alive_bytes_expected as u64); + assert_eq!(infos.total_alive_bytes_shrink.0, 0); } } } @@ -2239,8 +2242,8 @@ pub mod tests { } assert!(infos.all_infos.is_empty()); assert!(infos.shrink_indexes.is_empty()); - assert_eq!(infos.total_alive_bytes, 0); - assert_eq!(infos.total_alive_bytes_shrink, 0); + assert_eq!(infos.total_alive_bytes.0, 0); + assert_eq!(infos.total_alive_bytes_shrink.0, 0); } } @@ -2269,8 +2272,8 @@ pub mod tests { if !alive { assert!(infos.all_infos.is_empty()); assert!(infos.shrink_indexes.is_empty()); - assert_eq!(infos.total_alive_bytes, 0); - assert_eq!(infos.total_alive_bytes_shrink, 0); + assert_eq!(infos.total_alive_bytes.0, 0); + 
assert_eq!(infos.total_alive_bytes_shrink.0, 0); } else { assert_eq!(infos.all_infos.len(), slots); let should_shrink = data_size.is_none(); @@ -2290,12 +2293,12 @@ pub mod tests { .map(|(i, _)| i) .collect::<Vec<_>>() ); - assert_eq!(infos.total_alive_bytes, alive_bytes_expected); - assert_eq!(infos.total_alive_bytes_shrink, alive_bytes_expected); + assert_eq!(infos.total_alive_bytes.0, alive_bytes_expected); + assert_eq!(infos.total_alive_bytes_shrink.0, alive_bytes_expected); } else { assert!(infos.shrink_indexes.is_empty()); - assert_eq!(infos.total_alive_bytes, alive_bytes_expected); - assert_eq!(infos.total_alive_bytes_shrink, 0); + assert_eq!(infos.total_alive_bytes.0, alive_bytes_expected); + assert_eq!(infos.total_alive_bytes_shrink.0, 0); } } } @@ -2382,12 +2385,12 @@ pub mod tests { Vec::default() } ); - assert_eq!(infos.total_alive_bytes, alive_bytes_expected); - assert_eq!(infos.total_alive_bytes_shrink, alive_bytes_expected); + assert_eq!(infos.total_alive_bytes.0, alive_bytes_expected); + assert_eq!(infos.total_alive_bytes_shrink.0, alive_bytes_expected); } else { assert!(infos.shrink_indexes.is_empty()); - assert_eq!(infos.total_alive_bytes, alive_bytes_expected); - assert_eq!(infos.total_alive_bytes_shrink, 0); + assert_eq!(infos.total_alive_bytes.0, alive_bytes_expected); + assert_eq!(infos.total_alive_bytes_shrink.0, 0); } } } @@ -2702,8 +2705,8 @@ pub mod tests { .collect::<Vec<_>>() } ); - assert_eq!(infos.total_alive_bytes, alive_bytes_expected); - assert_eq!(infos.total_alive_bytes_shrink, dead_bytes); + assert_eq!(infos.total_alive_bytes.0, alive_bytes_expected); + assert_eq!(infos.total_alive_bytes_shrink.0, dead_bytes); } } } @@ -2869,8 +2872,13 @@ pub mod tests { if swap { infos.all_infos = infos.all_infos.into_iter().rev().collect(); } - infos.total_alive_bytes_shrink = - infos.all_infos.iter().map(|info| info.alive_bytes).sum(); + infos.total_alive_bytes_shrink = Saturating( + infos + .all_infos + .iter() + .map(|info| info.alive_bytes) + .sum::<u64>(), + 
); match method { TestShouldShrink::FilterAncientSlots => { let tuning = PackedAncientStorageTuning { @@ -2900,7 +2908,7 @@ pub mod tests { percent_of_alive_shrunk_data == 89 || percent_of_alive_shrunk_data == 90 } else { infos.all_infos[infos.shrink_indexes[0]].alive_bytes - >= infos.total_alive_bytes_shrink * percent_of_alive_shrunk_data + >= infos.total_alive_bytes_shrink.0 * percent_of_alive_shrunk_data / 100 }; if modify {