diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index c3f94efe0dc911..d498ab405d39aa 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -228,7 +228,7 @@ impl Tvu {
             leader_schedule_cache.clone(),
             verified_vote_receiver,
             completed_data_sets_sender,
-            duplicate_slots_sender,
+            duplicate_slots_sender.clone(),
             ancestor_hashes_replay_update_receiver,
             dumped_slots_receiver,
             popular_pruned_forks_sender,
@@ -337,6 +337,7 @@ impl Tvu {
                 blockstore,
                 leader_schedule_cache.clone(),
                 bank_forks.clone(),
+                duplicate_slots_sender,
             ),
         );

diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs
index 70e56d35e82334..85f4f4fa0cf149 100644
--- a/gossip/src/duplicate_shred.rs
+++ b/gossip/src/duplicate_shred.rs
@@ -56,6 +56,8 @@ pub enum Error {
     BlockstoreInsertFailed(#[from] BlockstoreError),
     #[error("data chunk mismatch")]
     DataChunkMismatch,
+    #[error("unable to send duplicate slot to state machine")]
+    DuplicateSlotSenderFailure,
     #[error("invalid chunk_index: {chunk_index}, num_chunks: {num_chunks}")]
     InvalidChunkIndex { chunk_index: u8, num_chunks: u8 },
     #[error("invalid duplicate shreds")]
diff --git a/gossip/src/duplicate_shred_handler.rs b/gossip/src/duplicate_shred_handler.rs
index 1410e8262f027d..e7b4cd0466fe75 100644
--- a/gossip/src/duplicate_shred_handler.rs
+++ b/gossip/src/duplicate_shred_handler.rs
@@ -3,11 +3,13 @@ use {
         duplicate_shred::{self, DuplicateShred, Error},
         duplicate_shred_listener::DuplicateShredHandlerTrait,
     },
+    crossbeam_channel::Sender,
     log::error,
     solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
     solana_runtime::bank_forks::BankForks,
     solana_sdk::{
         clock::{Epoch, Slot},
+        feature_set,
         pubkey::Pubkey,
     },
     std::{
@@ -44,6 +46,8 @@ pub struct DuplicateShredHandler {
     cached_on_epoch: Epoch,
     cached_staked_nodes: Arc<HashMap<Pubkey, u64>>,
     cached_slots_in_epoch: u64,
+    // Used to notify duplicate consensus state machine
+    duplicate_slots_sender: Sender<Slot>,
 }

 impl DuplicateShredHandlerTrait for DuplicateShredHandler {
@@ -63,6 +67,7 @@ impl DuplicateShredHandler {
         blockstore: Arc<Blockstore>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
         bank_forks: Arc<RwLock<BankForks>>,
+        duplicate_slots_sender: Sender<Slot>,
     ) -> Self {
         Self {
             buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(),
@@ -74,6 +79,7 @@ impl DuplicateShredHandler {
             blockstore,
             leader_schedule_cache,
             bank_forks,
+            duplicate_slots_sender,
         }
     }

@@ -131,12 +137,30 @@ impl DuplicateShredHandler {
                     shred1.into_payload(),
                     shred2.into_payload(),
                 )?;
+                if self.should_notify_state_machine(slot) {
+                    // Notify duplicate consensus state machine
+                    self.duplicate_slots_sender
+                        .send(slot)
+                        .map_err(|_| Error::DuplicateSlotSenderFailure)?;
+                }
             }
             self.consumed.insert(slot, true);
         }
         Ok(())
     }

+    fn should_notify_state_machine(&self, slot: Slot) -> bool {
+        let root_bank = self.bank_forks.read().unwrap().root_bank();
+        let Some(activated_slot) = root_bank
+            .feature_set
+            .activated_slot(&feature_set::enable_gossip_duplicate_proof_ingestion::id())
+        else {
+            return false;
+        };
+        root_bank.epoch_schedule().get_epoch(slot)
+            > root_bank.epoch_schedule().get_epoch(activated_slot)
+    }
+
     fn should_consume_slot(&mut self, slot: Slot) -> bool {
         slot > self.last_root
             && slot < self.last_root.saturating_add(self.cached_slots_in_epoch)
@@ -211,12 +235,14 @@ mod tests {
             cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
             duplicate_shred::{from_shred, tests::new_rand_shred},
         },
+        crossbeam_channel::unbounded,
+        itertools::Itertools,
         solana_ledger::{
             genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
             get_tmp_ledger_path_auto_delete,
             shred::Shredder,
         },
-        solana_runtime::bank::Bank,
+        solana_runtime::{accounts_background_service::AbsRequestSender, bank::Bank},
         solana_sdk::{
             signature::{Keypair, Signer},
             timing::timestamp,
@@ -271,16 +297,34 @@ mod tests {
         let my_pubkey = my_keypair.pubkey();
         let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
         let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
-        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
+        let mut bank = Bank::new_for_tests(&genesis_config);
+        bank.activate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
+        let slots_in_epoch = bank.get_epoch_info().slots_in_epoch;
+        let bank_forks_arc = BankForks::new_rw_arc(bank);
+        {
+            let mut bank_forks = bank_forks_arc.write().unwrap();
+            let bank0 = bank_forks.get(0).unwrap();
+            bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
+            bank_forks.set_root(9, &AbsRequestSender::default(), None);
+        }
+        blockstore.set_roots([0, 9].iter()).unwrap();
         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
-            &bank_forks.read().unwrap().working_bank(),
+            &bank_forks_arc.read().unwrap().working_bank(),
         ));
-        let mut duplicate_shred_handler =
-            DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
+        let (sender, receiver) = unbounded();
+        // The feature will only be activated at Epoch 1.
+        let start_slot: Slot = slots_in_epoch + 1;
+
+        let mut duplicate_shred_handler = DuplicateShredHandler::new(
+            blockstore.clone(),
+            leader_schedule_cache,
+            bank_forks_arc,
+            sender,
+        );
         let chunks = create_duplicate_proof(
             my_keypair.clone(),
             None,
-            1,
+            start_slot,
             None,
             DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
         )
@@ -288,20 +332,24 @@ mod tests {
         let chunks1 = create_duplicate_proof(
             my_keypair.clone(),
             None,
-            2,
+            start_slot + 1,
             None,
             DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
         )
         .unwrap();
-        assert!(!blockstore.has_duplicate_shreds_in_slot(1));
-        assert!(!blockstore.has_duplicate_shreds_in_slot(2));
+        assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot + 1));
         // Test that two proofs are mixed together, but we can store the proofs fine.
         for (chunk1, chunk2) in chunks.zip(chunks1) {
             duplicate_shred_handler.handle(chunk1);
             duplicate_shred_handler.handle(chunk2);
         }
-        assert!(blockstore.has_duplicate_shreds_in_slot(1));
-        assert!(blockstore.has_duplicate_shreds_in_slot(2));
+        assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert!(blockstore.has_duplicate_shreds_in_slot(start_slot + 1));
+        assert_eq!(
+            receiver.try_iter().collect_vec(),
+            vec![start_slot, start_slot + 1]
+        );

         // Test all kinds of bad proofs.
         for error in [
@@ -312,7 +360,7 @@ mod tests {
             match create_duplicate_proof(
                 my_keypair.clone(),
                 None,
-                3,
+                start_slot + 2,
                 Some(error),
                 DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
             ) {
@@ -321,7 +369,8 @@ mod tests {
                     for chunk in chunks {
                         duplicate_shred_handler.handle(chunk);
                     }
-                    assert!(!blockstore.has_duplicate_shreds_in_slot(3));
+                    assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot + 2));
+                    assert!(receiver.is_empty());
                 }
             }
         }
@@ -337,13 +386,29 @@ mod tests {
         let my_pubkey = my_keypair.pubkey();
         let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
         let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
-        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
+        let mut bank = Bank::new_for_tests(&genesis_config);
+        bank.activate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
+        let slots_in_epoch = bank.get_epoch_info().slots_in_epoch;
+        let bank_forks_arc = BankForks::new_rw_arc(bank);
+        {
+            let mut bank_forks = bank_forks_arc.write().unwrap();
+            let bank0 = bank_forks.get(0).unwrap();
+            bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
+            bank_forks.set_root(9, &AbsRequestSender::default(), None);
+        }
+        blockstore.set_roots([0, 9].iter()).unwrap();
         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
-            &bank_forks.read().unwrap().working_bank(),
+            &bank_forks_arc.read().unwrap().working_bank(),
         ));
