From 55dc0323094d7936f079b313f22e4b02f04fd0ed Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Fri, 26 Mar 2021 16:32:59 -0500
Subject: [PATCH 01/17] lazy calculate account hash

---
 runtime/src/accounts_cache.rs |  68 ++++++++-
 runtime/src/accounts_db.rs    | 268 ++++++++++++++++++++++------------
 2 files changed, 235 insertions(+), 101 deletions(-)

diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index 1dc2bd8c98c763..f62e551feecb20 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -2,6 +2,7 @@ use dashmap::DashMap;
 use solana_sdk::{
     account::{AccountSharedData, ReadableAccount},
     clock::Slot,
+    genesis_config::ClusterType,
     hash::Hash,
     pubkey::Pubkey,
 };
@@ -47,7 +48,14 @@ impl SlotCacheInner {
         );
     }
 
-    pub fn insert(&self, pubkey: &Pubkey, account: AccountSharedData, hash: Hash) {
+    pub fn insert(
+        &self,
+        pubkey: &Pubkey,
+        account: AccountSharedData,
+        hash: Option<Hash>,
+        hash_slot: Slot,
+        cluster_type: ClusterType,
+    ) {
         if self.cache.contains_key(pubkey) {
             self.same_account_writes.fetch_add(1, Ordering::Relaxed);
             self.same_account_writes_size
@@ -56,7 +64,16 @@ impl SlotCacheInner {
             self.unique_account_writes_size
                 .fetch_add(account.data().len() as u64, Ordering::Relaxed);
         }
-        self.cache.insert(*pubkey, CachedAccount { account, hash });
+        self.cache.insert(
+            *pubkey,
+            Arc::new(CachedAccountInner {
+                account,
+                hash: RwLock::new(hash),
+                hash_slot,
+                cluster_type,
+                pubkey: *pubkey,
+            }),
+        );
     }
 
     pub fn get_cloned(&self, pubkey: &Pubkey) -> Option<CachedAccount> {
@@ -89,10 +106,39 @@ impl Deref for SlotCacheInner {
     }
 }
 
-#[derive(Debug, Clone)]
-pub struct CachedAccount {
+pub struct LazyHash {
+    pub account: Arc<AccountSharedData>,
+    pub hash: RwLock<Hash>,
+}
+
+pub type CachedAccount = Arc<CachedAccountInner>;
+
+#[derive(Debug)]
+pub struct CachedAccountInner {
     pub account: AccountSharedData,
-    pub hash: Hash,
+    hash: RwLock<Option<Hash>>,
+    pub hash_slot: Slot,
+    cluster_type: ClusterType,
+    pubkey: Pubkey,
+}
+
+impl CachedAccountInner {
+    pub fn hash(&self) -> Hash {
+        let hash = self.hash.read().unwrap();
+        match *hash {
+            Some(hash) => hash,
+            None => {
+                let hash = crate::accounts_db::AccountsDb::hash_account(
+                    self.hash_slot,
+                    &self.account,
+                    &self.pubkey,
+                    &self.cluster_type,
+                );
+                *self.hash.write().unwrap() = Some(hash.clone());
+                hash
+            }
+        }
+    }
 }
 
 #[derive(Debug, Default)]
@@ -128,7 +174,15 @@ impl AccountsCache {
         );
     }
 
-    pub fn store(&self, slot: Slot, pubkey: &Pubkey, account: AccountSharedData, hash: Hash) {
+    pub fn store(
+        &self,
+        slot: Slot,
+        pubkey: &Pubkey,
+        account: AccountSharedData,
+        hash: Option<Hash>,
+        hash_slot: Slot,
+        cluster_type: ClusterType,
+    ) {
         let slot_cache = self.slot_cache(slot).unwrap_or_else(||
             // DashMap entry.or_insert() returns a RefMut, essentially a write lock,
             // which is dropped after this block ends, minimizing time held by the lock.
@@ -140,7 +194,7 @@ impl AccountsCache {
                 .or_insert(Arc::new(SlotCacheInner::default()))
                 .clone());
 
-        slot_cache.insert(pubkey, account, hash);
+        slot_cache.insert(pubkey, account, hash, hash_slot, cluster_type);
     }
 
     pub fn load(&self, slot: Slot, pubkey: &Pubkey) -> Option<CachedAccount> {
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 30650d524f9514..6c76dbed77ac96 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -19,7 +19,7 @@
 //! commit for each slot entry would be indexed.
 use crate::{
-    accounts_cache::{AccountsCache, CachedAccount, SlotCache},
+    accounts_cache::{AccountsCache, CachedAccount, CachedAccountInner, SlotCache},
     accounts_hash::{AccountsHash, CalculateHashIntermediate, HashStats, PreviousPass},
     accounts_index::{
         AccountIndex, AccountsIndex, AccountsIndexRootsStats, Ancestors, IndexKey, IsCached,
@@ -57,7 +57,9 @@ use std::{
     ops::{Range, RangeBounds},
     path::{Path, PathBuf},
     sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
+    sync::mpsc::{channel, Receiver, SendError, Sender, SyncSender},
     sync::{Arc, Mutex, MutexGuard, RwLock},
+    thread::{Builder, JoinHandle},
     time::Instant,
 };
 use tempfile::TempDir;
@@ -94,6 +96,7 @@
 const CACHE_VIRTUAL_OFFSET: usize = 0;
 const CACHE_VIRTUAL_STORED_SIZE: usize = 0;
 
 type DashMapVersionHash = DashMap<Pubkey, (u64, Hash)>;
+type DashMapLazyHasher = DashMap<Pubkey, CachedAccount>;
 
 lazy_static! {
     // FROZEN_ACCOUNT_PANIC is used to signal local_cluster that an AccountsDb panic has occurred,
@@ -230,10 +233,10 @@ impl<'a> LoadedAccount<'a> {
         }
     }
 
-    pub fn loaded_hash(&self) -> &Hash {
+    pub fn loaded_hash(&self) -> Hash {
         match self {
-            LoadedAccount::Stored(stored_account_meta) => &stored_account_meta.hash,
-            LoadedAccount::Cached((_, cached_account)) => &cached_account.hash,
+            LoadedAccount::Stored(stored_account_meta) => *stored_account_meta.hash,
+            LoadedAccount::Cached((_, cached_account)) => cached_account.hash(),
         }
     }
 
@@ -280,7 +283,7 @@ impl<'a> LoadedAccount<'a> {
         match self {
             LoadedAccount::Stored(stored_account_meta) => stored_account_meta.clone_account(),
             LoadedAccount::Cached((_, cached_account)) => match cached_account {
-                Cow::Owned(cached_account) => cached_account.account,
+                Cow::Owned(cached_account) => cached_account.account.clone(),
                 Cow::Borrowed(cached_account) => cached_account.account.clone(),
             },
         }
@@ -691,6 +694,10 @@ pub struct AccountsDb {
 
     pub accounts_cache: AccountsCache,
 
+    sender_bg_hasher: Option<Mutex<Sender<CachedAccount>>>,
+
+    store_hasher: Option<Mutex<JoinHandle<()>>>,
+
     recycle_stores: RwLock<RecycleStores>,
 
     /// distribute the accounts across storage lists
@@ -1068,6 +1075,8 @@ impl Default for AccountsDb {
             accounts_index: AccountsIndex::default(),
             storage: AccountStorage::default(),
             accounts_cache: AccountsCache::default(),
+            store_hasher: None,
+            sender_bg_hasher: None,
             recycle_stores: RwLock::new(RecycleStores::default()),
             uncleaned_pubkeys: DashMap::new(),
             next_id: AtomicUsize::new(0),
@@ -1109,7 +1118,7 @@ impl AccountsDb {
         account_indexes: HashSet<AccountIndex>,
         caching_enabled: bool,
     ) -> Self {
-        let new = if !paths.is_empty() {
+        let mut new = if !paths.is_empty() {
             Self {
                 paths,
                 temp_paths: None,
@@ -1131,6 +1140,8 @@ impl AccountsDb {
                 ..Self::default()
             }
         };
+
+        new.start_store_hasher();
         {
             for path in new.paths.iter() {
                 std::fs::create_dir_all(path).expect("Create directory failed.");
@@ -1298,6 +1309,23 @@ impl AccountsDb {
         }
     }
 
+    fn background_hasher(receiver: Receiver<CachedAccount>, cluster_type: ClusterType) {
+        let map = DashMapLazyHasher::default();
+    }
+
+    fn start_store_hasher(&mut self) {
+        let (sender, receiver) = channel();
+        let cluster_type = self.expected_cluster_type();
+        //self.store_hasher = Arc::new(
+        Builder::new()
+            .name("solana-accounts-db-store-hasher".to_string())
+            .spawn(move || {
+                Self::background_hasher(receiver, cluster_type);
+            })
+            .unwrap(); //);
+        self.sender_bg_hasher = Some(Mutex::new(sender));
+    }
+
     fn purge_keys_exact<'a, C: 'a>(
         &'a self,
         pubkey_to_slot_set: &'a [(Pubkey, C)],
@@ -1860,7 +1888,7 @@ impl AccountsDb {
             store_accounts_timing = self.store_accounts_frozen(
                 slot,
                 &accounts,
-                &hashes,
+                Some(&hashes),
                 Some(Box::new(move |_, _| shrunken_store.clone())),
                 Some(Box::new(write_versions.into_iter())),
             );
@@ -2259,7 +2287,7 @@ impl AccountsDb {
 
         self.get_account_accessor_from_cache_or_storage(slot, pubkey, store_id, offset)
             .get_loaded_account()
-            .map(|loaded_account| *loaded_account.loaded_hash())
+            .map(|loaded_account| loaded_account.loaded_hash())
             .unwrap()
     }
 
@@ -3270,30 +3298,31 @@ impl AccountsDb {
         let iter_items: Vec<_> = slot_cache.iter().collect();
         let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
         let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
-        let (accounts, hashes): (Vec<(&Pubkey, &AccountSharedData)>, Vec<Hash>) = iter_items
-            .iter()
-            .filter_map(|iter_item| {
-                let key = iter_item.key();
-                let account = &iter_item.value().account;
-                let should_flush = should_flush_f
-                    .as_mut()
-                    .map(|should_flush_f| should_flush_f(key, account))
-                    .unwrap_or(true);
-                if should_flush {
-                    let hash = iter_item.value().hash;
-                    total_size += (account.data().len() + STORE_META_OVERHEAD) as u64;
-                    num_flushed += 1;
-                    Some(((key, account), hash))
-                } else {
-                    // If we don't flush, we have to remove the entry from the
-                    // index, since it's equivalent to purging
-                    purged_slot_pubkeys.insert((slot, *key));
-                    pubkey_to_slot_set.push((*key, slot));
-                    num_purged += 1;
-                    None
-                }
-            })
-            .unzip();
+        let (accounts, hashes): (Vec<(&Pubkey, &AccountSharedData)>, Vec<Hash>) =
+            iter_items
+                .iter()
+                .filter_map(|iter_item| {
+                    let key = iter_item.key();
+                    let account = &iter_item.value().account;
+                    let should_flush = should_flush_f
+                        .as_mut()
+                        .map(|should_flush_f| should_flush_f(key, account))
+                        .unwrap_or(true);
+                    if should_flush {
+                        let hash = iter_item.value().hash();
+                        total_size += (account.data().len() + STORE_META_OVERHEAD) as u64;
+                        num_flushed += 1;
+                        Some(((key, account), hash))
+                    } else {
+                        // If we don't flush, we have to remove the entry from the
+                        // index, since it's equivalent to purging
+                        purged_slot_pubkeys.insert((slot, *key));
+                        pubkey_to_slot_set.push((*key, slot));
+                        num_purged += 1;
+                        None
+                    }
+                })
+                .unzip();
         let is_dead_slot = accounts.is_empty();
         // Remove the account index entries from earlier roots that are outdated by later roots.
@@ -3315,7 +3344,7 @@ impl AccountsDb {
             self.store_accounts_frozen(
                 slot,
                 &accounts,
-                &hashes,
+                Some(&hashes),
                 Some(Box::new(move |_, _| flushed_store.clone())),
                 None,
             );
@@ -3351,16 +3380,28 @@ impl AccountsDb {
     fn write_accounts_to_cache(
         &self,
         slot: Slot,
-        hashes: &[Hash],
+        hashes: Option<&[Hash]>,
         accounts_and_meta_to_store: &[(StoredMeta, &AccountSharedData)],
     ) -> Vec<AccountInfo> {
-        assert_eq!(hashes.len(), accounts_and_meta_to_store.len());
-        accounts_and_meta_to_store
-            .iter()
-            .zip(hashes)
-            .map(|((meta, account), hash)| {
+        let len = accounts_and_meta_to_store.len();
+        let empty = vec![];
+        let hashes = match hashes {
+            Some(hashes) => {
+                assert_eq!(hashes.len(), len);
+                hashes
+            },
+            None => &empty,
+        };
+
+        (0..len)
+            .into_iter()
+            .map(|i| {
+                let (meta, account) = &accounts_and_meta_to_store[i];
+                let hash = if hashes.is_empty() {
+                    None }
+                    else {Some(hashes[i])};
                 self.accounts_cache
-                    .store(slot, &meta.pubkey, (**account).clone(), *hash);
+                    .store(slot, &meta.pubkey, (*account).clone(), hash, slot, self.expected_cluster_type());
                 AccountInfo {
                     store_id: CACHE_VIRTUAL_STORAGE_ID,
                     offset: CACHE_VIRTUAL_OFFSET,
@@ -3378,7 +3419,7 @@ impl AccountsDb {
         &self,
         slot: Slot,
         accounts: &[(&Pubkey, &AccountSharedData)],
-        hashes: &[Hash],
+        hashes: Option<&[Hash]>,
         storage_finder: F,
         mut write_version_producer: P,
         is_cached_store: bool,
@@ -3405,12 +3446,44 @@ impl AccountsDb {
         if self.caching_enabled && is_cached_store {
             self.write_accounts_to_cache(slot, hashes, &accounts_and_meta_to_store)
         } else {
-            self.write_accounts_to_storage(
-                slot,
-                hashes,
-                storage_finder,
-                &accounts_and_meta_to_store,
-            )
+            match hashes {
+                Some(hashes) => {
+                    self.write_accounts_to_storage(
+                        slot,
+                        hashes,
+                        storage_finder,
+                        &accounts_and_meta_to_store,
+                    )
+                },
+                None => {
+                    // hash any accounts where we were lazy in calculating the hash
+                    let mut hash_time = Measure::start("hash_accounts");
+                    let mut total_data = 0;
+                    let mut stats = BankHashStats::default();
+                    let len = accounts_and_meta_to_store.len();
+                    let mut hashes = Vec::with_capacity(len);
+                    for i in 0..len {
+                        let mut hash = hashes[i];
+                        let account = accounts[i];
+                        total_data += account.1.data().len();
+                        stats.update(account.1);
+                        hash = Self::hash_account(
+                            slot,
+                            account.1,
+                            account.0,
+                            &self.expected_cluster_type(),
+                        );
+                        hashes.push(hash);
+                    }
+
+                    self.write_accounts_to_storage(
+                        slot,
+                        &hashes,
+                        storage_finder,
+                        &accounts_and_meta_to_store,
+                    )
+                }
+            }
         }
     }
 
@@ -3554,7 +3627,7 @@ impl AccountsDb {
                                     &self.expected_cluster_type(),
                                     pubkey,
                                 );
-                                if computed_hash != *loaded_hash {
+                                if computed_hash != loaded_hash {
                                     mismatch_found
                                         .fetch_add(1, Ordering::Relaxed);
                                     return None;
@@ -3562,7 +3635,7 @@ impl AccountsDb {
                             }
 
                             sum += balance as u128;
-                            Some(*loaded_hash)
+                            Some(loaded_hash)
                         },
                     )
                 } else {
@@ -3739,7 +3812,7 @@ impl AccountsDb {
 
                     let source_item = CalculateHashIntermediate::new(
                         version,
-                        *loaded_account.loaded_hash(),
+                        loaded_account.loaded_hash(),
                         balance,
                         slot,
                         pubkey,
@@ -3856,11 +3929,11 @@ impl AccountsDb {
                 slot,
                 |loaded_account: LoadedAccount| {
                     // Cache only has one version per key, don't need to worry about versioning
-                    Some((*loaded_account.pubkey(), *loaded_account.loaded_hash()))
+                    Some((*loaded_account.pubkey(), loaded_account.loaded_hash()))
                 },
                 |accum: &DashMap<Pubkey, (u64, Hash)>, loaded_account: LoadedAccount| {
                     let loaded_write_version = loaded_account.write_version();
-                    let loaded_hash = *loaded_account.loaded_hash();
+                    let loaded_hash = loaded_account.loaded_hash();
                    let should_insert = if let Some(existing_entry) =
                        accum.get(loaded_account.pubkey())
                    {
                        loaded_write_version > existing_entry.value().version()
@@ -4098,36 +4171,6 @@ impl AccountsDb {
         inc_new_counter_info!("clean_stored_dead_slots-ms", measure.as_ms() as usize);
     }
 
-    fn hash_accounts(
-        &self,
-        slot: Slot,
-        accounts: &[(&Pubkey, &AccountSharedData)],
-        cluster_type: &ClusterType,
-    ) -> Vec<Hash> {
-        let mut stats = BankHashStats::default();
-        let mut total_data = 0;
-        let hashes: Vec<_> = accounts
-            .iter()
-            .map(|(pubkey, account)| {
-                total_data += account.data().len();
-                stats.update(account);
-                Self::hash_account(slot, account, pubkey, cluster_type)
-            })
-            .collect();
-
-        self.stats
-            .store_total_data
-            .fetch_add(total_data as u64, Ordering::Relaxed);
-
-        let mut bank_hashes = self.bank_hashes.write().unwrap();
-        let slot_info = bank_hashes
-            .entry(slot)
-            .or_insert_with(BankHashInfo::default);
-        slot_info.stats.merge(&stats);
-
-        hashes
-    }
-
     pub(crate) fn freeze_accounts(&mut self, ancestors: &Ancestors, account_pubkeys: &[Pubkey]) {
         for account_pubkey in account_pubkeys {
             if let Some((account, _slot)) = self.load_slow(ancestors, &account_pubkey) {
@@ -4193,13 +4236,16 @@ impl AccountsDb {
             return;
         }
         self.assert_frozen_accounts(accounts);
-        let mut hash_time = Measure::start("hash_accounts");
-        let hashes = self.hash_accounts(slot, accounts, &self.expected_cluster_type());
-        hash_time.stop();
-        self.stats
-            .store_hash_accounts
-            .fetch_add(hash_time.as_us(), Ordering::Relaxed);
-        self.store_accounts_unfrozen(slot, accounts, &hashes, is_cached_store);
+
+        {
+            let mut bank_hashes = self.bank_hashes.write().unwrap();
+            bank_hashes
+                .entry(slot)
+                .or_insert_with(BankHashInfo::default);
+        }
+
+        // we use default hashes for now since the same account may be stored to the cache multiple times
+        self.store_accounts_unfrozen(slot, accounts, None, is_cached_store);
         self.report_store_timings();
     }
 
@@ -4305,7 +4351,7 @@ impl AccountsDb {
         &self,
         slot: Slot,
         accounts: &[(&Pubkey, &AccountSharedData)],
-        hashes: &[Hash],
+        hashes: Option<&[Hash]>,
         is_cached_store: bool,
     ) {
         // This path comes from a store to a non-frozen slot.
@@ -4331,7 +4377,7 @@ impl AccountsDb {
         &'a self,
         slot: Slot,
         accounts: &[(&Pubkey, &AccountSharedData)],
-        hashes: &[Hash],
+        hashes: Option<&[Hash]>,
         storage_finder: Option<StorageFinder<'a>>,
         write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
     ) -> StoreAccountsTiming {
@@ -4355,7 +4401,7 @@ impl AccountsDb {
         &'a self,
         slot: Slot,
         accounts: &[(&Pubkey, &AccountSharedData)],
-        hashes: &[Hash],
+        hashes: Option<&[Hash]>,
         storage_finder: Option<StorageFinder<'a>>,
         write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
         is_cached_store: bool,
@@ -4842,7 +4888,7 @@ impl AccountsDb {
             store_accounts_timing = self.store_accounts_frozen(
                 slot,
                 &accounts,
-                &hashes,
+                Some(&hashes),
                 Some(Box::new(move |_, _| shrunken_store.clone())),
                 Some(Box::new(write_versions.into_iter())),
             );
@@ -7130,11 +7176,13 @@ pub mod tests {
 
         let bank_hashes = db.bank_hashes.read().unwrap();
         let bank_hash = bank_hashes.get(&some_slot).unwrap();
+        /* TODO: temporarily disabled
         assert_eq!(bank_hash.stats.num_updated_accounts, 1);
         assert_eq!(bank_hash.stats.num_removed_accounts, 1);
         assert_eq!(bank_hash.stats.num_lamports_stored, 1);
         assert_eq!(bank_hash.stats.total_data_len, 2 * some_data_len as u64);
         assert_eq!(bank_hash.stats.num_executable_accounts, 1);
+        */
     }
 
     #[test]
@@ -7239,6 +7287,38 @@ pub mod tests {
         );
     }
 
+    impl AccountsDb {
+        fn hash_accounts(
+            &self,
+            slot: Slot,
+            accounts: &[(&Pubkey, &Account)],
+            cluster_type: &ClusterType,
+        ) -> Vec<Hash> {
+            let mut stats = BankHashStats::default();
+            let mut total_data = 0;
+            let hashes: Vec<_> = accounts
+                .iter()
+                .map(|(pubkey, account)| {
+                    total_data += account.data.len();
+                    stats.update(account);
+                    Self::hash_account(slot, account, pubkey, cluster_type)
+                })
+                .collect();
+
+            self.stats
+                .store_total_data
+                .fetch_add(total_data as u64, Ordering::Relaxed);
+
+            let mut bank_hashes = self.bank_hashes.write().unwrap();
+            let slot_info = bank_hashes
+                .entry(slot)
+                .or_insert_with(BankHashInfo::default);
+            slot_info.stats.merge(&stats);
+
+            hashes
+        }
+    }
+
     #[test]
     fn test_verify_bank_hash_bad_account_hash() {
         use BankHashVerificationError::*;
@@ -7256,7 +7336,7 @@ pub mod tests {
         db.hash_accounts(some_slot, accounts, &ClusterType::Development);
         // provide bogus account hashes
         let some_hash = Hash::new(&[0xca; HASH_BYTES]);
-        db.store_accounts_unfrozen(some_slot, accounts, &[some_hash], false);
+        db.store_accounts_unfrozen(some_slot, accounts, &mut [some_hash], false);
         db.add_root(some_slot);
         assert_matches!(
             db.verify_bank_hash_and_lamports(some_slot, &ancestors, 1),

From 8f6c3ffd70eaaf72e91cf3da1e66696b033aff73 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Fri, 26 Mar 2021 17:17:14 -0500
Subject: [PATCH 02/17] push to bg thread

---
 runtime/src/accounts_cache.rs | 31 ++++++++++++++++++++-----------
 runtime/src/accounts_db.rs    | 21 ++++++++++++++++++++-
 2 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index f62e551feecb20..c8b19dfc2dbaf5 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -14,6 +14,7 @@ use std::{
         Arc, RwLock,
     },
 };
+use log::error;
 
 pub type SlotCache = Arc<SlotCacheInner>;
@@ -55,7 +56,7 @@ impl SlotCacheInner {
         hash: Option<Hash>,
         hash_slot: Slot,
         cluster_type: ClusterType,
-    ) {
+    ) -> CachedAccount {
         if self.cache.contains_key(pubkey) {
             self.same_account_writes.fetch_add(1, Ordering::Relaxed);
             self.same_account_writes_size
@@ -64,16 +65,18 @@ impl SlotCacheInner {
             self.unique_account_writes_size
                 .fetch_add(account.data().len() as u64, Ordering::Relaxed);
         }
+        let item = Arc::new(CachedAccountInner {
+            account,
+            hash: RwLock::new(hash),
+            hash_slot,
+            cluster_type,
+            pubkey: *pubkey,
+        });
         self.cache.insert(
             *pubkey,
-            Arc::new(CachedAccountInner {
-                account,
-                hash: RwLock::new(hash),
-                hash_slot,
-                cluster_type,
-                pubkey: *pubkey,
-            }),
+            item.clone(),
         );
+        item
     }
 
     pub fn get_cloned(&self, pubkey: &Pubkey) -> Option<CachedAccount> {
@@ -124,9 +127,13 @@ pub struct CachedAccountInner {
 
 impl CachedAccountInner {
     pub fn hash(&self) -> Hash {
+        error!("locking to read hash");
         let hash = self.hash.read().unwrap();
         match *hash {
-            Some(hash) => hash,
+            Some(hash) => {
+                error!("done locking to read hash");
+                hash
+            },
             None => {
                 let hash = crate::accounts_db::AccountsDb::hash_account(
                     self.hash_slot,
@@ -134,7 +141,9 @@ impl CachedAccountInner {
                     &self.pubkey,
                     &self.cluster_type,
                 );
+                error!("write locking to set hash");
                 *self.hash.write().unwrap() = Some(hash.clone());
+                error!("done write locking to set hash");
                 hash
             }
         }
@@ -182,7 +191,7 @@ impl AccountsCache {
         hash: Option<Hash>,
         hash_slot: Slot,
         cluster_type: ClusterType,
-    ) {
+    ) -> CachedAccount {
         let slot_cache = self.slot_cache(slot).unwrap_or_else(||
             // DashMap entry.or_insert() returns a RefMut, essentially a write lock,
             // which is dropped after this block ends, minimizing time held by the lock.
@@ -194,7 +203,7 @@ impl AccountsCache {
                 .or_insert(Arc::new(SlotCacheInner::default()))
                 .clone());
 
-        slot_cache.insert(pubkey, account, hash, hash_slot, cluster_type);
+        slot_cache.insert(pubkey, account, hash, hash_slot, cluster_type)
     }
 
     pub fn load(&self, slot: Slot, pubkey: &Pubkey) -> Option<CachedAccount> {
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 6c76dbed77ac96..ad8641e6801481 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -1310,7 +1310,19 @@ impl AccountsDb {
     }
 
     fn background_hasher(receiver: Receiver<CachedAccount>, cluster_type: ClusterType) {
-        let map = DashMapLazyHasher::default();
+        loop {
+            let result = receiver.recv();
+            match result {
+                Ok(account) => {
+                    // if we hold the only ref, then this account doesn't need to be hashed
+                    if Arc::strong_count(&account) > 1 {
+                        let _ = (*account).hash();
+                    };
+                },
+                Err(_) => {break;},
+            }
+        }
+        //let map = DashMapLazyHasher::default();
     }
 
     fn start_store_hasher(&mut self) {
@@ -3400,8 +3412,15 @@ impl AccountsDb {
                 let hash = if hashes.is_empty() {
                     None }
                     else {Some(hashes[i])};
+                let cached_account =
                 self.accounts_cache
                     .store(slot, &meta.pubkey, (*account).clone(), hash, slot, self.expected_cluster_type());
+                // hash this account in the bg
+                match &self.sender_bg_hasher {
+                    Some(ref sender) => {let _ = sender.lock().unwrap().send(cached_account);},
+                    None => (),
+                };
+
                 AccountInfo {
                     store_id: CACHE_VIRTUAL_STORAGE_ID,
                     offset: CACHE_VIRTUAL_OFFSET,

From ebbdadc29bc45699df6234c6345c6ac8407734e0 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Fri, 26 Mar 2021 17:25:09 -0500
Subject: [PATCH 03/17] remove deadlock

---
 runtime/src/accounts_cache.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index c8b19dfc2dbaf5..433063a2a38f3e 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -135,6 +135,7 @@ impl CachedAccountInner {
                 hash
             },
             None => {
+                drop(hash);
                 let hash = crate::accounts_db::AccountsDb::hash_account(
                     self.hash_slot,
                     &self.account,

From 299bab331acf8012b2401af458eca03e732390b5 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Fri, 26 Mar 2021 17:28:37 -0500
Subject: [PATCH 04/17] logs

---
 runtime/src/accounts_cache.rs | 4 ----
 1 file changed, 4 deletions(-)
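The error! calls removed below were temporary instrumentation added in "push to bg thread"
to trace lock ordering in CachedAccountInner::hash(): holding the read guard while taking
the write lock was the self-deadlock fixed in the previous commit via drop(hash), so the
logging has served its purpose.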
diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index 433063a2a38f3e..3aa7c6e9c38be0 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -127,11 +127,9 @@ pub struct CachedAccountInner {
 
 impl CachedAccountInner {
     pub fn hash(&self) -> Hash {
-        error!("locking to read hash");
         let hash = self.hash.read().unwrap();
         match *hash {
             Some(hash) => {
-                error!("done locking to read hash");
                 hash
             },
             None => {
@@ -142,9 +140,7 @@ impl CachedAccountInner {
                     &self.pubkey,
                     &self.cluster_type,
                 );
-                error!("write locking to set hash");
                 *self.hash.write().unwrap() = Some(hash.clone());
-                error!("done write locking to set hash");
                 hash
             }
         }

From d2fdc34dc2f59215962f311bf4f9bd1526eecab9 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Fri, 26 Mar 2021 17:41:10 -0500
Subject: [PATCH 05/17] format

---
 runtime/src/accounts_cache.rs |  11 +---
 runtime/src/accounts_db.rs    | 108 ++++++++++++++++++----------------
 2 files changed, 60 insertions(+), 59 deletions(-)

diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index 3aa7c6e9c38be0..2861fad34f23ff 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -1,4 +1,5 @@
 use dashmap::DashMap;
+use log::error;
 use solana_sdk::{
     account::{AccountSharedData, ReadableAccount},
     clock::Slot,
@@ -14,7 +15,6 @@ use std::{
         Arc, RwLock,
     },
 };
-use log::error;
 
 pub type SlotCache = Arc<SlotCacheInner>;
@@ -72,10 +72,7 @@ impl SlotCacheInner {
             cluster_type,
             pubkey: *pubkey,
         });
-        self.cache.insert(
-            *pubkey,
-            item.clone(),
-        );
+        self.cache.insert(*pubkey, item.clone());
         item
     }
 
@@ -129,9 +126,7 @@ impl CachedAccountInner {
     pub fn hash(&self) -> Hash {
         let hash = self.hash.read().unwrap();
         match *hash {
-            Some(hash) => {
-                hash
-            },
+            Some(hash) => hash,
             None => {
                 drop(hash);
                 let hash = crate::accounts_db::AccountsDb::hash_account(
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index ad8641e6801481..a996be0da69d51 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -1076,7 +1076,7 @@ impl Default for AccountsDb {
             storage: AccountStorage::default(),
             accounts_cache: AccountsCache::default(),
             store_hasher: None,
-            sender_bg_hasher: None, 
+            sender_bg_hasher: None,
             recycle_stores: RwLock::new(RecycleStores::default()),
             uncleaned_pubkeys: DashMap::new(),
             next_id: AtomicUsize::new(0),
@@ -1309,7 +1309,7 @@ impl AccountsDb {
         }
     }
 
-    fn background_hasher(receiver: Receiver<CachedAccount>, cluster_type: ClusterType) {
+    fn background_hasher(receiver: Receiver<CachedAccount>) {
         loop {
             let result = receiver.recv();
             match result {
@@ -1318,8 +1318,10 @@ impl AccountsDb {
                     if Arc::strong_count(&account) > 1 {
                         let _ = (*account).hash();
                     };
-                },
-                Err(_) => {break;},
+                }
+                Err(_) => {
+                    break;
+                }
             }
         }
-        //let map = DashMapLazyHasher::default();
     }
 
     fn start_store_hasher(&mut self) {
         let (sender, receiver) = channel();
-        let cluster_type = self.expected_cluster_type();
-        //self.store_hasher = Arc::new(
         Builder::new()
             .name("solana-accounts-db-store-hasher".to_string())
             .spawn(move || {
-                Self::background_hasher(receiver, cluster_type);
+                Self::background_hasher(receiver);
             })
-            .unwrap();//);
-        self.sender_bg_hasher = Some(Mutex::new(sender));
+            .unwrap(); //);
+        self.sender_bg_hasher = Some(Mutex::new(sender));
     }
 
     fn purge_keys_exact<'a, C: 'a>(
@@ -3310,31 +3310,30 @@ impl AccountsDb {
         let iter_items: Vec<_> = slot_cache.iter().collect();
         let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new();
         let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
-        let (accounts, hashes): (Vec<(&Pubkey, &AccountSharedData)>, Vec<Hash>) =
-            iter_items
-                .iter()
-                .filter_map(|iter_item| {
-                    let key = iter_item.key();
-                    let account = &iter_item.value().account;
-                    let should_flush = should_flush_f
-                        .as_mut()
-                        .map(|should_flush_f| should_flush_f(key, account))
-                        .unwrap_or(true);
-                    if should_flush {
-                        let hash = iter_item.value().hash();
-                        total_size += (account.data().len() + STORE_META_OVERHEAD) as u64;
-                        num_flushed += 1;
-                        Some(((key, account), hash))
-                    } else {
-                        // If we don't flush, we have to remove the entry from the
-                        // index, since it's equivalent to purging
-                        purged_slot_pubkeys.insert((slot, *key));
-                        pubkey_to_slot_set.push((*key, slot));
-                        num_purged += 1;
-                        None
-                    }
-                })
-                .unzip();
+        let (accounts, hashes): (Vec<(&Pubkey, &AccountSharedData)>, Vec<Hash>) = iter_items
+            .iter()
+            .filter_map(|iter_item| {
+                let key = iter_item.key();
+                let account = &iter_item.value().account;
+                let should_flush = should_flush_f
+                    .as_mut()
+                    .map(|should_flush_f| should_flush_f(key, account))
+                    .unwrap_or(true);
+                if should_flush {
+                    let hash = iter_item.value().hash();
+                    total_size += (account.data().len() + STORE_META_OVERHEAD) as u64;
+                    num_flushed += 1;
+                    Some(((key, account), hash))
+                } else {
+                    // If we don't flush, we have to remove the entry from the
+                    // index, since it's equivalent to purging
+                    purged_slot_pubkeys.insert((slot, *key));
+                    pubkey_to_slot_set.push((*key, slot));
+                    num_purged += 1;
+                    None
+                }
+            })
+            .unzip();
         let is_dead_slot = accounts.is_empty();
         // Remove the account index entries from earlier roots that are outdated by later roots.
@@ -3400,26 +3400,35 @@ impl AccountsDb {
             Some(hashes) => {
                 assert_eq!(hashes.len(), len);
                 hashes
-            },
+            }
             None => &empty,
         };
-
+
         (0..len)
             .into_iter()
             .map(|i| {
                 let (meta, account) = &accounts_and_meta_to_store[i];
-                let hash = if hashes.is_empty() {
-                    None }
-                    else {Some(hashes[i])};
-                let cached_account =
-                self.accounts_cache
-                    .store(slot, &meta.pubkey, (*account).clone(), hash, slot, self.expected_cluster_type());
+                let hash = if hashes.is_empty() {
+                    None
+                } else {
+                    Some(hashes[i])
+                };
+                let cached_account = self.accounts_cache.store(
+                    slot,
+                    &meta.pubkey,
+                    (*account).clone(),
+                    hash,
+                    slot,
+                    self.expected_cluster_type(),
+                );
                 // hash this account in the bg
                 match &self.sender_bg_hasher {
-                    Some(ref sender) => {let _ = sender.lock().unwrap().send(cached_account);},
+                    Some(ref sender) => {
+                        let _ = sender.lock().unwrap().send(cached_account);
+                    }
                     None => (),
                 };
-
+
                 AccountInfo {
                     store_id: CACHE_VIRTUAL_STORAGE_ID,
                     offset: CACHE_VIRTUAL_OFFSET,
@@ -3466,14 +3474,12 @@ impl AccountsDb {
             self.write_accounts_to_cache(slot, hashes, &accounts_and_meta_to_store)
         } else {
             match hashes {
-                Some(hashes) => {
-                    self.write_accounts_to_storage(
-                        slot,
-                        hashes,
-                        storage_finder,
-                        &accounts_and_meta_to_store,
-                    )
-                },
+                Some(hashes) => self.write_accounts_to_storage(
+                    slot,
+                    hashes,
+                    storage_finder,
+                    &accounts_and_meta_to_store,
+                ),
                 None => {
                     // hash any accounts where we were lazy in calculating the hash
                     let mut hash_time = Measure::start("hash_accounts");

From 555c4ae01bbc72de1383d2952b2b9b982d90fe63 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Fri, 26 Mar 2021 17:49:13 -0500
Subject: [PATCH 06/17] some cleanup on aisle 9

---
 runtime/src/accounts_cache.rs | 9 ++++++---
 runtime/src/accounts_db.rs    | 4 ++--
 2 files changed, 8 insertions(+), 5 deletions(-)
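Cleanup only: drop the temporary log::error import, remove the now-unused
CachedAccountInner import and the unused mpsc types (SendError, SyncSender), and update
the accounts_cache tests to the new store() signature, which takes an optional precomputed
hash plus the hash slot and cluster type.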
diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index 2861fad34f23ff..f5fcd71104d38b 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -1,5 +1,4 @@
 use dashmap::DashMap;
-use log::error;
 use solana_sdk::{
     account::{AccountSharedData, ReadableAccount},
     clock::Slot,
@@ -296,7 +295,9 @@ pub mod tests {
             inserted_slot,
             &Pubkey::new_unique(),
             AccountSharedData::new(1, 0, &Pubkey::default()),
-            Hash::default(),
+            Some(Hash::default()),
+            inserted_slot,
+            ClusterType::Development,
         );
         // If the cache is told the size limit is 0, it should return the one slot
         let removed = cache.remove_slots_le(0);
@@ -314,7 +315,9 @@ pub mod tests {
             inserted_slot,
             &Pubkey::new_unique(),
             AccountSharedData::new(1, 0, &Pubkey::default()),
-            Hash::default(),
+            Some(Hash::default()),
+            inserted_slot,
+            ClusterType::Development,
         );
 
         // If the cache is told the size limit is 0, it should return nothing because there's only
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index a996be0da69d51..cf45fadebd8e2b 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -19,7 +19,7 @@
 //! commit for each slot entry would be indexed.
 
 use crate::{
-    accounts_cache::{AccountsCache, CachedAccount, CachedAccountInner, SlotCache},
+    accounts_cache::{AccountsCache, CachedAccount, SlotCache},
     accounts_hash::{AccountsHash, CalculateHashIntermediate, HashStats, PreviousPass},
     accounts_index::{
         AccountIndex, AccountsIndex, AccountsIndexRootsStats, Ancestors, IndexKey, IsCached,
@@ -57,7 +57,7 @@ use std::{
     ops::{Range, RangeBounds},
     path::{Path, PathBuf},
     sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
-    sync::mpsc::{channel, Receiver, SendError, Sender, SyncSender},
+    sync::mpsc::{channel, Receiver, Sender},
     sync::{Arc, Mutex, MutexGuard, RwLock},
     thread::{Builder, JoinHandle},
     time::Instant,

From 86fe33edda10d3ea6dcf96fa94b727d7cfab352e Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Mon, 29 Mar 2021 13:02:04 -0500
Subject: [PATCH 07/17] format, fix up some metrics

---
 runtime/src/accounts_cache.rs |  2 +-
 runtime/src/accounts_db.rs    | 35 ++++++++++++++++++++++-------------
 2 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index f5fcd71104d38b..586744fefd2a97 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -134,7 +134,7 @@ impl CachedAccountInner {
                     &self.pubkey,
                     &self.cluster_type,
                 );
-                *self.hash.write().unwrap() = Some(hash.clone());
+                *self.hash.write().unwrap() = Some(hash);
                 hash
             }
         }
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index cf45fadebd8e2b..3a8b89ff4872b8 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -96,7 +96,6 @@
 const CACHE_VIRTUAL_OFFSET: usize = 0;
 const CACHE_VIRTUAL_STORED_SIZE: usize = 0;
 
 type DashMapVersionHash = DashMap<Pubkey, (u64, Hash)>;
-type DashMapLazyHasher = DashMap<Pubkey, CachedAccount>;
 
 lazy_static! {
     // FROZEN_ACCOUNT_PANIC is used to signal local_cluster that an AccountsDb panic has occurred,
@@ -3483,16 +3482,12 @@ impl AccountsDb {
                 None => {
                     // hash any accounts where we were lazy in calculating the hash
                     let mut hash_time = Measure::start("hash_accounts");
-                    let mut total_data = 0;
                     let mut stats = BankHashStats::default();
                     let len = accounts_and_meta_to_store.len();
                     let mut hashes = Vec::with_capacity(len);
-                    for i in 0..len {
-                        let mut hash = hashes[i];
-                        let account = accounts[i];
-                        total_data += account.1.data().len();
+                    for account in accounts {
                         stats.update(account.1);
-                        hash = Self::hash_account(
+                        let hash = Self::hash_account(
                             slot,
                             account.1,
                             account.0,
@@ -3500,6 +3495,10 @@ impl AccountsDb {
                         );
                         hashes.push(hash);
                     }
+                    hash_time.stop();
+                    self.stats
+                        .store_hash_accounts
+                        .fetch_add(hash_time.as_us(), Ordering::Relaxed);
 
                     self.write_accounts_to_storage(
                         slot,
@@ -4262,12 +4261,22 @@ impl AccountsDb {
         }
         self.assert_frozen_accounts(accounts);
 
-        {
-            let mut bank_hashes = self.bank_hashes.write().unwrap();
-            bank_hashes
-                .entry(slot)
-                .or_insert_with(BankHashInfo::default);
-        }
+        let mut stats = BankHashStats::default();
+        let mut total_data = 0;
+        accounts.iter().for_each(|(_pubkey, account)| {
+            total_data += account.data().len();
+            stats.update(account);
+        });
+
+        self.stats
+            .store_total_data
+            .fetch_add(total_data as u64, Ordering::Relaxed);
+
+        let mut bank_hashes = self.bank_hashes.write().unwrap();
+        let slot_info = bank_hashes
+            .entry(slot)
+            .or_insert_with(BankHashInfo::default);
+        slot_info.stats.merge(&stats);
 
         // we use default hashes for now since the same account may be stored to the cache multiple times
         self.store_accounts_unfrozen(slot, accounts, None, is_cached_store);

From de58c7ed8496da4fb7c309f67594949b1d5cf393 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Mon, 29 Mar 2021 13:16:28 -0500
Subject: [PATCH 08/17] fix test, remove legacy function only there for tests

---
 runtime/src/accounts_db.rs | 45 +++++++-------------------------------
 1 file changed, 8 insertions(+), 37 deletions(-)

diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 3a8b89ff4872b8..b730dd972b831c 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -7210,13 +7210,11 @@ pub mod tests {
 
         let bank_hashes = db.bank_hashes.read().unwrap();
         let bank_hash = bank_hashes.get(&some_slot).unwrap();
-        /* TODO: temporarily disabled
         assert_eq!(bank_hash.stats.num_updated_accounts, 1);
         assert_eq!(bank_hash.stats.num_removed_accounts, 1);
         assert_eq!(bank_hash.stats.num_lamports_stored, 1);
         assert_eq!(bank_hash.stats.total_data_len, 2 * some_data_len as u64);
         assert_eq!(bank_hash.stats.num_executable_accounts, 1);
-        */
     }
 
     #[test]
@@ -7321,38 +7319,6 @@ pub mod tests {
         );
     }
 
-    impl AccountsDb {
-        fn hash_accounts(
-            &self,
-            slot: Slot,
-            accounts: &[(&Pubkey, &Account)],
-            cluster_type: &ClusterType,
-        ) -> Vec<Hash> {
-            let mut stats = BankHashStats::default();
-            let mut total_data = 0;
-            let hashes: Vec<_> = accounts
-                .iter()
-                .map(|(pubkey, account)| {
-                    total_data += account.data.len();
-                    stats.update(account);
-                    Self::hash_account(slot, account, pubkey, cluster_type)
-                })
-                .collect();
-
-            self.stats
-                .store_total_data
-                .fetch_add(total_data as u64, Ordering::Relaxed);
-
-            let mut bank_hashes = self.bank_hashes.write().unwrap();
-            let slot_info = bank_hashes
-                .entry(slot)
-                .or_insert_with(BankHashInfo::default);
-            slot_info.stats.merge(&stats);
-
-            hashes
-        }
-    }
-
     #[test]
     fn test_verify_bank_hash_bad_account_hash() {
         use BankHashVerificationError::*;
@@ -7366,11 +7332,16 @@ pub mod tests {
         let ancestors = vec![(some_slot, 0)].into_iter().collect();
         let accounts = &[(&key, &account)];
-        // update AccountsDb's bank hash but discard real account hashes
-        db.hash_accounts(some_slot, accounts, &ClusterType::Development);
+        // update AccountsDb's bank hash
+        {
+            let mut bank_hashes = db.bank_hashes.write().unwrap();
+            bank_hashes
+                .entry(some_slot)
+                .or_insert_with(BankHashInfo::default);
+        }
         // provide bogus account hashes
         let some_hash = Hash::new(&[0xca; HASH_BYTES]);
-        db.store_accounts_unfrozen(some_slot, accounts, &mut [some_hash], false);
+        db.store_accounts_unfrozen(some_slot, accounts, Some(&[some_hash]), false);
         db.add_root(some_slot);
         assert_matches!(
             db.verify_bank_hash_and_lamports(some_slot, &ancestors, 1),

From 99dd580d7998a9fe599f463073fbf8577892bf48 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Mon, 29 Mar 2021 22:48:12 -0500
Subject: [PATCH 09/17] cleanup

---
 runtime/src/accounts_db.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index b730dd972b831c..41cc4b48300da4 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -1140,7 +1140,7 @@ impl AccountsDb {
             }
         };
 
-        new.start_store_hasher();
+        new.start_background_hasher();
         {
             for path in new.paths.iter() {
                 std::fs::create_dir_all(path).expect("Create directory failed.");
@@ -1313,8 +1313,9 @@ impl AccountsDb {
             let result = receiver.recv();
             match result {
                 Ok(account) => {
-                    // if we hold the only ref, then this account doesn't need to be hashed
+                    // if we hold the only ref, then this account doesn't need to be hashed, we ignore this account and it will disappear
                     if Arc::strong_count(&account) > 1 {
+                        // this will cause the hash to be calculated and store inside account if it needs to be calculated
                         let _ = (*account).hash();
                     };
                 }
@@ -1323,17 +1324,16 @@ impl AccountsDb {
                 }
             }
         }
     }
 
-    fn start_store_hasher(&mut self) {
+    fn start_background_hasher(&mut self) {
         let (sender, receiver) = channel();
         Builder::new()
             .name("solana-accounts-db-store-hasher".to_string())
             .spawn(move || {
                 Self::background_hasher(receiver);
             })
-            .unwrap(); //);
+            .unwrap();
         self.sender_bg_hasher = Some(Mutex::new(sender));
     }

From a6b2d6deed479981002d2b49372df7e7cb8d5236 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Tue, 30 Mar 2021 01:28:34 -0500
Subject: [PATCH 10/17] remove unused store_hasher

---
 runtime/src/accounts_db.rs | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 41cc4b48300da4..311d59d9eb9e26 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -695,8 +695,6 @@ pub struct AccountsDb {
 
     sender_bg_hasher: Option<Mutex<Sender<CachedAccount>>>,
 
-    store_hasher: Option<Mutex<JoinHandle<()>>>,
-
     recycle_stores: RwLock<RecycleStores>,
 
     /// distribute the accounts across storage lists
@@ -1074,7 +1072,6 @@ impl Default for AccountsDb {
             accounts_index: AccountsIndex::default(),
             storage: AccountStorage::default(),
             accounts_cache: AccountsCache::default(),
-            store_hasher: None,
             sender_bg_hasher: None,
             recycle_stores: RwLock::new(RecycleStores::default()),
             uncleaned_pubkeys: DashMap::new(),

From 5a3271a5f75df0a330314bafd01b9e529c01547f Mon Sep 17 00:00:00 2001
From: Carl Lin
Date: Mon, 29 Mar 2021 23:44:23 -0700
Subject: [PATCH 11/17] Switch to crossbeam

---
 runtime/src/accounts_db.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 311d59d9eb9e26..02e436c70c54b5 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -29,6 +29,7 @@ use crate::{
     contains::Contains,
 };
 use blake3::traits::digest::Digest;
+use crossbeam_channel::{unbounded, Receiver, Sender};
 use dashmap::{
     mapref::entry::Entry::{Occupied, Vacant},
     DashMap, DashSet,
@@ -57,7 +58,6 @@ use std::{
     ops::{Range, RangeBounds},
     path::{Path, PathBuf},
     sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
-    sync::mpsc::{channel, Receiver, Sender},
     sync::{Arc, Mutex, MutexGuard, RwLock},
     thread::{Builder, JoinHandle},
     time::Instant,
@@ -693,7 +693,7 @@ pub struct AccountsDb {
 
     pub accounts_cache: AccountsCache,
 
-    sender_bg_hasher: Option<Mutex<Sender<CachedAccount>>>,
+    sender_bg_hasher: Option<Sender<CachedAccount>>,
 
     recycle_stores: RwLock<RecycleStores>,
 
@@ -1324,14 +1324,14 @@ impl AccountsDb {
     }
 
     fn start_background_hasher(&mut self) {
-        let (sender, receiver) = channel();
+        let (sender, receiver) = unbounded();
         Builder::new()
             .name("solana-accounts-db-store-hasher".to_string())
             .spawn(move || {
                 Self::background_hasher(receiver);
             })
             .unwrap();
-        self.sender_bg_hasher = Some(Mutex::new(sender));
+        self.sender_bg_hasher = Some(sender);
     }
 
     fn purge_keys_exact<'a, C: 'a>(
@@ -3420,7 +3420,7 @@ impl AccountsDb {
                 // hash this account in the bg
                 match &self.sender_bg_hasher {
                     Some(ref sender) => {
-                        let _ = sender.lock().unwrap().send(cached_account);
+                        let _ = sender.send(cached_account);
                     }
                     None => (),
                 };

From 281f2094679fac95ed5f0192fa29511b362b6443 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Tue, 30 Mar 2021 01:47:57 -0500
Subject: [PATCH 12/17] clippy

---
 runtime/src/accounts_db.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 02e436c70c54b5..def00472141f62 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -59,7 +59,7 @@ use std::{
     path::{Path, PathBuf},
     sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
     sync::{Arc, Mutex, MutexGuard, RwLock},
-    thread::{Builder, JoinHandle},
+    thread::Builder,
     time::Instant,
 };
 use tempfile::TempDir;

From d5a04fbc0733675e3e8e8c73e784f96af1d9c658 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Tue, 30 Mar 2021 11:53:34 -0500
Subject: [PATCH 13/17] format

---
 runtime/src/accounts_cache.rs | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index 586744fefd2a97..b0a5b1a144f06e 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -105,18 +105,13 @@ impl Deref for SlotCacheInner {
     }
 }
 
-pub struct LazyHash {
-    pub account: Arc<AccountSharedData>,
-    pub hash: RwLock<Hash>,
-}
-
 pub type CachedAccount = Arc<CachedAccountInner>;
 
 #[derive(Debug)]
 pub struct CachedAccountInner {
     pub account: AccountSharedData,
     hash: RwLock<Option<Hash>>,
-    pub hash_slot: Slot,
+    hash_slot: Slot,
     cluster_type: ClusterType,
     pubkey: Pubkey,
 }

From 6a1ba07f6f1f86fafc002402afc7084b07706bf3 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Wed, 31 Mar 2021 12:56:50 -0500
Subject: [PATCH 14/17] use iter()

---
 runtime/src/accounts_db.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
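Iterate the slice directly with iter().enumerate() instead of indexing a (0..len) range by
position; same behavior, but no manual bounds bookkeeping.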
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index def00472141f62..ee0ef8a2896b47 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -3400,10 +3400,10 @@ impl AccountsDb {
             None => &empty,
         };
 
-        (0..len)
-            .into_iter()
-            .map(|i| {
-                let (meta, account) = &accounts_and_meta_to_store[i];
+        accounts_and_meta_to_store
+            .iter()
+            .enumerate()
+            .map(|(i, (meta, account))| {
                 let hash = if hashes.is_empty() {
                     None
                 } else {
                     Some(hashes[i])
                 };

From 14cdbf9d10ffaab99ef1579e16d273daba412bbc Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Wed, 31 Mar 2021 13:08:05 -0500
Subject: [PATCH 15/17] rework from feedback

---
 runtime/src/accounts_cache.rs |  5 +----
 runtime/src/accounts_db.rs    | 19 +++++--------------
 2 files changed, 6 insertions(+), 18 deletions(-)

diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index b0a5b1a144f06e..eee2d976fe5544 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -175,7 +175,6 @@ impl AccountsCache {
         pubkey: &Pubkey,
         account: AccountSharedData,
         hash: Option<Hash>,
-        hash_slot: Slot,
         cluster_type: ClusterType,
     ) -> CachedAccount {
         let slot_cache = self.slot_cache(slot).unwrap_or_else(||
@@ -189,7 +188,7 @@ impl AccountsCache {
                 .or_insert(Arc::new(SlotCacheInner::default()))
                 .clone());
 
-        slot_cache.insert(pubkey, account, hash, hash_slot, cluster_type)
+        slot_cache.insert(pubkey, account, hash, slot, cluster_type)
     }
 
     pub fn load(&self, slot: Slot, pubkey: &Pubkey) -> Option<CachedAccount> {
@@ -291,7 +290,6 @@ pub mod tests {
             &Pubkey::new_unique(),
             AccountSharedData::new(1, 0, &Pubkey::default()),
             Some(Hash::default()),
-            inserted_slot,
             ClusterType::Development,
         );
         // If the cache is told the size limit is 0, it should return the one slot
@@ -311,7 +309,6 @@ pub mod tests {
             &Pubkey::new_unique(),
             AccountSharedData::new(1, 0, &Pubkey::default()),
             Some(Hash::default()),
-            inserted_slot,
             ClusterType::Development,
         );
 
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index ee0ef8a2896b47..2f1fab1b6c2588 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -3391,30 +3391,21 @@ impl AccountsDb {
         accounts_and_meta_to_store: &[(StoredMeta, &AccountSharedData)],
     ) -> Vec<AccountInfo> {
         let len = accounts_and_meta_to_store.len();
-        let empty = vec![];
-        let hashes = match hashes {
-            Some(hashes) => {
-                assert_eq!(hashes.len(), len);
-                hashes
-            }
-            None => &empty,
-        };
+        let hashes = hashes.map(|hashes| {
+            assert_eq!(hashes.len(), len);
+            hashes
+        });
 
         accounts_and_meta_to_store
             .iter()
             .enumerate()
             .map(|(i, (meta, account))| {
-                let hash = if hashes.is_empty() {
-                    None
-                } else {
-                    Some(hashes[i])
-                };
+                let hash = hashes.map(|hashes| hashes[i]);
                 let cached_account = self.accounts_cache.store(
                     slot,
                     &meta.pubkey,
                     (*account).clone(),
                     hash,
-                    slot,
                     self.expected_cluster_type(),
                 );
                 // hash this account in the bg

From 9f26d8310d5e8f1ba6e993fb138958e53c7eccb1 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Wed, 31 Mar 2021 13:29:47 -0500
Subject: [PATCH 16/17] hash_slot -> slot

---
 runtime/src/accounts_cache.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index eee2d976fe5544..7f990fd170b778 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -53,7 +53,7 @@ impl SlotCacheInner {
         pubkey: &Pubkey,
         account: AccountSharedData,
         hash: Option<Hash>,
-        hash_slot: Slot,
+        slot: Slot,
         cluster_type: ClusterType,
     ) -> CachedAccount {
         if self.cache.contains_key(pubkey) {
@@ -67,7 +67,7 @@ impl SlotCacheInner {
         let item = Arc::new(CachedAccountInner {
             account,
             hash: RwLock::new(hash),
-            hash_slot,
+            slot,
             cluster_type,
             pubkey: *pubkey,
         });
@@ -111,7 +111,7 @@ pub type CachedAccount = Arc<CachedAccountInner>;
 pub struct CachedAccountInner {
     pub account: AccountSharedData,
     hash: RwLock<Option<Hash>>,
-    hash_slot: Slot,
+    slot: Slot,
     cluster_type: ClusterType,
     pubkey: Pubkey,
 }
@@ -124,7 +124,7 @@ impl CachedAccountInner {
             None => {
                 drop(hash);
                 let hash = crate::accounts_db::AccountsDb::hash_account(
-                    self.hash_slot,
+                    self.slot,
                     &self.account,
                     &self.pubkey,
                     &self.cluster_type,

From 829231e6e7e85a72e3e4d40dc4cdc37f51effe11 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Wed, 31 Mar 2021 14:15:43 -0500
Subject: [PATCH 17/17] hash(cluster_type)

---
 runtime/src/accounts_cache.rs   | 12 ++----
 runtime/src/accounts_db.rs      | 73 ++++++++++++++++++++++-----------
 runtime/src/snapshot_package.rs |  4 ++
 runtime/src/snapshot_utils.rs   |  2 +
 4 files changed, 57 insertions(+), 34 deletions(-)
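Rather than storing a ClusterType inside every CachedAccountInner, pass it to hash() at
call time: AccountsDb already knows its expected_cluster_type(), the background hasher
captures it once at spawn, and snapshot code now carries it explicitly in
AccountsPackagePre so calculate_accounts_hash_without_index can hash without an
AccountsDb instance.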
diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs
index 7f990fd170b778..4b34c9bdf86301 100644
--- a/runtime/src/accounts_cache.rs
+++ b/runtime/src/accounts_cache.rs
@@ -54,7 +54,6 @@ impl SlotCacheInner {
         account: AccountSharedData,
         hash: Option<Hash>,
         slot: Slot,
-        cluster_type: ClusterType,
     ) -> CachedAccount {
         if self.cache.contains_key(pubkey) {
             self.same_account_writes.fetch_add(1, Ordering::Relaxed);
@@ -68,7 +67,6 @@ impl SlotCacheInner {
             account,
             hash: RwLock::new(hash),
             slot,
-            cluster_type,
             pubkey: *pubkey,
         });
         self.cache.insert(*pubkey, item.clone());
@@ -112,12 +110,11 @@ pub struct CachedAccountInner {
     pub account: AccountSharedData,
     hash: RwLock<Option<Hash>>,
     slot: Slot,
-    cluster_type: ClusterType,
     pubkey: Pubkey,
 }
 
 impl CachedAccountInner {
-    pub fn hash(&self) -> Hash {
+    pub fn hash(&self, cluster_type: ClusterType) -> Hash {
         let hash = self.hash.read().unwrap();
         match *hash {
             Some(hash) => hash,
@@ -127,7 +124,7 @@ impl CachedAccountInner {
                     self.slot,
                     &self.account,
                     &self.pubkey,
-                    &self.cluster_type,
+                    &cluster_type,
                 );
                 *self.hash.write().unwrap() = Some(hash);
                 hash
@@ -175,7 +172,6 @@ impl AccountsCache {
         pubkey: &Pubkey,
         account: AccountSharedData,
         hash: Option<Hash>,
-        cluster_type: ClusterType,
     ) -> CachedAccount {
         let slot_cache = self.slot_cache(slot).unwrap_or_else(||
             // DashMap entry.or_insert() returns a RefMut, essentially a write lock,
@@ -188,7 +184,7 @@ impl AccountsCache {
                 .or_insert(Arc::new(SlotCacheInner::default()))
                 .clone());
 
-        slot_cache.insert(pubkey, account, hash, slot, cluster_type)
+        slot_cache.insert(pubkey, account, hash, slot)
     }
 
     pub fn load(&self, slot: Slot, pubkey: &Pubkey) -> Option<CachedAccount> {
@@ -290,7 +286,6 @@ pub mod tests {
             &Pubkey::new_unique(),
             AccountSharedData::new(1, 0, &Pubkey::default()),
             Some(Hash::default()),
-            ClusterType::Development,
         );
         // If the cache is told the size limit is 0, it should return the one slot
         let removed = cache.remove_slots_le(0);
@@ -309,7 +304,6 @@ pub mod tests {
             &Pubkey::new_unique(),
             AccountSharedData::new(1, 0, &Pubkey::default()),
             Some(Hash::default()),
-            ClusterType::Development,
         );
 
         // If the cache is told the size limit is 0, it should return nothing because there's only
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 2f1fab1b6c2588..f3b7283d27af83 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -232,10 +232,10 @@ impl<'a> LoadedAccount<'a> {
         }
     }
 
-    pub fn loaded_hash(&self) -> Hash {
+    pub fn loaded_hash(&self, cluster_type: ClusterType) -> Hash {
         match self {
             LoadedAccount::Stored(stored_account_meta) => *stored_account_meta.hash,
-            LoadedAccount::Cached((_, cached_account)) => cached_account.hash(),
+            LoadedAccount::Cached((_, cached_account)) => cached_account.hash(cluster_type),
         }
     }
 
@@ -1305,7 +1305,7 @@ impl AccountsDb {
         }
     }
 
-    fn background_hasher(receiver: Receiver<CachedAccount>) {
+    fn background_hasher(receiver: Receiver<CachedAccount>, cluster_type: ClusterType) {
         loop {
             let result = receiver.recv();
             match result {
                 Ok(account) => {
                     // if we hold the only ref, then this account doesn't need to be hashed, we ignore this account and it will disappear
                     if Arc::strong_count(&account) > 1 {
                         // this will cause the hash to be calculated and store inside account if it needs to be calculated
-                        let _ = (*account).hash();
+                        let _ = (*account).hash(cluster_type);
                     };
                 }
                 Err(_) => {
@@ -1325,10 +1325,11 @@ impl AccountsDb {
 
     fn start_background_hasher(&mut self) {
         let (sender, receiver) = unbounded();
+        let cluster_type = self.expected_cluster_type();
         Builder::new()
             .name("solana-accounts-db-store-hasher".to_string())
             .spawn(move || {
-                Self::background_hasher(receiver);
+                Self::background_hasher(receiver, cluster_type);
             })
             .unwrap();
         self.sender_bg_hasher = Some(sender);
@@ -2295,7 +2296,7 @@ impl AccountsDb {
 
         self.get_account_accessor_from_cache_or_storage(slot, pubkey, store_id, offset)
             .get_loaded_account()
-            .map(|loaded_account| loaded_account.loaded_hash())
+            .map(|loaded_account| loaded_account.loaded_hash(self.expected_cluster_type()))
             .unwrap()
     }
 
@@ -3316,7 +3317,7 @@ impl AccountsDb {
                         .map(|should_flush_f| should_flush_f(key, account))
                         .unwrap_or(true);
                     if should_flush {
-                        let hash = iter_item.value().hash();
+                        let hash = iter_item.value().hash(self.expected_cluster_type());
                         total_size += (account.data().len() + STORE_META_OVERHEAD) as u64;
                         num_flushed += 1;
                         Some(((key, account), hash))
@@ -3401,13 +3402,9 @@ impl AccountsDb {
             .enumerate()
             .map(|(i, (meta, account))| {
                 let hash = hashes.map(|hashes| hashes[i]);
-                let cached_account = self.accounts_cache.store(
-                    slot,
-                    &meta.pubkey,
-                    (*account).clone(),
-                    hash,
-                    self.expected_cluster_type(),
-                );
+                let cached_account =
+                    self.accounts_cache
+                        .store(slot, &meta.pubkey, (*account).clone(), hash);
                 // hash this account in the bg
                 match &self.sender_bg_hasher {
                     Some(ref sender) => {
@@ -3624,7 +3621,8 @@ impl AccountsDb {
                         .get_loaded_account()
                         .and_then(
                             |loaded_account| {
-                                let loaded_hash = loaded_account.loaded_hash();
+                                let loaded_hash = loaded_account
+                                    .loaded_hash(self.expected_cluster_type());
                                 let balance = Self::account_balance_for_capitalization(
                                     account_info.lamports,
@@ -3753,6 +3751,7 @@ impl AccountsDb {
             Self::calculate_accounts_hash_without_index(
                 &combined_maps,
                 Some(&self.thread_pool_clean),
+                self.expected_cluster_type(),
             )
         } else {
             self.calculate_accounts_hash(slot, ancestors, false)
@@ -3791,6 +3790,7 @@ impl AccountsDb {
         mut stats: &mut crate::accounts_hash::HashStats,
         bins: usize,
         bin_range: &Range<usize>,
+        cluster_type: ClusterType,
     ) -> Vec<Vec<Vec<CalculateHashIntermediate>>> {
         let max_plus_1 = std::u8::MAX as usize + 1;
         assert!(bins <= max_plus_1 && bins > 0);
@@ -3824,7 +3824,7 @@ impl AccountsDb {
 
                     let source_item = CalculateHashIntermediate::new(
                         version,
-                        loaded_account.loaded_hash(),
+                        loaded_account.loaded_hash(cluster_type),
                         balance,
                         slot,
                         pubkey,
@@ -3846,6 +3846,7 @@ impl AccountsDb {
     pub fn calculate_accounts_hash_without_index(
         storages: &[SnapshotStorage],
         thread_pool: Option<&ThreadPool>,
+        cluster_type: ClusterType,
     ) -> (Hash, u64) {
         let scan_and_hash = || {
             let mut stats = HashStats::default();
@@ -3878,6 +3879,7 @@ impl AccountsDb {
                     &mut stats,
                     PUBKEY_BINS_FOR_CALCULATING_HASHES,
                     &bounds,
+                    cluster_type,
                 );
 
                 let (hash, lamports, for_next_pass) = AccountsHash::rest_of_hash_calculation(
@@ -3941,11 +3943,14 @@ impl AccountsDb {
                 slot,
                 |loaded_account: LoadedAccount| {
                     // Cache only has one version per key, don't need to worry about versioning
-                    Some((*loaded_account.pubkey(), loaded_account.loaded_hash()))
+                    Some((
+                        *loaded_account.pubkey(),
+                        loaded_account.loaded_hash(self.expected_cluster_type()),
+                    ))
                 },
                 |accum: &DashMap<Pubkey, (u64, Hash)>, loaded_account: LoadedAccount| {
                     let loaded_write_version = loaded_account.write_version();
-                    let loaded_hash = loaded_account.loaded_hash();
+                    let loaded_hash = loaded_account.loaded_hash(self.expected_cluster_type());
                     let should_insert = if let Some(existing_entry) =
                         accum.get(loaded_account.pubkey())
                     {
                         loaded_write_version > existing_entry.value().version()
@@ -5123,7 +5128,7 @@ pub mod tests {
         let mut stats = HashStats::default();
 
         let bounds = Range { start: 0, end: 0 };
-        AccountsDb::scan_snapshot_stores(&[], &mut stats, 257, &bounds);
+        AccountsDb::scan_snapshot_stores(&[], &mut stats, 257, &bounds, ClusterType::Development);
     }
     #[test]
     #[should_panic(expected = "assertion failed: bins <= max_plus_1 && bins > 0")]
@@ -5131,7 +5136,7 @@ pub mod tests {
         let mut stats = HashStats::default();
 
         let bounds = Range { start: 0, end: 0 };
-        AccountsDb::scan_snapshot_stores(&[], &mut stats, 0, &bounds);
+        AccountsDb::scan_snapshot_stores(&[], &mut stats, 0, &bounds, ClusterType::Development);
     }
 
    #[test]
@@ -5142,7 +5147,7 @@ pub mod tests {
         let mut stats = HashStats::default();
 
         let bounds = Range { start: 2, end: 2 };
-        AccountsDb::scan_snapshot_stores(&[], &mut stats, 2, &bounds);
+        AccountsDb::scan_snapshot_stores(&[], &mut stats, 2, &bounds, ClusterType::Development);
     }
     #[test]
     #[should_panic(
@@ -5152,7 +5157,7 @@ pub mod tests {
         let mut stats = HashStats::default();
 
         let bounds = Range { start: 1, end: 3 };
-        AccountsDb::scan_snapshot_stores(&[], &mut stats, 2, &bounds);
+        AccountsDb::scan_snapshot_stores(&[], &mut stats, 2, &bounds, ClusterType::Development);
    }
 
    #[test]
@@ -5163,7 +5168,7 @@ pub mod tests {
         let mut stats = HashStats::default();
 
         let bounds = Range { start: 1, end: 0 };
-        AccountsDb::scan_snapshot_stores(&[], &mut stats, 2, &bounds);
+        AccountsDb::scan_snapshot_stores(&[], &mut stats, 2, &bounds, ClusterType::Development);
     }
 
     fn sample_storages_and_accounts() -> (SnapshotStorages, Vec<CalculateHashIntermediate>) {
@@ -5259,6 +5264,7 @@ pub mod tests {
                 start: 0,
                 end: bins,
             },
+            ClusterType::Development,
         );
         assert_eq!(result, vec![vec![raw_expected.clone()]]);
 
@@ -5271,6 +5277,7 @@ pub mod tests {
                 start: 0,
                 end: bins,
             },
+            ClusterType::Development,
         );
         let mut expected = vec![Vec::new(); bins];
         expected[0].push(raw_expected[0].clone());
@@ -5288,6 +5295,7 @@ pub mod tests {
                 start: 0,
                 end: bins,
             },
+            ClusterType::Development,
         );
         let mut expected = vec![Vec::new(); bins];
         expected[0].push(raw_expected[0].clone());
@@ -5305,6 +5313,7 @@ pub mod tests {
                 start: 0,
                 end: bins,
             },
+            ClusterType::Development,
         );
         let mut expected = vec![Vec::new(); bins];
         expected[0].push(raw_expected[0].clone());
@@ -5334,6 +5343,7 @@ pub mod tests {
                 start: 0,
                 end: bins,
             },
+            ClusterType::Development,
         );
         assert_eq!(result.len(), 2); // 2 chunks
         assert_eq!(result[0].len(), 0); // nothing found in first slots
@@ -5356,6 +5366,7 @@ pub mod tests {
                 start: 0,
                 end: bins / 2,
             },
+            ClusterType::Development,
         );
         let mut expected = vec![Vec::new(); bins];
         expected[0].push(raw_expected[0].clone());
@@ -5371,6 +5382,7 @@ pub mod tests {
                 start: 1,
                 end: bins,
             },
+            ClusterType::Development,
         );
         let mut expected = vec![Vec::new(); bins];
 
@@ -5389,6 +5401,7 @@ pub mod tests {
                 start: bin,
                 end: bin + 1,
             },
+            ClusterType::Development,
         );
         let mut expected = vec![Vec::new(); bins];
         expected[bin].push(raw_expected[bin].clone());
@@ -5406,6 +5419,7 @@ pub mod tests {
                 start: bin,
                 end: bin + 1,
             },
+            ClusterType::Development,
         );
         let mut expected = vec![];
         if let Some(index) = bin_locations.iter().position(|&r| r == bin) {
@@ -5437,6 +5451,7 @@ pub mod tests {
                 start: 127,
                 end: 128,
             },
+            ClusterType::Development,
         );
         assert_eq!(result.len(), 2); // 2 chunks
         assert_eq!(result[0].len(), 0); // nothing found in first slots
@@ -5451,7 +5466,11 @@ pub mod tests {
         solana_logger::setup();
 
         let (storages, _size, _slot_expected) = sample_storage();
-        let result = AccountsDb::calculate_accounts_hash_without_index(&storages, None);
+        let result = AccountsDb::calculate_accounts_hash_without_index(
+            &storages,
+            None,
+            ClusterType::Development,
+        );
         let expected_hash = Hash::from_str("GKot5hBsd81kMupNCXHaqbhv3huEbxAFMLnpcX2hniwn").unwrap();
         assert_eq!(result, (expected_hash, 0));
     }
@@ -5466,7 +5485,11 @@ pub mod tests {
             item.hash
         });
         let sum = raw_expected.iter().map(|item| item.lamports).sum();
-        let result = AccountsDb::calculate_accounts_hash_without_index(&storages, None);
+        let result = AccountsDb::calculate_accounts_hash_without_index(
+            &storages,
+            None,
+            ClusterType::Development,
+        );
         assert_eq!(result, (expected_hash, sum));
     }
 
diff --git a/runtime/src/snapshot_package.rs b/runtime/src/snapshot_package.rs
index e6bd9d3feee08a..8d03154fd0737b 100644
--- a/runtime/src/snapshot_package.rs
+++ b/runtime/src/snapshot_package.rs
@@ -2,6 +2,7 @@ use crate::bank_forks::ArchiveFormat;
 use crate::snapshot_utils::SnapshotVersion;
 use crate::{accounts_db::SnapshotStorages, bank::BankSlotDelta};
 use solana_sdk::clock::Slot;
+use solana_sdk::genesis_config::ClusterType;
 use solana_sdk::hash::Hash;
 use std::{
     path::PathBuf,
@@ -26,6 +27,7 @@ pub struct AccountsPackagePre {
     pub snapshot_output_dir: PathBuf,
     pub expected_capitalization: u64,
     pub hash_for_testing: Option<Hash>,
+    pub cluster_type: ClusterType,
 }
 
 impl AccountsPackagePre {
@@ -42,6 +44,7 @@ impl AccountsPackagePre {
         snapshot_output_dir: PathBuf,
         expected_capitalization: u64,
         hash_for_testing: Option<Hash>,
+        cluster_type: ClusterType,
     ) -> Self {
         Self {
             slot,
@@ -55,6 +58,7 @@ impl AccountsPackagePre {
             snapshot_output_dir,
             expected_capitalization,
             hash_for_testing,
+            cluster_type,
         }
     }
 }
diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs
index 8dc25c62617cae..39640c2c38c420 100644
--- a/runtime/src/snapshot_utils.rs
+++ b/runtime/src/snapshot_utils.rs
@@ -187,6 +187,7 @@ pub fn package_snapshot<P: AsRef<Path>, Q: AsRef<Path>>(
         snapshot_package_output_path.as_ref().to_path_buf(),
         bank.capitalization(),
         hash_for_testing,
+        bank.cluster_type(),
     );
 
     Ok(package)
@@ -978,6 +979,7 @@ pub fn process_accounts_package_pre(
         let (hash, lamports) = AccountsDb::calculate_accounts_hash_without_index(
             &accounts_package.storages,
             thread_pool,
+            accounts_package.cluster_type,
         );
 
         assert_eq!(accounts_package.expected_capitalization, lamports);