Skip to content

Commit

Permalink
Adds calculate_incremental_accounts_hash() (#29734)
Browse files Browse the repository at this point in the history
**Node operators,** please delete your `<ledger>/calculate_accounts_hash_cache` directory (if you have one).  It has been moved/renamed to `<ledger>/accounts_hash_cache/full`.
  • Loading branch information
brooksprumo authored Jan 17, 2023
1 parent 8daed75 commit 8c62927
Show file tree
Hide file tree
Showing 2 changed files with 285 additions and 54 deletions.
258 changes: 235 additions & 23 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use {
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_hash::{
AccountsHash, AccountsHasher, CalcAccountsHashConfig, CalculateHashIntermediate,
HashStats,
HashStats, ZeroLamportAccounts,
},
accounts_index::{
AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexConfig,
Expand Down Expand Up @@ -1220,7 +1220,8 @@ pub struct AccountsDb {
/// Set of storage paths to pick from
pub(crate) paths: Vec<PathBuf>,

accounts_hash_cache_path: PathBuf,
full_accounts_hash_cache_path: PathBuf,
incremental_accounts_hash_cache_path: PathBuf,

// used by tests
// holds this until we are dropped
Expand Down Expand Up @@ -2184,7 +2185,7 @@ impl<'a> AppendVecScan for ScanState<'a> {
}

impl AccountsDb {
pub const ACCOUNTS_HASH_CACHE_DIR: &str = "calculate_accounts_hash_cache";
pub const ACCOUNTS_HASH_CACHE_DIR: &str = "accounts_hash_cache";

pub fn default_for_tests() -> Self {
Self::default_with_accounts_index(AccountInfoAccountsIndex::default_for_tests(), None)
Expand Down Expand Up @@ -2234,7 +2235,8 @@ impl AccountsDb {
write_cache_limit_bytes: None,
write_version: AtomicU64::new(0),
paths: vec![],
accounts_hash_cache_path,
full_accounts_hash_cache_path: accounts_hash_cache_path.join("full"),
incremental_accounts_hash_cache_path: accounts_hash_cache_path.join("incremental"),
temp_accounts_hash_cache_path,
shrink_paths: RwLock::new(None),
temp_paths: None,
Expand Down Expand Up @@ -5845,7 +5847,6 @@ impl AccountsDb {
if lamports == 0 {
return Hash::default();
}

let mut hasher = blake3::Hasher::new();

hasher.update(&lamports.to_le_bytes());
Expand Down Expand Up @@ -7176,17 +7177,9 @@ impl AccountsDb {
);
}

fn get_cache_hash_data(
&self,
config: &CalcAccountsHashConfig<'_>,
slot: Slot,
) -> CacheHashData {
Self::_get_cache_hash_data(self.accounts_hash_cache_path.clone(), config, slot)
}

/// normal code path returns the common cache path
/// when called after a failure has been detected, redirect the cache storage to a separate folder for debugging later
fn _get_cache_hash_data(
fn get_cache_hash_data(
accounts_hash_cache_path: PathBuf,
config: &CalcAccountsHashConfig<'_>,
slot: Slot,
Expand All @@ -7206,60 +7199,107 @@ impl AccountsDb {
// modeled after get_accounts_delta_hash
// intended to be faster than calculate_accounts_hash
pub fn calculate_accounts_hash_from_storages(
&self,
config: &CalcAccountsHashConfig<'_>,
storages: &SortedStorages<'_>,
stats: HashStats,
) -> Result<(AccountsHash, u64), BankHashVerificationError> {
self._calculate_accounts_hash_from_storages(
config,
storages,
stats,
CalcAccountsHashFlavor::Full,
self.full_accounts_hash_cache_path.clone(),
)
}

/// Calculate the incremental accounts hash
///
/// This calculation is intended to be used by incremental snapshots, and thus differs from a
/// "full" accounts hash in a few ways:
/// - Zero-lamport accounts are *included* in the hash because zero-lamport accounts are also
/// included in the incremental snapshot. This ensures reconstructing the AccountsDb is
/// still correct when using this incremental accounts hash.
/// - `storages` must be *greater than* `base_slot`. This follows the same requirements as
/// incremental snapshots.
pub fn calculate_incremental_accounts_hash(
&self,
config: &CalcAccountsHashConfig<'_>,
storages: &SortedStorages<'_>,
base_slot: Slot,
stats: HashStats,
) -> Result<(AccountsHash, /* capitalization */ u64), BankHashVerificationError> {
assert!(storages.range().start > base_slot, "The storages for calculating an incremental accounts hash must all be higher than the base slot");
self._calculate_accounts_hash_from_storages(
config,
storages,
stats,
CalcAccountsHashFlavor::Incremental,
self.incremental_accounts_hash_cache_path.clone(),
)
}

fn _calculate_accounts_hash_from_storages(
&self,
config: &CalcAccountsHashConfig<'_>,
storages: &SortedStorages<'_>,
mut stats: HashStats,
flavor: CalcAccountsHashFlavor,
accounts_hash_cache_path: PathBuf,
) -> Result<(AccountsHash, u64), BankHashVerificationError> {
let _guard = self.active_stats.activate(ActiveStatItem::Hash);
stats.oldest_root = storages.range().start;

self.mark_old_slots_as_dirty(storages, config.epoch_schedule.slots_per_epoch, &mut stats);

let use_bg_thread_pool = config.use_bg_thread_pool;
let mut scan_and_hash = || {
let cache_hash_data = self.get_cache_hash_data(config, storages.max_slot_inclusive());
let scan_and_hash = || {
let cache_hash_data = Self::get_cache_hash_data(
accounts_hash_cache_path,
config,
storages.max_slot_inclusive(),
);

let bounds = Range {
start: 0,
end: PUBKEY_BINS_FOR_CALCULATING_HASHES,
};

let hash = AccountsHasher {
let accounts_hasher = AccountsHasher {
filler_account_suffix: if self.filler_accounts_config.count > 0 {
self.filler_account_suffix
} else {
None
},
zero_lamport_accounts: flavor.zero_lamport_accounts(),
};

// get raw data by scanning
let result = self.scan_snapshot_stores_with_cache(
let cache_hash_data_files = self.scan_snapshot_stores_with_cache(
&cache_hash_data,
storages,
&mut stats,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
config,
hash.filler_account_suffix.as_ref(),
accounts_hasher.filler_account_suffix.as_ref(),
)?;

// convert mmapped cache files into slices of data
let slices = result
let cache_hash_intermediates = cache_hash_data_files
.iter()
.map(|d| d.get_cache_hash_data())
.collect::<Vec<_>>();

// rework slices of data into bins for parallel processing and to match data shape expected by 'rest_of_hash_calculation'
let result = AccountsHasher::get_binned_data(
&slices,
&cache_hash_intermediates,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
);

// turn raw data into merkle tree hashes and sum of lamports
let final_result = hash.rest_of_hash_calculation(result, &mut stats);

let final_result = accounts_hasher.rest_of_hash_calculation(result, &mut stats);
info!(
"calculate_accounts_hash_from_storages: slot: {} {:?}",
storages.max_slot_inclusive(),
Expand Down Expand Up @@ -8952,6 +8992,23 @@ pub enum CalcAccountsHashDataSource {
Storages,
}

/// Which accounts hash calculation is being performed?
#[derive(Debug)]
enum CalcAccountsHashFlavor {
Full,
Incremental,
}

impl CalcAccountsHashFlavor {
/// How should zero-lamport accounts be handled by this accounts hash calculation?
fn zero_lamport_accounts(&self) -> ZeroLamportAccounts {
match self {
CalcAccountsHashFlavor::Full => ZeroLamportAccounts::Excluded,
CalcAccountsHashFlavor::Incremental => ZeroLamportAccounts::Included,
}
}
}

#[cfg(test)]
impl AccountsDb {
pub fn new(paths: Vec<PathBuf>, cluster_type: &ClusterType) -> Self {
Expand Down Expand Up @@ -17364,4 +17421,159 @@ pub mod tests {
make_ancient_append_vec_full(full.new_storage());
full
}

#[test]
fn test_calculate_incremental_accounts_hash() {
let accounts_db =
AccountsDb::new_for_tests_with_caching(Vec::new(), &ClusterType::Development);

let owner = Pubkey::new_unique();
let mut accounts: Vec<_> = (0..10)
.map(|_| (Pubkey::new_unique(), AccountSharedData::new(0, 0, &owner)))
.collect();

// store some accounts into slot 0
let slot = 0;
{
accounts[0].1.set_lamports(0);
accounts[1].1.set_lamports(1);
accounts[2].1.set_lamports(10);
accounts[3].1.set_lamports(100);
//accounts[4].1.set_lamports(1_000); <-- will be added next slot

let accounts = vec![
(&accounts[0].0, &accounts[0].1),
(&accounts[1].0, &accounts[1].1),
(&accounts[2].0, &accounts[2].1),
(&accounts[3].0, &accounts[3].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}

// store some accounts into slot 1
let slot = slot + 1;
{
//accounts[0].1.set_lamports(0); <-- unchanged
accounts[1].1.set_lamports(0); /* <-- drain account */
//accounts[2].1.set_lamports(10); <-- unchanged
//accounts[3].1.set_lamports(100); <-- unchanged
accounts[4].1.set_lamports(1_000); /* <-- add account */

let accounts = vec![
(&accounts[1].0, &accounts[1].1),
(&accounts[4].0, &accounts[4].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}

// calculate the full accounts hash
let full_accounts_hash = {
accounts_db.clean_accounts(Some(slot - 1), false, None);
let (storages, _) = accounts_db.get_snapshot_storages(..=slot, None);
let storages = SortedStorages::new(&storages);
accounts_db
.calculate_accounts_hash_from_storages(
&CalcAccountsHashConfig::default(),
&storages,
HashStats::default(),
)
.unwrap()
};
assert_eq!(full_accounts_hash.1, 1_110);
let full_accounts_hash_slot = slot;

// Calculate the expected full accounts hash here and ensure it matches.
// Ensure the zero-lamport accounts are NOT included in the full accounts hash.
let full_account_hashes = [(2, 0), (3, 0), (4, 1)].into_iter().map(|(index, slot)| {
let (pubkey, account) = &accounts[index];
AccountsDb::hash_account(slot, account, pubkey, INCLUDE_SLOT_IN_HASH_TESTS)
});
let expected_accounts_hash = AccountsHash(compute_merkle_root(full_account_hashes));
assert_eq!(full_accounts_hash.0, expected_accounts_hash);

// store accounts into slot 2
let slot = slot + 1;
{
//accounts[0].1.set_lamports(0); <-- unchanged
//accounts[1].1.set_lamports(0); <-- unchanged
accounts[2].1.set_lamports(0); /* <-- drain account */
//accounts[3].1.set_lamports(100); <-- unchanged
//accounts[4].1.set_lamports(1_000); <-- unchanged
accounts[5].1.set_lamports(10_000); /* <-- add account */
accounts[6].1.set_lamports(100_000); /* <-- add account */
//accounts[7].1.set_lamports(1_000_000); <-- will be added next slot

let accounts = vec![
(&accounts[2].0, &accounts[2].1),
(&accounts[5].0, &accounts[5].1),
(&accounts[6].0, &accounts[6].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}

// store accounts into slot 3
let slot = slot + 1;
{
//accounts[0].1.set_lamports(0); <-- unchanged
//accounts[1].1.set_lamports(0); <-- unchanged
//accounts[2].1.set_lamports(0); <-- unchanged
accounts[3].1.set_lamports(0); /* <-- drain account */
//accounts[4].1.set_lamports(1_000); <-- unchanged
accounts[5].1.set_lamports(0); /* <-- drain account */
//accounts[6].1.set_lamports(100_000); <-- unchanged
accounts[7].1.set_lamports(1_000_000); /* <-- add account */

let accounts = vec![
(&accounts[3].0, &accounts[3].1),
(&accounts[5].0, &accounts[5].1),
(&accounts[7].0, &accounts[7].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}

// calculate the incremental accounts hash
let incremental_accounts_hash = {
accounts_db.clean_accounts(Some(slot - 1), false, Some(full_accounts_hash_slot));
let (storages, _) =
accounts_db.get_snapshot_storages(full_accounts_hash_slot + 1..=slot, None);
let storages = SortedStorages::new(&storages);
accounts_db
.calculate_incremental_accounts_hash(
&CalcAccountsHashConfig::default(),
&storages,
full_accounts_hash_slot,
HashStats::default(),
)
.unwrap()
};
assert_eq!(incremental_accounts_hash.1, 1_100_000);

// Ensure the zero-lamport accounts are included in the IAH.
// Accounts 2, 3, and 5 are all zero-lamports.
let incremental_account_hashes =
[(2, 2), (3, 3), (5, 3), (6, 2), (7, 3)]
.into_iter()
.map(|(index, slot)| {
let (pubkey, account) = &accounts[index];
if account.is_zero_lamport() {
// For incremental accounts hash, the hash of a zero lamport account is the hash of its pubkey.
// Ensure this implementation detail remains in sync with AccountsHasher::de_dup_in_parallel().
let hash = blake3::hash(bytemuck::bytes_of(pubkey));
Hash::new_from_array(hash.into())
} else {
AccountsDb::hash_account(slot, account, pubkey, INCLUDE_SLOT_IN_HASH_TESTS)
}
});
let expected_accounts_hash = AccountsHash(compute_merkle_root(incremental_account_hashes));
assert_eq!(incremental_accounts_hash.0, expected_accounts_hash);
}

fn compute_merkle_root(hashes: impl IntoIterator<Item = Hash>) -> Hash {
let hashes = hashes.into_iter().collect();
AccountsHasher::compute_merkle_root_recurse(hashes, MERKLE_FANOUT)
}
}
Loading

0 comments on commit 8c62927

Please sign in to comment.