From deec1f450d1f2eae40158f481a37f1e8be79ca03 Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Sat, 20 Apr 2024 08:21:30 +1000 Subject: [PATCH] runtime: parallelize Stakes::new using rayon (#450) * runtime: Stakes::new: avoid extra loops over stakes.stake_delegations * runtime: parallelize Stakes::new with rayon * Address review comments * Address more comments --- runtime/src/bank.rs | 6 ++-- runtime/src/stakes.rs | 83 +++++++++++++++++++++++++------------------ 2 files changed, 52 insertions(+), 37 deletions(-) diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d2ab05b4d9..84b1aee9f7 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1587,10 +1587,11 @@ impl Bank { // from Stakes by reading the full account state from // accounts-db. Note that it is crucial that these accounts are loaded // at the right slot and match precisely with serialized Delegations. + // // Note that we are disabling the read cache while we populate the stakes cache. // The stakes accounts will not be expected to be loaded again. // If we populate the read cache with these loads, then we'll just soon have to evict these. - let stakes = Stakes::new(&fields.stakes, |pubkey| { + let (stakes, stakes_time) = measure!(Stakes::new(&fields.stakes, |pubkey| { let (account, _slot) = bank_rc .accounts .load_with_fixed_root_do_not_populate_read_cache(&ancestors, pubkey)?; @@ -1599,7 +1600,8 @@ impl Bank { .expect( "Stakes cache is inconsistent with accounts-db. This can indicate \ a corrupted snapshot or bugs in cached accounts or accounts-db.", - ); + )); + info!("Loading Stakes took: {stakes_time}"); let stakes_accounts_load_duration = now.elapsed(); let mut bank = Self { skipped_rewrites: Mutex::default(), diff --git a/runtime/src/stakes.rs b/runtime/src/stakes.rs index 45192e919d..263b2bea22 100644 --- a/runtime/src/stakes.rs +++ b/runtime/src/stakes.rs @@ -18,7 +18,7 @@ use { }, solana_vote::vote_account::{VoteAccount, VoteAccounts}, std::{ - collections::{HashMap, HashSet}, + collections::HashMap, ops::Add, sync::{Arc, RwLock, RwLockReadGuard}, }, @@ -227,22 +227,53 @@ impl Stakes { /// cached. pub(crate) fn new(stakes: &Stakes, get_account: F) -> Result where - F: Fn(&Pubkey) -> Option, + F: Fn(&Pubkey) -> Option + Sync, { - let stake_delegations = stakes.stake_delegations.iter().map(|(pubkey, delegation)| { - let Some(stake_account) = get_account(pubkey) else { - return Err(Error::StakeAccountNotFound(*pubkey)); - }; - let stake_account = StakeAccount::try_from(stake_account)?; - // Sanity check that the delegation is consistent with what is - // stored in the account. - if stake_account.delegation() == *delegation { - Ok((*pubkey, stake_account)) - } else { - Err(Error::InvalidDelegation(*pubkey)) - } - }); + let stake_delegations = stakes + .stake_delegations + .iter() + // im::HashMap doesn't support rayon so we manually build a temporary vector. Note this is + // what std HashMap::par_iter() does internally too. + .collect::>() + .into_par_iter() + // We use fold/reduce to aggregate the results, which does a bit more work than calling + // collect()/collect_vec_list() and then im::HashMap::from_iter(collected.into_iter()), + // but it does it in background threads, so effectively it's faster. + .try_fold(ImHashMap::new, |mut map, (pubkey, delegation)| { + let Some(stake_account) = get_account(pubkey) else { + return Err(Error::StakeAccountNotFound(*pubkey)); + }; + + // Assert that all valid vote-accounts referenced in stake delegations are already + // contained in `stakes.vote_account`. + let voter_pubkey = &delegation.voter_pubkey; + if stakes.vote_accounts.get(voter_pubkey).is_none() { + if let Some(account) = get_account(voter_pubkey) { + if VoteStateVersions::is_correct_size_and_initialized(account.data()) + && VoteAccount::try_from(account.clone()).is_ok() + { + error!("vote account not cached: {voter_pubkey}, {account:?}"); + return Err(Error::VoteAccountNotCached(*voter_pubkey)); + } + } + } + + let stake_account = StakeAccount::try_from(stake_account)?; + // Sanity check that the delegation is consistent with what is + // stored in the account. + if stake_account.delegation() == *delegation { + map.insert(*pubkey, stake_account); + Ok(map) + } else { + Err(Error::InvalidDelegation(*pubkey)) + } + }) + .try_reduce(ImHashMap::new, |a, b| Ok(a.union(b)))?; + // Assert that cached vote accounts are consistent with accounts-db. + // + // This currently includes ~5500 accounts, parallelizing brings minor + // (sub 2s) improvements. for (pubkey, vote_account) in stakes.vote_accounts.iter() { let Some(account) = get_account(pubkey) else { return Err(Error::VoteAccountNotFound(*pubkey)); @@ -253,28 +284,10 @@ impl Stakes { return Err(Error::VoteAccountMismatch(*pubkey)); } } - // Assert that all valid vote-accounts referenced in - // stake delegations are already cached. - let voter_pubkeys: HashSet = stakes - .stake_delegations - .values() - .map(|delegation| delegation.voter_pubkey) - .filter(|voter_pubkey| stakes.vote_accounts.get(voter_pubkey).is_none()) - .collect(); - for pubkey in voter_pubkeys { - let Some(account) = get_account(&pubkey) else { - continue; - }; - if VoteStateVersions::is_correct_size_and_initialized(account.data()) - && VoteAccount::try_from(account.clone()).is_ok() - { - error!("vote account not cached: {pubkey}, {account:?}"); - return Err(Error::VoteAccountNotCached(pubkey)); - } - } + Ok(Self { vote_accounts: stakes.vote_accounts.clone(), - stake_delegations: stake_delegations.collect::>()?, + stake_delegations, unused: stakes.unused, epoch: stakes.epoch, stake_history: stakes.stake_history.clone(),