Adds calculate_incremental_accounts_hash() #29734

Merged · 1 commit · Jan 17, 2023
258 changes: 235 additions & 23 deletions runtime/src/accounts_db.rs
@@ -26,7 +26,7 @@ use {
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_hash::{
AccountsHash, AccountsHasher, CalcAccountsHashConfig, CalculateHashIntermediate,
HashStats,
HashStats, ZeroLamportAccounts,
},
accounts_index::{
AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexConfig,
@@ -1223,7 +1223,8 @@ pub struct AccountsDb {
/// Set of storage paths to pick from
pub(crate) paths: Vec<PathBuf>,

accounts_hash_cache_path: PathBuf,
full_accounts_hash_cache_path: PathBuf,
incremental_accounts_hash_cache_path: PathBuf,

// used by tests
// holds this until we are dropped
@@ -2187,7 +2188,7 @@ impl<'a> AppendVecScan for ScanState<'a> {
}

impl AccountsDb {
pub const ACCOUNTS_HASH_CACHE_DIR: &str = "calculate_accounts_hash_cache";
pub const ACCOUNTS_HASH_CACHE_DIR: &str = "accounts_hash_cache";
Contributor:
maybe we need to delete 'calculate_accounts_hash_cache' on user's ledger?

Contributor (author):
Yeah, I was wondering how to handle this as well. I should've called it out here.

Since we'll need separate caches for "full" and "incremental" account hashes, which already breaks/changes the current path, I figured shortening this to "accounts_hash_cache" would be nice.

Want me to add code to check for "calculate_accounts_hash_cache" and delete the directory if it's found?

Contributor:
not sure. just seems like a gotcha. this folder could be 30G today on mnb?
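
A minimal sketch of what that cleanup could look like, assuming a hypothetical helper run once at validator startup with the ledger path (the helper name and call site are illustrative, not part of this PR):

use std::{fs, path::Path};

/// Hypothetical cleanup: best-effort removal of the obsolete cache directory
/// that older validators wrote under the previous name.
fn remove_legacy_accounts_hash_cache(ledger_path: &Path) {
    let legacy_dir = ledger_path.join("calculate_accounts_hash_cache");
    if legacy_dir.exists() {
        // This directory can be tens of GB on mainnet-beta, so delete it
        // best-effort and continue startup even if removal fails.
        if let Err(err) = fs::remove_dir_all(&legacy_dir) {
            eprintln!("failed to remove legacy accounts hash cache: {err}");
        }
    }
}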


pub fn default_for_tests() -> Self {
Self::default_with_accounts_index(AccountInfoAccountsIndex::default_for_tests(), None)
@@ -2237,7 +2238,8 @@ impl AccountsDb {
write_cache_limit_bytes: None,
write_version: AtomicU64::new(0),
paths: vec![],
accounts_hash_cache_path,
full_accounts_hash_cache_path: accounts_hash_cache_path.join("full"),
incremental_accounts_hash_cache_path: accounts_hash_cache_path.join("incremental"),
temp_accounts_hash_cache_path,
shrink_paths: RwLock::new(None),
temp_paths: None,
@@ -5869,7 +5871,6 @@ impl AccountsDb {
if lamports == 0 {
return Hash::default();
}

let mut hasher = blake3::Hasher::new();

hasher.update(&lamports.to_le_bytes());
@@ -7200,17 +7201,9 @@ impl AccountsDb {
);
}

fn get_cache_hash_data(
&self,
config: &CalcAccountsHashConfig<'_>,
slot: Slot,
) -> CacheHashData {
Self::_get_cache_hash_data(self.accounts_hash_cache_path.clone(), config, slot)
}

/// The normal code path returns the common cache path.
/// When called after a failure has been detected, redirects the cache storage to a separate folder for debugging later.
fn _get_cache_hash_data(
fn get_cache_hash_data(
accounts_hash_cache_path: PathBuf,
config: &CalcAccountsHashConfig<'_>,
slot: Slot,
@@ -7230,60 +7223,107 @@ impl AccountsDb {
// modeled after get_accounts_delta_hash
// intended to be faster than calculate_accounts_hash
pub fn calculate_accounts_hash_from_storages(
&self,
config: &CalcAccountsHashConfig<'_>,
storages: &SortedStorages<'_>,
stats: HashStats,
) -> Result<(AccountsHash, u64), BankHashVerificationError> {
self._calculate_accounts_hash_from_storages(
config,
storages,
stats,
CalcAccountsHashFlavor::Full,
self.full_accounts_hash_cache_path.clone(),
)
}

/// Calculate the incremental accounts hash
///
/// This calculation is intended to be used by incremental snapshots, and thus differs from a
/// "full" accounts hash in a few ways:
/// - Zero-lamport accounts are *included* in the hash because zero-lamport accounts are also
/// included in the incremental snapshot. This ensures reconstructing the AccountsDb is
/// still correct when using this incremental accounts hash.
/// - All slots in `storages` must be *greater than* `base_slot`. This follows the same
///   requirements as incremental snapshots.
pub fn calculate_incremental_accounts_hash(
&self,
config: &CalcAccountsHashConfig<'_>,
storages: &SortedStorages<'_>,
base_slot: Slot,
stats: HashStats,
) -> Result<(AccountsHash, /* capitalization */ u64), BankHashVerificationError> {
assert!(storages.range().start > base_slot, "The storages for calculating an incremental accounts hash must all be higher than the base slot");
self._calculate_accounts_hash_from_storages(
config,
storages,
stats,
CalcAccountsHashFlavor::Incremental,
self.incremental_accounts_hash_cache_path.clone(),
)
}

fn _calculate_accounts_hash_from_storages(
&self,
config: &CalcAccountsHashConfig<'_>,
storages: &SortedStorages<'_>,
mut stats: HashStats,
flavor: CalcAccountsHashFlavor,
accounts_hash_cache_path: PathBuf,
) -> Result<(AccountsHash, u64), BankHashVerificationError> {
let _guard = self.active_stats.activate(ActiveStatItem::Hash);
stats.oldest_root = storages.range().start;

self.mark_old_slots_as_dirty(storages, config.epoch_schedule.slots_per_epoch, &mut stats);

let use_bg_thread_pool = config.use_bg_thread_pool;
let mut scan_and_hash = || {
let cache_hash_data = self.get_cache_hash_data(config, storages.max_slot_inclusive());
let scan_and_hash = || {
let cache_hash_data = Self::get_cache_hash_data(
accounts_hash_cache_path,
config,
storages.max_slot_inclusive(),
);

let bounds = Range {
start: 0,
end: PUBKEY_BINS_FOR_CALCULATING_HASHES,
};

let hash = AccountsHasher {
let accounts_hasher = AccountsHasher {
filler_account_suffix: if self.filler_accounts_config.count > 0 {
self.filler_account_suffix
} else {
None
},
zero_lamport_accounts: flavor.zero_lamport_accounts(),
};

// get raw data by scanning
let result = self.scan_snapshot_stores_with_cache(
let cache_hash_data_files = self.scan_snapshot_stores_with_cache(
&cache_hash_data,
storages,
&mut stats,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
config,
hash.filler_account_suffix.as_ref(),
accounts_hasher.filler_account_suffix.as_ref(),
)?;

// convert mmapped cache files into slices of data
let slices = result
let cache_hash_intermediates = cache_hash_data_files
.iter()
.map(|d| d.get_cache_hash_data())
.collect::<Vec<_>>();

// rework slices of data into bins for parallel processing and to match data shape expected by 'rest_of_hash_calculation'
let result = AccountsHasher::get_binned_data(
&slices,
&cache_hash_intermediates,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
);

// turn raw data into merkle tree hashes and sum of lamports
let final_result = hash.rest_of_hash_calculation(result, &mut stats);

let final_result = accounts_hasher.rest_of_hash_calculation(result, &mut stats);
info!(
"calculate_accounts_hash_from_storages: slot: {} {:?}",
storages.max_slot_inclusive(),
@@ -8986,6 +9026,23 @@ pub enum CalcAccountsHashDataSource {
Storages,
}

/// Which accounts hash calculation is being performed?
#[derive(Debug)]
enum CalcAccountsHashFlavor {
Full,
Incremental,
}

impl CalcAccountsHashFlavor {
/// How should zero-lamport accounts be handled by this accounts hash calculation?
fn zero_lamport_accounts(&self) -> ZeroLamportAccounts {
match self {
CalcAccountsHashFlavor::Full => ZeroLamportAccounts::Excluded,
CalcAccountsHashFlavor::Incremental => ZeroLamportAccounts::Included,
}
}
}

#[cfg(test)]
impl AccountsDb {
pub fn new(paths: Vec<PathBuf>, cluster_type: &ClusterType) -> Self {
@@ -17397,4 +17454,159 @@ pub mod tests {
make_ancient_append_vec_full(full.new_storage());
full
}

#[test]
fn test_calculate_incremental_accounts_hash() {
let accounts_db =
AccountsDb::new_for_tests_with_caching(Vec::new(), &ClusterType::Development);

let owner = Pubkey::new_unique();
let mut accounts: Vec<_> = (0..10)
.map(|_| (Pubkey::new_unique(), AccountSharedData::new(0, 0, &owner)))
.collect();

// store some accounts into slot 0
let slot = 0;
{
accounts[0].1.set_lamports(0);
accounts[1].1.set_lamports(1);
accounts[2].1.set_lamports(10);
accounts[3].1.set_lamports(100);
//accounts[4].1.set_lamports(1_000); <-- will be added next slot

let accounts = vec![
(&accounts[0].0, &accounts[0].1),
(&accounts[1].0, &accounts[1].1),
(&accounts[2].0, &accounts[2].1),
(&accounts[3].0, &accounts[3].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}

// store some accounts into slot 1
let slot = slot + 1;
{
//accounts[0].1.set_lamports(0); <-- unchanged
accounts[1].1.set_lamports(0); /* <-- drain account */
//accounts[2].1.set_lamports(10); <-- unchanged
//accounts[3].1.set_lamports(100); <-- unchanged
accounts[4].1.set_lamports(1_000); /* <-- add account */

let accounts = vec![
(&accounts[1].0, &accounts[1].1),
(&accounts[4].0, &accounts[4].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}

// calculate the full accounts hash
let full_accounts_hash = {
accounts_db.clean_accounts(Some(slot - 1), false, None);
let (storages, _) = accounts_db.get_snapshot_storages(..=slot, None);
let storages = SortedStorages::new(&storages);
accounts_db
.calculate_accounts_hash_from_storages(
&CalcAccountsHashConfig::default(),
&storages,
HashStats::default(),
)
.unwrap()
};
assert_eq!(full_accounts_hash.1, 1_110);
let full_accounts_hash_slot = slot;

// Calculate the expected full accounts hash here and ensure it matches.
// Ensure the zero-lamport accounts are NOT included in the full accounts hash.
let full_account_hashes = [(2, 0), (3, 0), (4, 1)].into_iter().map(|(index, slot)| {
let (pubkey, account) = &accounts[index];
AccountsDb::hash_account(slot, account, pubkey, INCLUDE_SLOT_IN_HASH_TESTS)
});
let expected_accounts_hash = AccountsHash(compute_merkle_root(full_account_hashes));
assert_eq!(full_accounts_hash.0, expected_accounts_hash);

// store accounts into slot 2
let slot = slot + 1;
{
//accounts[0].1.set_lamports(0); <-- unchanged
//accounts[1].1.set_lamports(0); <-- unchanged
accounts[2].1.set_lamports(0); /* <-- drain account */
//accounts[3].1.set_lamports(100); <-- unchanged
//accounts[4].1.set_lamports(1_000); <-- unchanged
accounts[5].1.set_lamports(10_000); /* <-- add account */
accounts[6].1.set_lamports(100_000); /* <-- add account */
//accounts[7].1.set_lamports(1_000_000); <-- will be added next slot

let accounts = vec![
(&accounts[2].0, &accounts[2].1),
(&accounts[5].0, &accounts[5].1),
(&accounts[6].0, &accounts[6].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}

// store accounts into slot 3
let slot = slot + 1;
{
//accounts[0].1.set_lamports(0); <-- unchanged
//accounts[1].1.set_lamports(0); <-- unchanged
//accounts[2].1.set_lamports(0); <-- unchanged
accounts[3].1.set_lamports(0); /* <-- drain account */
//accounts[4].1.set_lamports(1_000); <-- unchanged
accounts[5].1.set_lamports(0); /* <-- drain account */
//accounts[6].1.set_lamports(100_000); <-- unchanged
accounts[7].1.set_lamports(1_000_000); /* <-- add account */

let accounts = vec![
(&accounts[3].0, &accounts[3].1),
(&accounts[5].0, &accounts[5].1),
(&accounts[7].0, &accounts[7].1),
];
accounts_db.store_cached((slot, accounts.as_slice()), None);
accounts_db.add_root_and_flush_write_cache(slot);
}

// calculate the incremental accounts hash
let incremental_accounts_hash = {
accounts_db.clean_accounts(Some(slot - 1), false, Some(full_accounts_hash_slot));
let (storages, _) =
accounts_db.get_snapshot_storages(full_accounts_hash_slot + 1..=slot, None);
let storages = SortedStorages::new(&storages);
accounts_db
.calculate_incremental_accounts_hash(
&CalcAccountsHashConfig::default(),
&storages,
full_accounts_hash_slot,
HashStats::default(),
)
.unwrap()
};
assert_eq!(incremental_accounts_hash.1, 1_100_000);

// Ensure the zero-lamport accounts are included in the IAH.
// Accounts 2, 3, and 5 are all zero-lamports.
let incremental_account_hashes =
[(2, 2), (3, 3), (5, 3), (6, 2), (7, 3)]
.into_iter()
.map(|(index, slot)| {
let (pubkey, account) = &accounts[index];
if account.is_zero_lamport() {
// For incremental accounts hash, the hash of a zero lamport account is the hash of its pubkey.
// Ensure this implementation detail remains in sync with AccountsHasher::de_dup_in_parallel().
let hash = blake3::hash(bytemuck::bytes_of(pubkey));
Hash::new_from_array(hash.into())
} else {
AccountsDb::hash_account(slot, account, pubkey, INCLUDE_SLOT_IN_HASH_TESTS)
}
});
let expected_accounts_hash = AccountsHash(compute_merkle_root(incremental_account_hashes));
assert_eq!(incremental_accounts_hash.0, expected_accounts_hash);
}

fn compute_merkle_root(hashes: impl IntoIterator<Item = Hash>) -> Hash {
let hashes = hashes.into_iter().collect();
AccountsHasher::compute_merkle_root_recurse(hashes, MERKLE_FANOUT)
}
}
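
For reference, a condensed sketch of the call pattern the test exercises, as a snapshot-taking caller might drive it (assumes an `accounts_db`, a rooted `full_slot` and `latest_slot`, and elides error handling; not code from this PR):

// Full accounts hash over all storages up to and including `full_slot`.
// Zero-lamport accounts are excluded from this calculation.
let (storages, _) = accounts_db.get_snapshot_storages(..=full_slot, None);
let storages = SortedStorages::new(&storages);
let (full_hash, capitalization) = accounts_db.calculate_accounts_hash_from_storages(
    &CalcAccountsHashConfig::default(),
    &storages,
    HashStats::default(),
)?;

// Incremental accounts hash over storages strictly after `full_slot`.
// Zero-lamport accounts are included here, matching incremental snapshots.
let (storages, _) = accounts_db.get_snapshot_storages(full_slot + 1..=latest_slot, None);
let storages = SortedStorages::new(&storages);
let (incremental_hash, incremental_capitalization) = accounts_db
    .calculate_incremental_accounts_hash(
        &CalcAccountsHashConfig::default(),
        &storages,
        full_slot,
        HashStats::default(),
    )?;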