From 8c62927d59b08efe0610b55a87b9a7add69fdf03 Mon Sep 17 00:00:00 2001 From: Brooks Date: Tue, 17 Jan 2023 15:04:29 -0500 Subject: [PATCH] Adds calculate_incremental_accounts_hash() (#29734) **Node operators,** please delete your `/calculate_accounts_hash_cache` directory (if you have one). It has been moved/renamed to `/accounts_hash_cache/full`. --- runtime/src/accounts_db.rs | 258 +++++++++++++++++++++++++++++++---- runtime/src/accounts_hash.rs | 81 ++++++----- 2 files changed, 285 insertions(+), 54 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index f98c6157206f40..05cf0f03f430ca 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -26,7 +26,7 @@ use { accounts_cache::{AccountsCache, CachedAccount, SlotCache}, accounts_hash::{ AccountsHash, AccountsHasher, CalcAccountsHashConfig, CalculateHashIntermediate, - HashStats, + HashStats, ZeroLamportAccounts, }, accounts_index::{ AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexConfig, @@ -1220,7 +1220,8 @@ pub struct AccountsDb { /// Set of storage paths to pick from pub(crate) paths: Vec, - accounts_hash_cache_path: PathBuf, + full_accounts_hash_cache_path: PathBuf, + incremental_accounts_hash_cache_path: PathBuf, // used by tests // holds this until we are dropped @@ -2184,7 +2185,7 @@ impl<'a> AppendVecScan for ScanState<'a> { } impl AccountsDb { - pub const ACCOUNTS_HASH_CACHE_DIR: &str = "calculate_accounts_hash_cache"; + pub const ACCOUNTS_HASH_CACHE_DIR: &str = "accounts_hash_cache"; pub fn default_for_tests() -> Self { Self::default_with_accounts_index(AccountInfoAccountsIndex::default_for_tests(), None) @@ -2234,7 +2235,8 @@ impl AccountsDb { write_cache_limit_bytes: None, write_version: AtomicU64::new(0), paths: vec![], - accounts_hash_cache_path, + full_accounts_hash_cache_path: accounts_hash_cache_path.join("full"), + incremental_accounts_hash_cache_path: accounts_hash_cache_path.join("incremental"), temp_accounts_hash_cache_path, shrink_paths: RwLock::new(None), temp_paths: None, @@ -5845,7 +5847,6 @@ impl AccountsDb { if lamports == 0 { return Hash::default(); } - let mut hasher = blake3::Hasher::new(); hasher.update(&lamports.to_le_bytes()); @@ -7176,17 +7177,9 @@ impl AccountsDb { ); } - fn get_cache_hash_data( - &self, - config: &CalcAccountsHashConfig<'_>, - slot: Slot, - ) -> CacheHashData { - Self::_get_cache_hash_data(self.accounts_hash_cache_path.clone(), config, slot) - } - /// normal code path returns the common cache path /// when called after a failure has been detected, redirect the cache storage to a separate folder for debugging later - fn _get_cache_hash_data( + fn get_cache_hash_data( accounts_hash_cache_path: PathBuf, config: &CalcAccountsHashConfig<'_>, slot: Slot, @@ -7206,10 +7199,53 @@ impl AccountsDb { // modeled after get_accounts_delta_hash // intended to be faster than calculate_accounts_hash pub fn calculate_accounts_hash_from_storages( + &self, + config: &CalcAccountsHashConfig<'_>, + storages: &SortedStorages<'_>, + stats: HashStats, + ) -> Result<(AccountsHash, u64), BankHashVerificationError> { + self._calculate_accounts_hash_from_storages( + config, + storages, + stats, + CalcAccountsHashFlavor::Full, + self.full_accounts_hash_cache_path.clone(), + ) + } + + /// Calculate the incremental accounts hash + /// + /// This calculation is intended to be used by incremental snapshots, and thus differs from a + /// "full" accounts hash in a few ways: + /// - Zero-lamport accounts are *included* in the hash because zero-lamport accounts are also + /// included in the incremental snapshot. This ensures reconstructing the AccountsDb is + /// still correct when using this incremental accounts hash. + /// - `storages` must be *greater than* `base_slot`. This follows the same requirements as + /// incremental snapshots. + pub fn calculate_incremental_accounts_hash( + &self, + config: &CalcAccountsHashConfig<'_>, + storages: &SortedStorages<'_>, + base_slot: Slot, + stats: HashStats, + ) -> Result<(AccountsHash, /* capitalization */ u64), BankHashVerificationError> { + assert!(storages.range().start > base_slot, "The storages for calculating an incremental accounts hash must all be higher than the base slot"); + self._calculate_accounts_hash_from_storages( + config, + storages, + stats, + CalcAccountsHashFlavor::Incremental, + self.incremental_accounts_hash_cache_path.clone(), + ) + } + + fn _calculate_accounts_hash_from_storages( &self, config: &CalcAccountsHashConfig<'_>, storages: &SortedStorages<'_>, mut stats: HashStats, + flavor: CalcAccountsHashFlavor, + accounts_hash_cache_path: PathBuf, ) -> Result<(AccountsHash, u64), BankHashVerificationError> { let _guard = self.active_stats.activate(ActiveStatItem::Hash); stats.oldest_root = storages.range().start; @@ -7217,49 +7253,53 @@ impl AccountsDb { self.mark_old_slots_as_dirty(storages, config.epoch_schedule.slots_per_epoch, &mut stats); let use_bg_thread_pool = config.use_bg_thread_pool; - let mut scan_and_hash = || { - let cache_hash_data = self.get_cache_hash_data(config, storages.max_slot_inclusive()); + let scan_and_hash = || { + let cache_hash_data = Self::get_cache_hash_data( + accounts_hash_cache_path, + config, + storages.max_slot_inclusive(), + ); let bounds = Range { start: 0, end: PUBKEY_BINS_FOR_CALCULATING_HASHES, }; - let hash = AccountsHasher { + let accounts_hasher = AccountsHasher { filler_account_suffix: if self.filler_accounts_config.count > 0 { self.filler_account_suffix } else { None }, + zero_lamport_accounts: flavor.zero_lamport_accounts(), }; // get raw data by scanning - let result = self.scan_snapshot_stores_with_cache( + let cache_hash_data_files = self.scan_snapshot_stores_with_cache( &cache_hash_data, storages, &mut stats, PUBKEY_BINS_FOR_CALCULATING_HASHES, &bounds, config, - hash.filler_account_suffix.as_ref(), + accounts_hasher.filler_account_suffix.as_ref(), )?; // convert mmapped cache files into slices of data - let slices = result + let cache_hash_intermediates = cache_hash_data_files .iter() .map(|d| d.get_cache_hash_data()) .collect::>(); // rework slices of data into bins for parallel processing and to match data shape expected by 'rest_of_hash_calculation' let result = AccountsHasher::get_binned_data( - &slices, + &cache_hash_intermediates, PUBKEY_BINS_FOR_CALCULATING_HASHES, &bounds, ); // turn raw data into merkle tree hashes and sum of lamports - let final_result = hash.rest_of_hash_calculation(result, &mut stats); - + let final_result = accounts_hasher.rest_of_hash_calculation(result, &mut stats); info!( "calculate_accounts_hash_from_storages: slot: {} {:?}", storages.max_slot_inclusive(), @@ -8952,6 +8992,23 @@ pub enum CalcAccountsHashDataSource { Storages, } +/// Which accounts hash calculation is being performed? +#[derive(Debug)] +enum CalcAccountsHashFlavor { + Full, + Incremental, +} + +impl CalcAccountsHashFlavor { + /// How should zero-lamport accounts be handled by this accounts hash calculation? + fn zero_lamport_accounts(&self) -> ZeroLamportAccounts { + match self { + CalcAccountsHashFlavor::Full => ZeroLamportAccounts::Excluded, + CalcAccountsHashFlavor::Incremental => ZeroLamportAccounts::Included, + } + } +} + #[cfg(test)] impl AccountsDb { pub fn new(paths: Vec, cluster_type: &ClusterType) -> Self { @@ -17364,4 +17421,159 @@ pub mod tests { make_ancient_append_vec_full(full.new_storage()); full } + + #[test] + fn test_calculate_incremental_accounts_hash() { + let accounts_db = + AccountsDb::new_for_tests_with_caching(Vec::new(), &ClusterType::Development); + + let owner = Pubkey::new_unique(); + let mut accounts: Vec<_> = (0..10) + .map(|_| (Pubkey::new_unique(), AccountSharedData::new(0, 0, &owner))) + .collect(); + + // store some accounts into slot 0 + let slot = 0; + { + accounts[0].1.set_lamports(0); + accounts[1].1.set_lamports(1); + accounts[2].1.set_lamports(10); + accounts[3].1.set_lamports(100); + //accounts[4].1.set_lamports(1_000); <-- will be added next slot + + let accounts = vec![ + (&accounts[0].0, &accounts[0].1), + (&accounts[1].0, &accounts[1].1), + (&accounts[2].0, &accounts[2].1), + (&accounts[3].0, &accounts[3].1), + ]; + accounts_db.store_cached((slot, accounts.as_slice()), None); + accounts_db.add_root_and_flush_write_cache(slot); + } + + // store some accounts into slot 1 + let slot = slot + 1; + { + //accounts[0].1.set_lamports(0); <-- unchanged + accounts[1].1.set_lamports(0); /* <-- drain account */ + //accounts[2].1.set_lamports(10); <-- unchanged + //accounts[3].1.set_lamports(100); <-- unchanged + accounts[4].1.set_lamports(1_000); /* <-- add account */ + + let accounts = vec![ + (&accounts[1].0, &accounts[1].1), + (&accounts[4].0, &accounts[4].1), + ]; + accounts_db.store_cached((slot, accounts.as_slice()), None); + accounts_db.add_root_and_flush_write_cache(slot); + } + + // calculate the full accounts hash + let full_accounts_hash = { + accounts_db.clean_accounts(Some(slot - 1), false, None); + let (storages, _) = accounts_db.get_snapshot_storages(..=slot, None); + let storages = SortedStorages::new(&storages); + accounts_db + .calculate_accounts_hash_from_storages( + &CalcAccountsHashConfig::default(), + &storages, + HashStats::default(), + ) + .unwrap() + }; + assert_eq!(full_accounts_hash.1, 1_110); + let full_accounts_hash_slot = slot; + + // Calculate the expected full accounts hash here and ensure it matches. + // Ensure the zero-lamport accounts are NOT included in the full accounts hash. + let full_account_hashes = [(2, 0), (3, 0), (4, 1)].into_iter().map(|(index, slot)| { + let (pubkey, account) = &accounts[index]; + AccountsDb::hash_account(slot, account, pubkey, INCLUDE_SLOT_IN_HASH_TESTS) + }); + let expected_accounts_hash = AccountsHash(compute_merkle_root(full_account_hashes)); + assert_eq!(full_accounts_hash.0, expected_accounts_hash); + + // store accounts into slot 2 + let slot = slot + 1; + { + //accounts[0].1.set_lamports(0); <-- unchanged + //accounts[1].1.set_lamports(0); <-- unchanged + accounts[2].1.set_lamports(0); /* <-- drain account */ + //accounts[3].1.set_lamports(100); <-- unchanged + //accounts[4].1.set_lamports(1_000); <-- unchanged + accounts[5].1.set_lamports(10_000); /* <-- add account */ + accounts[6].1.set_lamports(100_000); /* <-- add account */ + //accounts[7].1.set_lamports(1_000_000); <-- will be added next slot + + let accounts = vec![ + (&accounts[2].0, &accounts[2].1), + (&accounts[5].0, &accounts[5].1), + (&accounts[6].0, &accounts[6].1), + ]; + accounts_db.store_cached((slot, accounts.as_slice()), None); + accounts_db.add_root_and_flush_write_cache(slot); + } + + // store accounts into slot 3 + let slot = slot + 1; + { + //accounts[0].1.set_lamports(0); <-- unchanged + //accounts[1].1.set_lamports(0); <-- unchanged + //accounts[2].1.set_lamports(0); <-- unchanged + accounts[3].1.set_lamports(0); /* <-- drain account */ + //accounts[4].1.set_lamports(1_000); <-- unchanged + accounts[5].1.set_lamports(0); /* <-- drain account */ + //accounts[6].1.set_lamports(100_000); <-- unchanged + accounts[7].1.set_lamports(1_000_000); /* <-- add account */ + + let accounts = vec![ + (&accounts[3].0, &accounts[3].1), + (&accounts[5].0, &accounts[5].1), + (&accounts[7].0, &accounts[7].1), + ]; + accounts_db.store_cached((slot, accounts.as_slice()), None); + accounts_db.add_root_and_flush_write_cache(slot); + } + + // calculate the incremental accounts hash + let incremental_accounts_hash = { + accounts_db.clean_accounts(Some(slot - 1), false, Some(full_accounts_hash_slot)); + let (storages, _) = + accounts_db.get_snapshot_storages(full_accounts_hash_slot + 1..=slot, None); + let storages = SortedStorages::new(&storages); + accounts_db + .calculate_incremental_accounts_hash( + &CalcAccountsHashConfig::default(), + &storages, + full_accounts_hash_slot, + HashStats::default(), + ) + .unwrap() + }; + assert_eq!(incremental_accounts_hash.1, 1_100_000); + + // Ensure the zero-lamport accounts are included in the IAH. + // Accounts 2, 3, and 5 are all zero-lamports. + let incremental_account_hashes = + [(2, 2), (3, 3), (5, 3), (6, 2), (7, 3)] + .into_iter() + .map(|(index, slot)| { + let (pubkey, account) = &accounts[index]; + if account.is_zero_lamport() { + // For incremental accounts hash, the hash of a zero lamport account is the hash of its pubkey. + // Ensure this implementation detail remains in sync with AccountsHasher::de_dup_in_parallel(). + let hash = blake3::hash(bytemuck::bytes_of(pubkey)); + Hash::new_from_array(hash.into()) + } else { + AccountsDb::hash_account(slot, account, pubkey, INCLUDE_SLOT_IN_HASH_TESTS) + } + }); + let expected_accounts_hash = AccountsHash(compute_merkle_root(incremental_account_hashes)); + assert_eq!(incremental_accounts_hash.0, expected_accounts_hash); + } + + fn compute_merkle_root(hashes: impl IntoIterator) -> Hash { + let hashes = hashes.into_iter().collect(); + AccountsHasher::compute_merkle_root_recurse(hashes, MERKLE_FANOUT) + } } diff --git a/runtime/src/accounts_hash.rs b/runtime/src/accounts_hash.rs index 4fba0bddd1f0b1..97a7ed6f0899ad 100644 --- a/runtime/src/accounts_hash.rs +++ b/runtime/src/accounts_hash.rs @@ -495,9 +495,19 @@ impl CumulativeOffsets { } } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct AccountsHasher { pub filler_account_suffix: Option, + pub zero_lamport_accounts: ZeroLamportAccounts, +} + +impl Default for AccountsHasher { + fn default() -> Self { + Self { + filler_account_suffix: None, + zero_lamport_accounts: ZeroLamportAccounts::Excluded, + } + } } impl AccountsHasher { @@ -862,7 +872,7 @@ impl AccountsHasher { /// Vec, with one entry per bin /// for each entry, Vec in pubkey order /// If return Vec was flattened, it would be all hashes, in pubkey order. - fn de_dup_and_eliminate_zeros<'a>( + fn de_dup_accounts<'a>( &self, sorted_data_by_pubkey: &'a [SortedDataByPubkey<'a>], stats: &mut HashStats, @@ -947,7 +957,7 @@ impl AccountsHasher { // go through: [..][pubkey_bin][..] and return hashes and lamport sum // slot groups^ ^accounts found in a slot group, sorted by pubkey, higher slot, write_version - // 1. eliminate zero lamport accounts + // 1. handle zero lamport accounts // 2. pick the highest slot or (slot = and highest version) of each pubkey // 3. produce this output: // a. AccountHashesFile: individual account hashes in pubkey order @@ -1021,15 +1031,26 @@ impl AccountsHasher { &mut first_item_to_pubkey_division, ); - // add lamports, get hash as long as the lamports are > 0 - if item.lamports != 0 - && (!filler_accounts_enabled || !self.is_filler_account(&item.pubkey)) - { - overall_sum = Self::checked_cast_for_capitalization( - item.lamports as u128 + overall_sum as u128, - ); - hashes.write(&item.hash); + // add lamports and get hash + if item.lamports != 0 { + // do not include filler accounts in the hash + if !(filler_accounts_enabled && self.is_filler_account(&item.pubkey)) { + overall_sum = Self::checked_cast_for_capitalization( + item.lamports as u128 + overall_sum as u128, + ); + hashes.write(&item.hash); + } + } else { + // if lamports == 0, check if they should be included + if self.zero_lamport_accounts == ZeroLamportAccounts::Included { + // For incremental accounts hash, the hash of a zero lamport account is + // the hash of its pubkey + let hash = blake3::hash(bytemuck::bytes_of(&item.pubkey)); + let hash = Hash::new_from_array(hash.into()); + hashes.write(&hash); + } } + if !duplicate_pubkey_indexes.is_empty() { // skip past duplicate keys in earlier slots // reverse this list because get_item can remove first_items[*i] when *i is exhausted @@ -1066,7 +1087,7 @@ impl AccountsHasher { data_sections_by_pubkey: Vec>, mut stats: &mut HashStats, ) -> (Hash, u64) { - let (hashes, total_lamports) = self.de_dup_and_eliminate_zeros( + let (hashes, total_lamports) = self.de_dup_accounts( &data_sections_by_pubkey, stats, PUBKEY_BINS_FOR_CALCULATING_HASHES, @@ -1088,7 +1109,14 @@ impl AccountsHasher { } } -/// Hash of all the accounts +/// How should zero-lamport accounts be treated by the accounts hasher? +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum ZeroLamportAccounts { + Excluded, + Included, +} + +/// Hash of accounts #[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, AbiExample)] pub struct AccountsHash(pub Hash); @@ -1291,7 +1319,7 @@ pub mod tests { let vec = vec![vec![], vec![]]; let (hashes, lamports) = - accounts_hash.de_dup_and_eliminate_zeros(&vec, &mut HashStats::default(), one_range()); + accounts_hash.de_dup_accounts(&vec, &mut HashStats::default(), one_range()); assert_eq!( vec![Hash::default(); 0], get_vec_vec(hashes) @@ -1302,7 +1330,7 @@ pub mod tests { assert_eq!(lamports, 0); let vec = vec![]; let (hashes, lamports) = - accounts_hash.de_dup_and_eliminate_zeros(&vec, &mut HashStats::default(), zero_range()); + accounts_hash.de_dup_accounts(&vec, &mut HashStats::default(), zero_range()); let empty: Vec> = Vec::default(); assert_eq!(empty, get_vec_vec(hashes)); assert_eq!(lamports, 0); @@ -1399,25 +1427,16 @@ pub mod tests { let (hashes3, lamports3, _) = hash.de_dup_accounts_in_parallel(&slice3, 0); let vec = slice.to_vec(); let slice4 = convert_to_slice2(&vec); - let (hashes4, lamports4) = hash.de_dup_and_eliminate_zeros( - &slice4, - &mut HashStats::default(), - end - start, - ); + let (hashes4, lamports4) = + hash.de_dup_accounts(&slice4, &mut HashStats::default(), end - start); let vec = slice.to_vec(); let slice5 = convert_to_slice2(&vec); - let (hashes5, lamports5) = hash.de_dup_and_eliminate_zeros( - &slice5, - &mut HashStats::default(), - end - start, - ); + let (hashes5, lamports5) = + hash.de_dup_accounts(&slice5, &mut HashStats::default(), end - start); let vec = slice.to_vec(); let slice5 = convert_to_slice2(&vec); - let (hashes6, lamports6) = hash.de_dup_and_eliminate_zeros( - &slice5, - &mut HashStats::default(), - end - start, - ); + let (hashes6, lamports6) = + hash.de_dup_accounts(&slice5, &mut HashStats::default(), end - start); let hashes2 = get_vec(hashes2); let hashes3 = get_vec(hashes3); @@ -1936,7 +1955,7 @@ pub mod tests { Pubkey::new_unique(), )], ]; - AccountsHasher::default().de_dup_and_eliminate_zeros( + AccountsHasher::default().de_dup_accounts( &[convert_to_slice(&input)], &mut HashStats::default(), 2, // accounts above are in 2 groups