-        let mut duplicate_shred_handler =
-            DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks);
-        let start_slot: Slot = 1;
+        let (sender, receiver) = unbounded();
+        let mut duplicate_shred_handler = DuplicateShredHandler::new(
+            blockstore.clone(),
+            leader_schedule_cache,
+            bank_forks_arc,
+            sender,
+        );
+        // The feature will only be activated at Epoch 1.
+        let start_slot: Slot = slots_in_epoch + 1;

         // This proof will not be accepted because num_chunks is too large.
         let chunks = create_duplicate_proof(
@@ -358,6 +423,7 @@ mod tests {
             duplicate_shred_handler.handle(chunk);
         }
         assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert!(receiver.is_empty());

         // This proof will be rejected because the slot is too far away in the future.
         let future_slot =
@@ -374,6 +440,7 @@ mod tests {
             duplicate_shred_handler.handle(chunk);
         }
         assert!(!blockstore.has_duplicate_shreds_in_slot(future_slot));
+        assert!(receiver.is_empty());

         // Send in two proofs, the first proof showing up will be accepted, the following
         // proofs will be discarded.
@@ -388,10 +455,54 @@ mod tests {
         // handle chunk 0 of the first proof.
         duplicate_shred_handler.handle(chunks.next().unwrap());
         assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert!(receiver.is_empty());
         // Now send in the rest of the first proof, it will succeed.
         for chunk in chunks {
             duplicate_shred_handler.handle(chunk);
         }
         assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert_eq!(receiver.try_iter().collect_vec(), vec![start_slot]);
+    }
+
+    #[test]
+    fn test_feature_disabled() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
+        let my_keypair = Arc::new(Keypair::new());
+        let my_pubkey = my_keypair.pubkey();
+        let genesis_config_info = create_genesis_config_with_leader(10_000, &my_pubkey, 10_000);
+        let GenesisConfigInfo { genesis_config, .. } = genesis_config_info;
+        let mut bank = Bank::new_for_tests(&genesis_config);
+        bank.deactivate_feature(&feature_set::enable_gossip_duplicate_proof_ingestion::id());
+        assert!(!bank
+            .feature_set
+            .is_active(&feature_set::enable_gossip_duplicate_proof_ingestion::id()));
+        let bank_forks_arc = BankForks::new_rw_arc(bank);
+        let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
+            &bank_forks_arc.read().unwrap().working_bank(),
+        ));
+        let (sender, receiver) = unbounded();
+
+        let mut duplicate_shred_handler = DuplicateShredHandler::new(
+            blockstore.clone(),
+            leader_schedule_cache,
+            bank_forks_arc,
+            sender,
+        );
+        let chunks = create_duplicate_proof(
+            my_keypair.clone(),
+            None,
+            1,
+            None,
+            DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
+        )
+        .unwrap();
+        assert!(!blockstore.has_duplicate_shreds_in_slot(1));
+        for chunk in chunks {
+            duplicate_shred_handler.handle(chunk);
+        }
+        // If feature disabled, blockstore gets signal but state machine doesn't see it.
+        assert!(blockstore.has_duplicate_shreds_in_slot(1));
+        assert!(receiver.try_iter().collect_vec().is_empty());
     }
 }
diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs
index b410585396f8f0..90337bb272460f 100644
--- a/local-cluster/src/cluster_tests.rs
+++ b/local-cluster/src/cluster_tests.rs
@@ -41,7 +41,7 @@ use {
     solana_vote_program::vote_transaction,
     std::{
         borrow::Borrow,
-        collections::{HashMap, HashSet},
+        collections::{HashMap, HashSet, VecDeque},
         net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
         path::Path,
         sync::{
@@ -489,6 +489,9 @@ pub fn start_gossip_voter(
         + std::marker::Send
         + 'static,
     sleep_ms: u64,
+    num_expected_peers: usize,
+    refresh_ms: u64,
+    max_votes_to_refresh: usize,
 ) -> GossipVoter {
     let exit = Arc::new(AtomicBool::new(false));
     let (gossip_service, tcp_listener, cluster_info) = gossip_service::make_gossip_node(
@@ -503,6 +506,15 @@ pub fn start_gossip_voter(
         SocketAddrSpace::Unspecified,
     );

+    // Wait for peer discovery
+    while cluster_info.gossip_peers().len() < num_expected_peers {
+        sleep(Duration::from_millis(sleep_ms));
+    }
+
+    let mut latest_voted_slot = 0;
+    let mut refreshable_votes: VecDeque<(Transaction, VoteTransaction)> = VecDeque::new();
+    let mut latest_push_attempt = Instant::now();
+
     let t_voter = {
         let exit = exit.clone();
         let cluster_info = cluster_info.clone();
@@ -514,6 +526,18 @@ pub fn start_gossip_voter(
                 }

                 let (labels, votes) = cluster_info.get_votes_with_labels(&mut cursor);
+                if labels.is_empty() {
+                    if latest_push_attempt.elapsed() > Duration::from_millis(refresh_ms) {
+                        for (leader_vote_tx, parsed_vote) in refreshable_votes.iter().rev() {
+                            let vote_slot = parsed_vote.last_voted_slot().unwrap();
+                            info!("gossip voter refreshing vote {}", vote_slot);
+                            process_vote_tx(vote_slot, leader_vote_tx, parsed_vote, &cluster_info);
+                            latest_push_attempt = Instant::now();
+                        }
+                    }
+                    sleep(Duration::from_millis(sleep_ms));
+                    continue;
+                }
                 let mut parsed_vote_iter: Vec<_> = labels
                     .into_iter()
                     .zip(votes)
@@ -527,22 +551,20 @@ pub fn start_gossip_voter(
                 });

                 for (parsed_vote, leader_vote_tx) in &parsed_vote_iter {
-                    if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() {
-                        info!("received vote for {}", latest_vote_slot);
-                        process_vote_tx(
-                            latest_vote_slot,
-                            leader_vote_tx,
-                            parsed_vote,
-                            &cluster_info,
-                        )
+                    if let Some(vote_slot) = parsed_vote.last_voted_slot() {
+                        info!("received vote for {}", vote_slot);
+                        if vote_slot > latest_voted_slot {
+                            latest_voted_slot = vote_slot;
+                            refreshable_votes
+                                .push_front((leader_vote_tx.clone(), parsed_vote.clone()));
+                            refreshable_votes.truncate(max_votes_to_refresh);
+                        }
+                        process_vote_tx(vote_slot, leader_vote_tx, parsed_vote, &cluster_info);
+                        latest_push_attempt = Instant::now();
                     }
                     // Give vote some time to propagate
                     sleep(Duration::from_millis(sleep_ms));
                 }
-
-                if parsed_vote_iter.is_empty() {
-                    sleep(Duration::from_millis(sleep_ms));
-                }
             }
         })
     };
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index b79a1c4e309f26..752160e5ada970 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -2745,6 +2745,9 @@ fn test_oc_bad_signatures() {
             }
         },
         voter_thread_sleep_ms as u64,
+        cluster.validators.len().saturating_sub(1),
+        0,
+        0,
     );

     let (mut block_subscribe_client, receiver) = PubsubClient::block_subscribe(
@@ -3745,6 +3748,18 @@ fn test_kill_partition_switch_threshold_progress() {
 #[serial]
 #[allow(unused_attributes)]
 fn test_duplicate_shreds_broadcast_leader() {
+    run_duplicate_shreds_broadcast_leader(true);
+}
+#[test]
+#[serial]
+#[ignore]
+#[allow(unused_attributes)]
+fn test_duplicate_shreds_broadcast_leader_ancestor_hashes() {
+    run_duplicate_shreds_broadcast_leader(false);
+}
+
+fn run_duplicate_shreds_broadcast_leader(vote_on_duplicate: bool) {
+    solana_logger::setup_with_default(RUST_LOG_FILTER);
     // Create 4 nodes:
     // 1) Bad leader sending different versions of shreds to both of the other nodes
     // 2) 1 node who's voting behavior in gossip
@@ -3795,11 +3810,13 @@ fn test_duplicate_shreds_broadcast_leader() {
     // for the partition.
     assert!(partition_node_stake < our_node_stake && partition_node_stake < good_node_stake);

+    let (duplicate_slot_sender, duplicate_slot_receiver) = unbounded();
+
     // 1) Set up the cluster
     let (mut cluster, validator_keys) = test_faulty_node(
         BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig {
             partition: ClusterPartition::Stake(partition_node_stake),
-            duplicate_slot_sender: None,
+            duplicate_slot_sender: Some(duplicate_slot_sender),
         }),
         node_stakes,
         None,
@@ -3841,27 +3858,23 @@ fn test_duplicate_shreds_broadcast_leader() {
         {
             let node_keypair = node_keypair.insecure_clone();
             let vote_keypair = vote_keypair.insecure_clone();
-            let mut max_vote_slot = 0;
             let mut gossip_vote_index = 0;
+            let mut duplicate_slots = vec![];
             move |latest_vote_slot, leader_vote_tx, parsed_vote, cluster_info| {
                 info!("received vote for {}", latest_vote_slot);
                 // Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot.
-                if latest_vote_slot > max_vote_slot {
-                    let new_epoch_slots: Vec<Slot> =
-                        (max_vote_slot + 1..latest_vote_slot + 1).collect();
-                    info!(
-                        "Simulating epoch slots from our node: {:?}",
-                        new_epoch_slots
-                    );
-                    cluster_info.push_epoch_slots(&new_epoch_slots);
-                    max_vote_slot = latest_vote_slot;
-                }
+                let new_epoch_slots: Vec<Slot> = (0..latest_vote_slot + 1).collect();
+                info!(
+                    "Simulating epoch slots from our node: {:?}",
+                    new_epoch_slots
+                );
+                cluster_info.push_epoch_slots(&new_epoch_slots);

-                // Only vote on even slots. Note this may violate lockouts if the
-                // validator started voting on a different fork before we could exit
-                // it above.
+                for slot in duplicate_slot_receiver.try_iter() {
+                    duplicate_slots.push(slot);
+                }
                 let vote_hash = parsed_vote.hash();
-                if latest_vote_slot % 2 == 0 {
+                if vote_on_duplicate || !duplicate_slots.contains(&latest_vote_slot) {
                     info!(
                         "Simulating vote from our node on slot {}, hash {}",
                         latest_vote_slot, vote_hash
@@ -3899,6 +3912,9 @@ fn test_duplicate_shreds_broadcast_leader() {
             }
         },
         voter_thread_sleep_ms as u64,
+        cluster.validators.len().saturating_sub(1),
+        5000, // Refresh if 5 seconds of inactivity
+        5,    // Refresh the past 5 votes
     );

     // 4) Check that the cluster is making progress
diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs
index 2941c94ae81cb3..25196462e5bd94 100644
--- a/sdk/src/feature_set.rs
+++ b/sdk/src/feature_set.rs
@@ -772,6 +772,10 @@ pub mod cost_model_requested_write_lock_cost {
     solana_sdk::declare_id!("wLckV1a64ngtcKPRGU4S4grVTestXjmNjxBjaKZrAcn");
 }

+pub mod enable_gossip_duplicate_proof_ingestion {
+    solana_sdk::declare_id!("FNKCMBzYUdjhHyPdsKG2LSmdzH8TCHXn3ytj8RNBS4nG");
+}
+
 lazy_static! {
     /// Map of feature identifiers to user-visible description
     pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -960,6 +964,7 @@ lazy_static! {
         (enable_zk_proof_from_account::id(), "Enable zk token proof program to read proof from accounts instead of instruction data #34750"),
         (curve25519_restrict_msm_length::id(), "restrict curve25519 multiscalar multiplication vector lengths #34763"),
         (cost_model_requested_write_lock_cost::id(), "cost model uses number of requested write locks #34819"),
+        (enable_gossip_duplicate_proof_ingestion::id(), "enable gossip duplicate proof ingestion #32963"),
         /*************** ADD NEW FEATURES HERE ***************/
     ]
     .iter()
diff --git a/vote/src/vote_transaction.rs b/vote/src/vote_transaction.rs
index 7c52801f25dc56..fed2d730a0a177 100644
--- a/vote/src/vote_transaction.rs
+++ b/vote/src/vote_transaction.rs
@@ -6,7 +6,7 @@ use {
     solana_vote_program::vote_state::{Vote, VoteStateUpdate},
 };

-#[derive(Debug, PartialEq, Eq)]
+#[derive(Debug, PartialEq, Eq, Clone)]
 pub enum VoteTransaction {
     Vote(Vote),
     VoteStateUpdate(VoteStateUpdate),