diff --git a/core/src/commitment_service.rs b/core/src/commitment_service.rs index 6eda84ae5be585..2f37b9f09b4d5c 100644 --- a/core/src/commitment_service.rs +++ b/core/src/commitment_service.rs @@ -8,6 +8,7 @@ use solana_runtime::bank::Bank; use solana_sdk::clock::Slot; use solana_vote_program::vote_state::VoteState; use std::{ + cmp::max, collections::HashMap, sync::atomic::{AtomicBool, Ordering}, sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}, @@ -103,27 +104,8 @@ impl AggregateCommitmentService { } let mut aggregate_commitment_time = Measure::start("aggregate-commitment-ms"); - let (block_commitment, rooted_stake) = - Self::aggregate_commitment(&ancestors, &aggregation_data.bank); - - let largest_confirmed_root = - get_largest_confirmed_root(rooted_stake, aggregation_data.total_staked); - - let mut new_block_commitment = BlockCommitmentCache::new( - block_commitment, - largest_confirmed_root, - aggregation_data.total_staked, - aggregation_data.bank, - block_commitment_cache.read().unwrap().blockstore.clone(), - aggregation_data.root, - aggregation_data.root, - ); - new_block_commitment.highest_confirmed_slot = - new_block_commitment.calculate_highest_confirmed_slot(); - - let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); - - std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); + let cache_slot_info = + Self::update_commitment_cache(block_commitment_cache, aggregation_data, ancestors); aggregate_commitment_time.stop(); datapoint_info!( "block-commitment-cache", @@ -134,12 +116,50 @@ impl AggregateCommitmentService { ) ); - subscriptions.notify_subscribers(CacheSlotInfo { - current_slot: w_block_commitment_cache.slot(), - node_root: w_block_commitment_cache.root(), - largest_confirmed_root: w_block_commitment_cache.largest_confirmed_root(), - highest_confirmed_slot: w_block_commitment_cache.highest_confirmed_slot(), - }); + // Triggers rpc_subscription notifications as soon as new commitment data is available, + // sending just the commitment cache slot information that the notifications thread + // needs + subscriptions.notify_subscribers(cache_slot_info); + } + } + + fn update_commitment_cache( + block_commitment_cache: &RwLock, + aggregation_data: CommitmentAggregationData, + ancestors: Vec, + ) -> CacheSlotInfo { + let (block_commitment, rooted_stake) = + Self::aggregate_commitment(&ancestors, &aggregation_data.bank); + + let largest_confirmed_root = + get_largest_confirmed_root(rooted_stake, aggregation_data.total_staked); + + let mut new_block_commitment = BlockCommitmentCache::new( + block_commitment, + largest_confirmed_root, + aggregation_data.total_staked, + aggregation_data.bank, + block_commitment_cache.read().unwrap().blockstore.clone(), + aggregation_data.root, + aggregation_data.root, + ); + new_block_commitment.highest_confirmed_slot = + new_block_commitment.calculate_highest_confirmed_slot(); + + let mut w_block_commitment_cache = block_commitment_cache.write().unwrap(); + + let largest_confirmed_root = max( + new_block_commitment.largest_confirmed_root(), + w_block_commitment_cache.largest_confirmed_root(), + ); + new_block_commitment.set_largest_confirmed_root(largest_confirmed_root); + + std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment); + CacheSlotInfo { + current_slot: w_block_commitment_cache.slot(), + node_root: w_block_commitment_cache.root(), + largest_confirmed_root: w_block_commitment_cache.largest_confirmed_root(), + highest_confirmed_slot: w_block_commitment_cache.highest_confirmed_slot(), } } @@ -225,10 +245,24 @@ impl AggregateCommitmentService { #[cfg(test)] mod tests { use super::*; - use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use solana_sdk::pubkey::Pubkey; + use solana_ledger::{ + bank_forks::BankForks, + blockstore::Blockstore, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + get_tmp_ledger_path, + }; + use solana_runtime::genesis_utils::{ + create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, + }; + use solana_sdk::{ + pubkey::Pubkey, + signature::{Keypair, Signer}, + }; use solana_stake_program::stake_state; - use solana_vote_program::vote_state::{self, VoteStateVersions}; + use solana_vote_program::{ + vote_state::{self, VoteStateVersions}, + vote_transaction, + }; #[test] fn test_get_largest_confirmed_root() { @@ -451,4 +485,163 @@ mod tests { assert_eq!(rooted_stake.len(), 2); assert_eq!(get_largest_confirmed_root(rooted_stake, 100), 1) } + + #[test] + fn test_highest_confirmed_root_advance() { + fn get_vote_account_root_slot(vote_pubkey: Pubkey, bank: &Arc) -> Slot { + let account = &bank.vote_accounts()[&vote_pubkey].1; + let vote_state = VoteState::from(account).unwrap(); + vote_state.root_slot.unwrap() + } + + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = RwLock::new( + BlockCommitmentCache::new_for_tests_with_blockstore(blockstore), + ); + + let node_keypair = Keypair::new().to_bytes(); + let vote_keypair = Keypair::new().to_bytes(); + let stake_keypair = Keypair::new().to_bytes(); + let validator_keypairs = vec![ValidatorVoteKeypairs { + node_keypair: Keypair::from_bytes(&node_keypair).unwrap(), + vote_keypair: Keypair::from_bytes(&vote_keypair).unwrap(), + stake_keypair: Keypair::from_bytes(&stake_keypair).unwrap(), + }]; + let GenesisConfigInfo { + genesis_config, + mint_keypair, + voting_keypair: _, + } = create_genesis_config_with_vote_accounts(1_000_000_000, &validator_keypairs, 100); + + let node_keypair = Keypair::from_bytes(&node_keypair).unwrap(); + let vote_keypair = Keypair::from_bytes(&vote_keypair).unwrap(); + + let bank0 = Bank::new(&genesis_config); + bank0 + .transfer(100_000, &mint_keypair, &node_keypair.pubkey()) + .unwrap(); + let mut bank_forks = BankForks::new(bank0); + + // Fill bank_forks with banks with votes landing in the next slot + // Create enough banks such that vote account will root slots 0 and 1 + for x in 0..33 { + let previous_bank = bank_forks.get(x).unwrap(); + let bank = Bank::new_from_parent(previous_bank, &Pubkey::default(), x + 1); + let vote = vote_transaction::new_vote_transaction( + vec![x], + previous_bank.hash(), + previous_bank.last_blockhash(), + &node_keypair, + &vote_keypair, + &vote_keypair, + ); + bank.process_transaction(&vote).unwrap(); + bank_forks.insert(bank); + } + + let working_bank = bank_forks.working_bank(); + let root = get_vote_account_root_slot(vote_keypair.pubkey(), &working_bank); + for x in 0..root { + bank_forks.set_root(x, &None, None); + } + + // Add an additional bank/vote that will root slot 2 + let bank33 = bank_forks.get(33).unwrap(); + let bank34 = Bank::new_from_parent(bank33, &Pubkey::default(), 34); + let vote33 = vote_transaction::new_vote_transaction( + vec![33], + bank33.hash(), + bank33.last_blockhash(), + &node_keypair, + &vote_keypair, + &vote_keypair, + ); + bank34.process_transaction(&vote33).unwrap(); + bank_forks.insert(bank34); + + let working_bank = bank_forks.working_bank(); + let root = get_vote_account_root_slot(vote_keypair.pubkey(), &working_bank); + let ancestors = working_bank.status_cache_ancestors(); + let _ = AggregateCommitmentService::update_commitment_cache( + &block_commitment_cache, + CommitmentAggregationData { + bank: working_bank, + root: 0, + total_staked: 100, + }, + ancestors, + ); + let largest_confirmed_root = block_commitment_cache + .read() + .unwrap() + .largest_confirmed_root(); + bank_forks.set_root(root, &None, Some(largest_confirmed_root)); + let largest_confirmed_root_bank = bank_forks.get(largest_confirmed_root); + assert!(largest_confirmed_root_bank.is_some()); + + // Add a forked bank. Because the vote for bank 33 landed in the non-ancestor, the vote + // account's root (and thus the highest_confirmed_root) rolls back to slot 1 + let bank33 = bank_forks.get(33).unwrap(); + let bank35 = Bank::new_from_parent(bank33, &Pubkey::default(), 35); + bank_forks.insert(bank35); + + let working_bank = bank_forks.working_bank(); + let ancestors = working_bank.status_cache_ancestors(); + let _ = AggregateCommitmentService::update_commitment_cache( + &block_commitment_cache, + CommitmentAggregationData { + bank: working_bank, + root: 1, + total_staked: 100, + }, + ancestors, + ); + let largest_confirmed_root = block_commitment_cache + .read() + .unwrap() + .largest_confirmed_root(); + let largest_confirmed_root_bank = bank_forks.get(largest_confirmed_root); + assert!(largest_confirmed_root_bank.is_some()); + + // Add additional banks beyond lockout built on the new fork to ensure that behavior + // continues normally + for x in 35..=37 { + let previous_bank = bank_forks.get(x).unwrap(); + let bank = Bank::new_from_parent(previous_bank, &Pubkey::default(), x + 1); + let vote = vote_transaction::new_vote_transaction( + vec![x], + previous_bank.hash(), + previous_bank.last_blockhash(), + &node_keypair, + &vote_keypair, + &vote_keypair, + ); + bank.process_transaction(&vote).unwrap(); + bank_forks.insert(bank); + } + + let working_bank = bank_forks.working_bank(); + let root = get_vote_account_root_slot(vote_keypair.pubkey(), &working_bank); + let ancestors = working_bank.status_cache_ancestors(); + let _ = AggregateCommitmentService::update_commitment_cache( + &block_commitment_cache, + CommitmentAggregationData { + bank: working_bank, + root: 0, + total_staked: 100, + }, + ancestors, + ); + let largest_confirmed_root = block_commitment_cache + .read() + .unwrap() + .largest_confirmed_root(); + bank_forks.set_root(root, &None, Some(largest_confirmed_root)); + let largest_confirmed_root_bank = bank_forks.get(largest_confirmed_root); + assert!(largest_confirmed_root_bank.is_some()); + } + Blockstore::destroy(&ledger_path).unwrap(); + } }