iterate contents of AccountStorage without exposing internals (#29719)
* iterate contents of AccountStorage without exposing internals

* Update runtime/src/accounts_db.rs

Co-authored-by: Brooks <[email protected]>

* Update runtime/src/accounts_db.rs

Co-authored-by: Brooks <[email protected]>

* Update runtime/src/accounts_db.rs

Co-authored-by: Brooks <[email protected]>

* compile errors

Co-authored-by: Brooks <[email protected]>
jeffwashington and brooksprumo authored Jan 16, 2023
1 parent da39c48 commit ef30083
Showing 2 changed files with 73 additions and 75 deletions.
37 changes: 33 additions & 4 deletions runtime/src/account_storage.rs
@@ -1,7 +1,7 @@
//! Manage the map of slot -> append vecs
use {
crate::accounts_db::{AccountStorageEntry, AppendVecId, SlotStores},
crate::accounts_db::{AccountStorageEntry, AppendVecId, SlotStores, SnapshotStorageOne},
dashmap::DashMap,
solana_sdk::clock::Slot,
std::{
@@ -115,10 +115,10 @@ impl AccountStorage {
})
}

/// iterate through all (slot, append-vecs)
pub(crate) fn iter(&self) -> dashmap::iter::Iter<Slot, SlotStores> {
/// iterate through all (slot, append-vec)
pub(crate) fn iter(&self) -> AccountStorageIter<'_> {
assert!(self.shrink_in_progress_map.is_empty());
self.map.iter()
AccountStorageIter::new(self)
}

pub(crate) fn insert(&self, slot: Slot, store: Arc<AccountStorageEntry>) {
@@ -211,6 +211,35 @@ impl AccountStorage {
}
}

/// iterate contents of AccountStorage without exposing internals
pub struct AccountStorageIter<'a> {
iter: dashmap::iter::Iter<'a, Slot, SlotStores>,
}

impl<'a> AccountStorageIter<'a> {
pub fn new(storage: &'a AccountStorage) -> Self {
Self {
iter: storage.map.iter(),
}
}
}

impl<'a> Iterator for AccountStorageIter<'a> {
type Item = (Slot, SnapshotStorageOne);

fn next(&mut self) -> Option<Self::Item> {
for entry in self.iter.by_ref() {
// if a slot has no stores, skip it entirely and continue to the next slot
let slot = entry.key();
let stores = entry.value();
if let Some((_, store)) = stores.read().unwrap().iter().next() {
return Some((*slot, Arc::clone(store)));
}
}
None
}
}
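// Illustration only (not part of this commit): a hypothetical crate-internal
// helper sketching how a caller consumes the new iterator. It assumes an
// `AccountStorage` reference named `storage` and relies on each item being a
// `(Slot, SnapshotStorageOne)` pair, i.e. a slot plus an Arc'd
// `AccountStorageEntry`, as in the impl above.
fn total_alive_bytes(storage: &AccountStorage) -> u64 {
    storage
        .iter() // internals (DashMap, SlotStores) stay hidden from the caller
        .map(|(_slot, store)| store.alive_bytes() as u64)
        .sum()
}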

/// exists while there is a shrink in progress
/// keeps track of the 'new_store' being created and the 'old_store' being replaced.
pub(crate) struct ShrinkInProgress<'a> {
111 changes: 40 additions & 71 deletions runtime/src/accounts_db.rs
@@ -6526,42 +6526,22 @@ impl AccountsDb {

fn report_store_stats(&self) {
let mut total_count = 0;
let mut min = std::usize::MAX;
let mut min_slot = 0;
let mut max = 0;
let mut max_slot = 0;
let mut newest_slot = 0;
let mut oldest_slot = std::u64::MAX;
let mut total_bytes = 0;
let mut total_alive_bytes = 0;
for iter_item in self.storage.iter() {
let slot = iter_item.key();
let slot_stores = iter_item.value().read().unwrap();
total_count += slot_stores.len();
if slot_stores.len() < min {
min = slot_stores.len();
min_slot = *slot;
}

if slot_stores.len() > max {
max = slot_stores.len();
max_slot = *slot;
}
if *slot > newest_slot {
newest_slot = *slot;
}
for (slot, store) in self.storage.iter() {
total_count += 1;
newest_slot = std::cmp::max(newest_slot, slot);

if *slot < oldest_slot {
oldest_slot = *slot;
}
oldest_slot = std::cmp::min(oldest_slot, slot);

for store in slot_stores.values() {
total_alive_bytes += Self::page_align(store.alive_bytes() as u64);
total_bytes += store.total_bytes();
}
total_alive_bytes += Self::page_align(store.alive_bytes() as u64);
total_bytes += store.total_bytes();
}
info!("total_stores: {}, newest_slot: {}, oldest_slot: {}, max_slot: {} (num={}), min_slot: {} (num={})",
total_count, newest_slot, oldest_slot, max_slot, max, min_slot, min);
info!(
"total_stores: {total_count}, newest_slot: {newest_slot}, oldest_slot: {oldest_slot}"
);

let total_alive_ratio = if total_bytes > 0 {
total_alive_bytes as f64 / total_bytes as f64
@@ -8266,14 +8246,13 @@ impl AccountsDb {
ancestors: Option<&Ancestors>,
) -> (SnapshotStoragesOne, Vec<Slot>) {
let mut m = Measure::start("get slots");
let slots_and_storages = self
let mut slots_and_storages = self
.storage
.iter()
.filter_map(|entry| {
let slot = *entry.key() as Slot;
.filter_map(|(slot, store)| {
requested_slots
.contains(&slot)
.then_some((slot, Arc::clone(entry.value())))
.then_some((slot, Some(store)))
})
.collect::<Vec<_>>();
m.stop();
@@ -8282,24 +8261,19 @@ impl AccountsDb {
let chunk_size = 5_000;
let wide = self.thread_pool_clean.install(|| {
slots_and_storages
.par_chunks(chunk_size)
.par_chunks_mut(chunk_size)
.map(|slots_and_storages| {
slots_and_storages
.iter()
.iter_mut()
.filter(|(slot, _)| {
self.accounts_index.is_alive_root(*slot)
|| ancestors
.map(|ancestors| ancestors.contains_key(slot))
.unwrap_or_default()
})
.filter_map(|(slot, storages)| {
storages
.read()
.unwrap()
.values()
.next()
.filter(|x| x.has_accounts())
.map(|storage| (Arc::clone(storage), *slot))
.filter_map(|(slot, store)| {
let store = std::mem::take(store).unwrap();
store.has_accounts().then_some((store, *slot))
})
.collect::<Vec<(SnapshotStorageOne, Slot)>>()
})
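Aside (illustration only): the hunk above collects the storages as Option<SnapshotStorageOne> so that the later par_chunks_mut pass can move each Arc out in place with std::mem::take instead of cloning it. A small, self-contained sketch of that pattern with made-up types (u64 slots, Arc<String> as a stand-in for SnapshotStorageOne; not the real AccountsDb structures):

use std::sync::Arc;

fn main() {
    // stand-in for Vec<(Slot, Option<SnapshotStorageOne>)>
    let mut slots_and_storages: Vec<(u64, Option<Arc<String>>)> = (0..3)
        .map(|slot| (slot, Some(Arc::new(format!("storage-{slot}")))))
        .collect();

    let taken: Vec<(Arc<String>, u64)> = slots_and_storages
        .iter_mut()
        .filter_map(|(slot, store)| {
            // take() moves the Arc out and leaves None behind; no clone needed
            let store = std::mem::take(store)?;
            Some((store, *slot))
        })
        .collect();

    assert_eq!(taken.len(), 3);
}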
@@ -8925,23 +8899,22 @@ impl AccountsDb {
) {
// store count and size for each storage
let mut storage_size_storages_time = Measure::start("storage_size_storages");
for slot_stores in self.storage.iter() {
for (id, store) in slot_stores.value().read().unwrap().iter() {
// Should be default at this point
assert_eq!(store.alive_bytes(), 0);
if let Some(entry) = stored_sizes_and_counts.get(id) {
trace!(
"id: {} setting count: {} cur: {}",
id,
entry.count,
store.count(),
);
store.count_and_status.write().unwrap().0 = entry.count;
store.alive_bytes.store(entry.stored_size, Ordering::SeqCst);
} else {
trace!("id: {} clearing count", id);
store.count_and_status.write().unwrap().0 = 0;
}
for (_slot, store) in self.storage.iter() {
let id = store.append_vec_id();
// Should be default at this point
assert_eq!(store.alive_bytes(), 0);
if let Some(entry) = stored_sizes_and_counts.get(&id) {
trace!(
"id: {} setting count: {} cur: {}",
id,
entry.count,
store.count(),
);
store.count_and_status.write().unwrap().0 = entry.count;
store.alive_bytes.store(entry.stored_size, Ordering::SeqCst);
} else {
trace!("id: {} clearing count", id);
store.count_and_status.write().unwrap().0 = 0;
}
}
storage_size_storages_time.stop();
@@ -10761,7 +10734,7 @@ pub mod tests {
let mut append_vec_histogram = HashMap::new();
let mut all_storages = vec![];
for slot_storage in accounts.storage.iter() {
all_storages.extend(slot_storage.read().unwrap().values().cloned())
all_storages.push(slot_storage.1)
}
for storage in all_storages {
*append_vec_histogram.entry(storage.slot()).or_insert(0) += 1;
@@ -15433,10 +15406,8 @@ pub mod tests {
accounts.add_root_and_flush_write_cache(slot0);

// fake out the store count to avoid the assert
for slot_stores in accounts.storage.iter() {
for (_id, store) in slot_stores.value().read().unwrap().iter() {
store.alive_bytes.store(0, Ordering::Release);
}
for (_, store) in accounts.storage.iter() {
store.alive_bytes.store(0, Ordering::Release);
}

// populate based on made up hash data
@@ -15450,12 +15421,10 @@ pub mod tests {
);
accounts.set_storage_count_and_alive_bytes(dashmap, &mut GenerateIndexTimings::default());
assert_eq!(accounts.storage.len(), 1);
for slot_stores in accounts.storage.iter() {
for (id, store) in slot_stores.value().read().unwrap().iter() {
assert_eq!(id, &0);
assert_eq!(store.count_and_status.read().unwrap().0, 3);
assert_eq!(store.alive_bytes.load(Ordering::Acquire), 2);
}
for (_, store) in accounts.storage.iter() {
assert_eq!(store.append_vec_id(), 0);
assert_eq!(store.count_and_status.read().unwrap().0, 3);
assert_eq!(store.alive_bytes.load(Ordering::Acquire), 2);
}
}
