diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index dcb141114d69a0..257126a45b43eb 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -482,7 +482,7 @@ impl ClusterInfo {
             .crds
             .lookup(&entry)
             .and_then(CrdsValue::epoch_slots)
-            .cloned()
+            .map(|(_, e)| e.clone())
             .unwrap_or_else(|| EpochSlots::new(self.id(), timestamp()))
     }
 
@@ -595,9 +595,16 @@ impl ClusterInfo {
         }
     }
 
-    pub fn push_epoch_slots(&self, update: &[Slot]) {
+    // [start_index, start_index + num_epoch_slots) is the range of EpochSlots indexes
+    // that can be modified
+    pub fn push_epoch_slots(&self, update: &[Slot], start_index: u8, num_epoch_slots: u8) {
+        assert!(
+            num_epoch_slots > 0
+                && start_index as u16 + num_epoch_slots as u16 - 1
+                    <= crds_value::MAX_EPOCH_SLOTS as u16
+        );
         let mut num = 0;
-        let mut current_slots: Vec<_> = (0..crds_value::MAX_EPOCH_SLOTS)
+        let mut current_slots: Vec<_> = (start_index..start_index + (num_epoch_slots - 1))
             .filter_map(|ix| {
                 Some((
                     self.time_gossip_read_lock(
@@ -607,7 +614,7 @@ impl ClusterInfo {
                     .crds
                     .lookup(&CrdsValueLabel::EpochSlots(ix, self.id()))
                     .and_then(CrdsValue::epoch_slots)
-                    .and_then(|x| Some((x.wallclock, x.first_slot()?)))?,
+                    .and_then(|(_, x)| Some((x.wallclock, x.first_slot()?)))?,
                     ix,
                 ))
             })
@@ -620,10 +627,10 @@ impl ClusterInfo {
             .unwrap_or(0);
         let max_slot: Slot = update.iter().max().cloned().unwrap_or(0);
         let total_slots = max_slot as isize - min_slot as isize;
+        let ratio = num_epoch_slots as f32 / crds_value::MAX_EPOCH_SLOTS as f32;
+        let num_expected_slots = (ratio * DEFAULT_SLOTS_PER_EPOCH as f32) as isize;
         // WARN if CRDS is not storing at least a full epoch worth of slots
-        if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots
-            && crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len()
-        {
+        if num_expected_slots > total_slots && num_epoch_slots as usize <= current_slots.len() {
             inc_new_counter_warn!("cluster_info-epoch_slots-filled", 1);
             warn!(
                 "EPOCH_SLOTS are filling up FAST {}/{}",
@@ -632,9 +639,12 @@ impl ClusterInfo {
             );
         }
         let mut reset = false;
-        let mut epoch_slot_index = current_slots.last().map(|(_, x)| *x).unwrap_or(0);
+        let mut epoch_slot_index = current_slots
+            .last()
+            .map(|(_, x)| *x - start_index)
+            .unwrap_or(0);
         while num < update.len() {
-            let ix = (epoch_slot_index % crds_value::MAX_EPOCH_SLOTS) as u8;
+            let ix = (epoch_slot_index % num_epoch_slots) as u8 + start_index;
             let now = timestamp();
             let mut slots = if !reset {
                 self.lookup_epoch_slots(ix)
@@ -810,7 +820,10 @@ impl ClusterInfo {
             .map(|x| map(x.value.lowest_slot().unwrap(), x.insert_timestamp))
     }
 
-    pub fn get_epoch_slots_since(&self, since: Option<u64>) -> (Vec<EpochSlots>, Option<u64>) {
+    pub fn get_epoch_slots_since(
+        &self,
+        since: Option<u64>,
+    ) -> (Vec<(EpochSlotsIndex, EpochSlots)>, Option<u64>) {
         let vals: Vec<_> = self
             .gossip
             .read()
@@ -823,7 +836,12 @@ impl ClusterInfo {
                     .map(|since| x.insert_timestamp > since)
                     .unwrap_or(true)
             })
-            .filter_map(|x| Some((x.value.epoch_slots()?.clone(), x.insert_timestamp)))
+            .filter_map(|x| {
+                Some((
+                    x.value.epoch_slots().map(|(i, e)| (i, e.clone()))?,
+                    x.insert_timestamp,
+                ))
+            })
             .collect();
         let max = vals.iter().map(|x| x.1).max().or(since);
         let vec = vals.into_iter().map(|x| x.0).collect();
@@ -2271,6 +2289,7 @@ pub fn stake_weight_peers(
 mod tests {
     use super::*;
     use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote};
+    use rand::Rng;
     use rayon::prelude::*;
     use solana_perf::test_tx::test_tx;
     use solana_sdk::signature::{Keypair, Signer};
@@ -2650,7 +2669,7 @@ mod tests {
         let (slots, since) = cluster_info.get_epoch_slots_since(None);
         assert!(slots.is_empty());
         assert!(since.is_none());
-        cluster_info.push_epoch_slots(&[0]);
+        cluster_info.push_epoch_slots(&[0], 0, crds_value::MAX_COMPLETED_EPOCH_SLOTS);
         let (slots, since) = cluster_info.get_epoch_slots_since(Some(std::u64::MAX));
         assert!(slots.is_empty());
@@ -2963,7 +2982,6 @@ mod tests {
 
     #[test]
     fn test_push_epoch_slots_large() {
-        use rand::Rng;
         let node_keypair = Arc::new(Keypair::new());
         let cluster_info = ClusterInfo::new(
             ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
@@ -2975,12 +2993,84 @@ mod tests {
             let last = *range.last().unwrap_or(&0);
             range.push(last + rand::thread_rng().gen_range(1, 32));
         }
-        cluster_info.push_epoch_slots(&range[..16000]);
-        cluster_info.push_epoch_slots(&range[16000..]);
-        let (slots, since) = cluster_info.get_epoch_slots_since(None);
-        let slots: Vec<_> = slots.iter().flat_map(|x| x.to_slots(0)).collect();
-        assert_eq!(slots, range);
-        assert!(since.is_some());
+        let (_, update_slots, _) =
+            get_modified_epoch_slots(&cluster_info, &range, 0, crds_value::MAX_EPOCH_SLOTS, None);
+        assert_eq!(update_slots, range);
+    }
+
+    #[test]
+    fn test_push_epoch_slots_range() {
+        let node_keypair = Arc::new(Keypair::new());
+        let cluster_info = ClusterInfo::new(
+            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
+            node_keypair.clone(),
+        );
+        let mut range: Vec<Slot> = vec![];
+        //random should be hard to compress
+        for _ in 0..32000 {
+            let last = *range.last().unwrap_or(&0);
+            range.push(last + rand::thread_rng().gen_range(1, 32));
+        }
+        let (epoch_slots, update_slots, since) =
+            get_modified_epoch_slots(&cluster_info, &range, 0, crds_value::MAX_EPOCH_SLOTS, None);
+
+        // Integrity checks
+        assert_eq!(update_slots, range);
+        let num_modified_epoch_slots = epoch_slots.len();
+        assert!(num_modified_epoch_slots <= crds_value::MAX_EPOCH_SLOTS as usize);
+        assert!(num_modified_epoch_slots > 20);
+
+        // Test with exactly as many EpochSlots as needed
+        let (_, update_slots, since) = get_modified_epoch_slots(
+            &cluster_info,
+            &range,
+            crds_value::MAX_COMPLETED_EPOCH_SLOTS,
+            num_modified_epoch_slots as u8,
+            since,
+        );
+        assert_eq!(update_slots, range);
+
+        // Test with fewer epoch slots than needed, modified EpochSlots should wrap around
+        let _ = get_modified_epoch_slots(
+            &cluster_info,
+            &range,
+            crds_value::MAX_COMPLETED_EPOCH_SLOTS,
+            num_modified_epoch_slots as u8 - 10,
+            since,
+        );
+
+        // Test that with multiple writes that cause modified EpochSlots to wrap around,
+        // correctness is still upheld
+        let cluster_info = ClusterInfo::new(
+            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
+            node_keypair,
+        );
+        let mut since = None;
+        let num_iterations = 2
+            * ((crds_value::MAX_EPOCH_SLOTS - crds_value::MAX_COMPLETED_EPOCH_SLOTS + 1) as usize)
+            / num_modified_epoch_slots;
+        for _ in 0..=num_iterations {
+            let (_, mut update_slots, new_since) = get_modified_epoch_slots(
+                &cluster_info,
+                &range,
+                crds_value::MAX_COMPLETED_EPOCH_SLOTS,
+                crds_value::MAX_EPOCH_SLOTS - crds_value::MAX_COMPLETED_EPOCH_SLOTS + 1,
+                since,
+            );
+            since = new_since;
+            let first = range[0];
+            // Find last instance of `first`, that must be where the write started
+            let begin_index = update_slots.len()
+                - update_slots
+                    .iter()
+                    .rev()
+                    .position(|slot| *slot == first)
+                    .unwrap()
+                - 1;
+            update_slots.rotate_left(begin_index);
+            update_slots.truncate(range.len());
+            assert_eq!(update_slots, range);
+        }
     }
 
     #[test]
@@ -3009,4 +3099,38 @@ mod tests {
         let vote = CrdsValue::new_signed(CrdsData::Vote(1, vote), &Keypair::new());
         assert!(bincode::serialized_size(&vote).unwrap() <= MAX_PROTOCOL_PAYLOAD_SIZE);
     }
+
+    fn get_modified_epoch_slots(
+        cluster_info: &ClusterInfo,
+        slots: &[Slot],
+        start_epoch_slot_index: u8,
+        num_epoch_slots: u8,
+        since: Option<u64>,
+    ) -> (Vec<(EpochSlotsIndex, EpochSlots)>, Vec<Slot>, Option<u64>) {
+        cluster_info.push_epoch_slots(
+            &slots[0..(slots.len() / 2)],
+            start_epoch_slot_index,
+            num_epoch_slots,
+        );
+        cluster_info.push_epoch_slots(
+            &slots[(slots.len() / 2)..],
+            start_epoch_slot_index,
+            num_epoch_slots,
+        );
+        let (epoch_slots, new_since) = cluster_info.get_epoch_slots_since(since);
+        let update_slots: Vec<_> = epoch_slots
+            .iter()
+            .flat_map(|(i, x)| {
+                assert!(
+                    *i >= start_epoch_slot_index
+                        && *i <= start_epoch_slot_index + (num_epoch_slots - 1)
+                );
+                let res = x.to_slots(0);
+                res
+            })
+            .collect();
+        assert!(new_since.is_some());
+        assert!(epoch_slots.len() <= num_epoch_slots as usize);
+        (epoch_slots, update_slots, new_since)
+    }
 }
diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs
index 85cf83fece4796..d3e6fd6f127254 100644
--- a/core/src/cluster_slots.rs
+++ b/core/src/cluster_slots.rs
@@ -1,6 +1,10 @@
 use crate::{
-    cluster_info::ClusterInfo, contact_info::ContactInfo, epoch_slots::EpochSlots,
-    pubkey_references::LockedPubkeyReferences, serve_repair::RepairType,
+    cluster_info::ClusterInfo,
+    contact_info::ContactInfo,
+    crds_value::{EpochSlotsIndex, MAX_COMPLETED_EPOCH_SLOTS},
+    epoch_slots::EpochSlots,
+    pubkey_references::LockedPubkeyReferences,
+    serve_repair::RepairType,
 };
 use solana_ledger::bank_forks::BankForks;
 use solana_runtime::epoch_stakes::NodeIdToVoteAccounts;
@@ -15,7 +19,8 @@ pub type ClusterSlotsMap = RwLock<HashMap<Slot, Arc<RwLock<SlotPubkeys>>>>;
 
 #[derive(Default)]
 pub struct ClusterSlots {
-    cluster_slots: ClusterSlotsMap,
+    completed_cluster_slots: ClusterSlotsMap,
+    confirmed_cluster_slots: ClusterSlotsMap,
     keys: LockedPubkeyReferences,
     since: RwLock<Option<u64>>,
     validator_stakes: RwLock<Arc<NodeIdToVoteAccounts>>,
@@ -24,8 +29,19 @@ pub struct ClusterSlots {
 }
 
 impl ClusterSlots {
-    pub fn lookup(&self, slot: Slot) -> Option<Arc<RwLock<SlotPubkeys>>> {
-        self.cluster_slots.read().unwrap().get(&slot).cloned()
+    pub fn lookup_completed(&self, slot: Slot) -> Option<Arc<RwLock<SlotPubkeys>>> {
+        self.completed_cluster_slots
+            .read()
+            .unwrap()
+            .get(&slot)
+            .cloned()
+    }
+    pub fn lookup_confirmed(&self, slot: Slot) -> Option<Arc<RwLock<SlotPubkeys>>> {
+        self.confirmed_cluster_slots
+            .read()
+            .unwrap()
+            .get(&slot)
+            .cloned()
     }
     pub fn update(&self, root: Slot, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
         self.update_peers(cluster_info, bank_forks);
@@ -33,25 +49,35 @@ impl ClusterSlots {
         let epoch_slots = cluster_info.get_epoch_slots_since(since);
         self.update_internal(root, epoch_slots);
     }
-    fn update_internal(&self, root: Slot, epoch_slots: (Vec<EpochSlots>, Option<u64>)) {
+    fn update_internal(
+        &self,
+        root: Slot,
+        epoch_slots: (Vec<(EpochSlotsIndex, EpochSlots)>, Option<u64>),
+    ) {
         let (epoch_slots_list, since) = epoch_slots;
-        for epoch_slots in epoch_slots_list {
+        for (epoch_slots_id, epoch_slots) in epoch_slots_list {
            let slots = epoch_slots.to_slots(root);
             for slot in &slots {
                 if *slot <= root {
                     continue;
                 }
                 let unduplicated_pubkey = self.keys.get_or_insert(&epoch_slots.from);
-                self.insert_node_id(*slot, unduplicated_pubkey);
+                self.insert_node_id(*slot, unduplicated_pubkey, epoch_slots_id);
             }
         }
-        self.cluster_slots.write().unwrap().retain(|x, _| *x > root);
+        self.completed_cluster_slots
+            .write()
+            .unwrap()
+            .retain(|x, _| *x > root);
+        self.confirmed_cluster_slots
+            .write()
+            .unwrap()
+            .retain(|x, _| *x > root);
         self.keys.purge();
         *self.since.write().unwrap() = since;
     }
-
-    pub fn collect(&self, id: &Pubkey) -> HashSet<Slot> {
-        self.cluster_slots
+    pub fn collect_completed(&self, id: &Pubkey) -> HashSet<Slot> {
+        self.completed_cluster_slots
             .read()
             .unwrap()
             .iter()
@@ -61,7 +87,12 @@ impl ClusterSlots {
             .collect()
     }
 
-    pub fn insert_node_id(&self, slot: Slot, node_id: Arc<Pubkey>) {
+    pub fn insert_node_id(
+        &self,
+        slot: Slot,
+        node_id: Arc<Pubkey>,
+        epoch_slots_id: EpochSlotsIndex,
+    ) {
         let balance = self
             .validator_stakes
             .read()
@@ -69,13 +100,34 @@ impl ClusterSlots {
             .get(&node_id)
             .map(|v| v.total_stake)
             .unwrap_or(0);
-        let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(&slot).cloned();
+        let mut slot_pubkeys = {
+            if epoch_slots_id < MAX_COMPLETED_EPOCH_SLOTS {
+                self.completed_cluster_slots
+                    .read()
+                    .unwrap()
+                    .get(&slot)
+                    .cloned()
+            } else {
+                self.confirmed_cluster_slots
+                    .read()
+                    .unwrap()
+                    .get(&slot)
+                    .cloned()
+            }
+        };
         if slot_pubkeys.is_none() {
             let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default()));
-            self.cluster_slots
-                .write()
-                .unwrap()
-                .insert(slot, new_slot_pubkeys.clone());
+            if epoch_slots_id < MAX_COMPLETED_EPOCH_SLOTS {
+                self.completed_cluster_slots
+                    .write()
+                    .unwrap()
+                    .insert(slot, new_slot_pubkeys.clone());
+            } else {
+                self.confirmed_cluster_slots
+                    .write()
+                    .unwrap()
+                    .insert(slot, new_slot_pubkeys.clone());
+            }
             slot_pubkeys = Some(new_slot_pubkeys);
         }
         slot_pubkeys
@@ -108,7 +160,7 @@ impl ClusterSlots {
     }
 
     pub fn compute_weights(&self, slot: Slot, repair_peers: &[ContactInfo]) -> Vec<(u64, usize)> {
-        let slot_peers = self.lookup(slot);
+        let slot_peers = self.lookup_completed(slot);
         repair_peers
             .iter()
             .enumerate()
@@ -137,7 +189,7 @@ impl ClusterSlots {
         slot: Slot,
         repair_peers: &[ContactInfo],
     ) -> Vec<(u64, usize)> {
-        let slot_peers = self.lookup(slot);
+        let slot_peers = self.lookup_completed(slot);
         repair_peers
             .iter()
             .enumerate()
@@ -154,8 +206,8 @@ impl ClusterSlots {
         self_id: &Pubkey,
         root: Slot,
     ) -> Vec<RepairType> {
-        let my_slots = self.collect(self_id);
-        self.cluster_slots
+        let my_slots = self.collect_completed(self_id);
+        self.completed_cluster_slots
             .read()
             .unwrap()
             .keys()
@@ -174,7 +226,8 @@ mod tests {
     #[test]
     fn test_default() {
         let cs = ClusterSlots::default();
-        assert!(cs.cluster_slots.read().unwrap().is_empty());
+        assert!(cs.completed_cluster_slots.read().unwrap().is_empty());
+        assert!(cs.confirmed_cluster_slots.read().unwrap().is_empty());
         assert!(cs.since.read().unwrap().is_none());
     }
 
@@ -182,7 +235,8 @@ mod tests {
     fn test_update_noop() {
         let cs = ClusterSlots::default();
         cs.update_internal(0, (vec![], None));
-        assert!(cs.cluster_slots.read().unwrap().is_empty());
+        assert!(cs.completed_cluster_slots.read().unwrap().is_empty());
+        assert!(cs.confirmed_cluster_slots.read().unwrap().is_empty());
         assert!(cs.since.read().unwrap().is_none());
     }
 
@@ -190,9 +244,13 @@ mod tests {
     fn test_update_empty() {
         let cs = ClusterSlots::default();
         let epoch_slot = EpochSlots::default();
-        cs.update_internal(0, (vec![epoch_slot], Some(0)));
+        cs.update_internal(0, (vec![(0, epoch_slot.clone())], Some(0)));
+        assert_eq!(*cs.since.read().unwrap(), Some(0));
+        assert!(cs.lookup_completed(0).is_none());
+
+        cs.update_internal(0, (vec![(MAX_COMPLETED_EPOCH_SLOTS, epoch_slot)], Some(0)));
         assert_eq!(*cs.since.read().unwrap(), Some(0));
-        assert!(cs.lookup(0).is_none());
+        assert!(cs.lookup_confirmed(0).is_none());
     }
 
     #[test]
@@ -201,9 +259,13 @@ mod tests {
         let cs = ClusterSlots::default();
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[0], 0);
-        cs.update_internal(0, (vec![epoch_slot], Some(0)));
+        cs.update_internal(0, (vec![(0, epoch_slot.clone())], Some(0)));
+        assert_eq!(*cs.since.read().unwrap(), Some(0));
+        assert!(cs.lookup_completed(0).is_none());
+
+        cs.update_internal(0, (vec![(MAX_COMPLETED_EPOCH_SLOTS, epoch_slot)], Some(0)));
         assert_eq!(*cs.since.read().unwrap(), Some(0));
-        assert!(cs.lookup(0).is_none());
+        assert!(cs.lookup_confirmed(0).is_none());
     }
 
     #[test]
@@ -211,12 +273,27 @@ mod tests {
         let cs = ClusterSlots::default();
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[1], 0);
-        cs.update_internal(0, (vec![epoch_slot], Some(0)));
+        cs.update_internal(0, (vec![(0, epoch_slot.clone())], Some(0)));
+        assert_eq!(*cs.since.read().unwrap(), Some(0));
+        assert!(cs.lookup_completed(0).is_none());
+        assert!(cs.lookup_completed(1).is_some());
+        assert_eq!(
+            cs.lookup_completed(1)
+                .unwrap()
+                .read()
+                .unwrap()
+                .get(&Pubkey::default()),
+            Some(&0)
+        );
+        assert!(cs.lookup_confirmed(0).is_none());
+        assert!(cs.lookup_confirmed(1).is_none());
+
+        cs.update_internal(0, (vec![(MAX_COMPLETED_EPOCH_SLOTS, epoch_slot)], Some(0)));
         assert_eq!(*cs.since.read().unwrap(), Some(0));
-        assert!(cs.lookup(0).is_none());
-        assert!(cs.lookup(1).is_some());
+        assert!(cs.lookup_confirmed(0).is_none());
+        assert!(cs.lookup_confirmed(1).is_some());
         assert_eq!(
-            cs.lookup(1)
+            cs.lookup_confirmed(1)
                 .unwrap()
                 .read()
                 .unwrap()
@@ -242,7 +319,7 @@ mod tests {
         let k2 = Pubkey::new_rand();
         map.insert(Arc::new(k1), std::u64::MAX / 2);
         map.insert(Arc::new(k2), 0);
-        cs.cluster_slots
+        cs.completed_cluster_slots
             .write()
             .unwrap()
             .insert(0, Arc::new(RwLock::new(map)));
@@ -263,7 +340,7 @@ mod tests {
         let k1 = Pubkey::new_rand();
         let k2 = Pubkey::new_rand();
         map.insert(Arc::new(k2), 0);
-        cs.cluster_slots
+        cs.completed_cluster_slots
             .write()
             .unwrap()
             .insert(0, Arc::new(RwLock::new(map)));
@@ -316,7 +393,7 @@ mod tests {
         // Mark the first validator as completed slot 9, should pick that validator,
         // even though it only has default stake, while the other validator has
         // max stake
-        cs.insert_node_id(slot, Arc::new(contact_infos[0].id));
+        cs.insert_node_id(slot, Arc::new(contact_infos[0].id), 0);
         assert_eq!(
             cs.compute_weights_exclude_noncomplete(slot, &contact_infos),
             vec![(1, 0)]
@@ -342,10 +419,21 @@ mod tests {
         );
         *cs.validator_stakes.write().unwrap() = map;
-        cs.update_internal(0, (vec![epoch_slot], None));
-        assert!(cs.lookup(1).is_some());
+        cs.update_internal(0, (vec![(0, epoch_slot.clone())], None));
+        assert!(cs.lookup_completed(1).is_some());
+        assert_eq!(
+            cs.lookup_completed(1)
+                .unwrap()
+                .read()
+                .unwrap()
+                .get(&Pubkey::default()),
+            Some(&1)
+        );
+
+        cs.update_internal(0, (vec![(MAX_COMPLETED_EPOCH_SLOTS, epoch_slot)], None));
+        assert!(cs.lookup_confirmed(1).is_some());
         assert_eq!(
-            cs.lookup(1)
+            cs.lookup_confirmed(1)
                 .unwrap()
                 .read()
                 .unwrap()
@@ -359,7 +447,7 @@ mod tests {
         let cs = ClusterSlots::default();
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[1], 0);
-        cs.update_internal(0, (vec![epoch_slot], None));
+        cs.update_internal(0, (vec![(0, epoch_slot)], None));
         let self_id = Pubkey::new_rand();
         assert_eq!(
             cs.generate_repairs_for_missing_slots(&self_id, 0),
@@ -373,8 +461,8 @@ mod tests {
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[1], 0);
         let self_id = epoch_slot.from;
-        cs.update_internal(0, (vec![epoch_slot], None));
-        let slots: Vec<Slot> = cs.collect(&self_id).into_iter().collect();
+        cs.update_internal(0, (vec![(0, epoch_slot)], None));
+        let slots: Vec<Slot> = cs.collect_completed(&self_id).into_iter().collect();
         assert_eq!(slots, vec![1]);
     }
 
@@ -384,7 +472,7 @@ mod tests {
         let mut epoch_slot = EpochSlots::default();
         epoch_slot.fill(&[1], 0);
         let self_id = epoch_slot.from;
-        cs.update_internal(0, (vec![epoch_slot], None));
+        cs.update_internal(0, (vec![(0, epoch_slot)], None));
         assert!(cs
             .generate_repairs_for_missing_slots(&self_id, 0)
             .is_empty());
diff --git a/core/src/consensus.rs b/core/src/consensus.rs
index 9c19aded03d54b..c8553a41b7b778 100644
--- a/core/src/consensus.rs
+++ b/core/src/consensus.rs
@@ -22,6 +22,7 @@ use std::{
 
 pub const VOTE_THRESHOLD_DEPTH: usize = 8;
 pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;
+pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
 pub const SWITCH_FORK_THRESHOLD: f64 = 0.38;
 
 #[derive(Default, Debug, Clone)]
diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs
index f6ef6c83c221d1..fab0eb1b76d675 100644
--- a/core/src/crds_value.rs
+++ b/core/src/crds_value.rs
@@ -25,6 +25,7 @@ pub const MAX_VOTES: VoteIndex = 32;
 
 pub type EpochSlotsIndex = u8;
 pub const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255;
+pub const MAX_COMPLETED_EPOCH_SLOTS: EpochSlotsIndex = 128;
 
 /// CrdsValue that is replicated across the cluster
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@@ -366,9 +367,9 @@ impl CrdsValue {
         }
     }
 
-    pub fn epoch_slots(&self) -> Option<&EpochSlots> {
+    pub fn epoch_slots(&self) -> Option<(EpochSlotsIndex, &EpochSlots)> {
         match &self.data {
-            CrdsData::EpochSlots(_, slots) => Some(slots),
+            CrdsData::EpochSlots(i, slots) => Some((*i, slots)),
             _ => None,
         }
     }
diff --git a/core/src/progress_map.rs b/core/src/progress_map.rs
index 15a50b62c8725b..ca726db4b9b06e 100644
--- a/core/src/progress_map.rs
+++ b/core/src/progress_map.rs
@@ -1,7 +1,8 @@
 use crate::{
-    cluster_info_vote_listener::SlotVoteTracker, cluster_slots::SlotPubkeys,
-    consensus::StakeLockout, pubkey_references::PubkeyReferences,
-    replay_stage::SUPERMINORITY_THRESHOLD,
+    cluster_info_vote_listener::SlotVoteTracker,
+    cluster_slots::SlotPubkeys,
+    consensus::{StakeLockout, SUPERMINORITY_THRESHOLD},
+    pubkey_references::PubkeyReferences,
 };
 use solana_ledger::{
     bank_forks::BankForks,
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 01ec5cb705ae8c..97c097812d393b 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -3,7 +3,8 @@
 use crate::{
     cluster_info::ClusterInfo,
     cluster_slots::ClusterSlots,
-    consensus::VOTE_THRESHOLD_SIZE,
+    consensus::SUPERMINORITY_THRESHOLD,
+    crds_value::{MAX_COMPLETED_EPOCH_SLOTS, MAX_EPOCH_SLOTS},
     result::Result,
     serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
 };
@@ -29,6 +30,8 @@ use std::{
 
 pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
 pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
+pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
+pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
 
 #[derive(Default)]
 pub struct RepairStatsGroup {
@@ -63,6 +66,7 @@ pub struct RepairInfo {
     pub completed_slots_receiver: CompletedSlotsReceiver,
     pub epoch_schedule: EpochSchedule,
     pub duplicate_slots_reset_sender: DuplicateSlotsResetSender,
+    pub confirmed_slots_receiver: ConfirmedSlotsReceiver,
 }
 
 pub struct RepairSlotRange {
@@ -145,6 +149,7 @@ impl RepairService {
                     let lowest_slot = blockstore.lowest_slot();
                     Self::update_lowest_slot(&id, lowest_slot, &cluster_info);
                     Self::update_completed_slots(&repair_info.completed_slots_receiver, &cluster_info);
+                    Self::update_confirmed_slots(&repair_info.confirmed_slots_receiver, &cluster_info);
                    cluster_slots.update(new_root, &cluster_info, &repair_info.bank_forks);
                     let new_duplicate_slots = Self::find_new_duplicate_slots(
                         &duplicate_slot_repair_statuses,
@@ -375,7 +380,7 @@ impl RepairService {
     ) {
         for slot in new_duplicate_slots {
             warn!(
-                "Cluster completed slot: {}, dumping our current version and repairing",
+                "Cluster confirmed slot: {}, dumping our current version and repairing",
                 slot
             );
             // Clear the slot signatures from status cache for this slot
@@ -428,7 +433,7 @@ impl RepairService {
                     );
                 }
                 cluster_slots
-                    .lookup(dead_slot)
+                    .lookup_confirmed(dead_slot)
                     .and_then(|completed_dead_slot_pubkeys| {
                         let epoch = root_bank.get_epoch_and_slot_index(dead_slot).0;
                         if let Some(epoch_stakes) = root_bank.epoch_stakes(epoch) {
@@ -446,7 +451,7 @@ impl RepairService {
                                 })
                                 .sum();
                             if total_completed_slot_stake as f64 / total_stake as f64
-                                > VOTE_THRESHOLD_SIZE
+                                > SUPERMINORITY_THRESHOLD
                             {
                                 Some(dead_slot)
                             } else {
@@ -535,6 +540,20 @@ impl RepairService {
         cluster_info.push_lowest_slot(id, blockstore.lowest_slot());
     }
 
+    fn update_confirmed_slots(
+        confirmed_slots_receiver: &ConfirmedSlotsReceiver,
+        cluster_info: &ClusterInfo,
+    ) {
+        let mut slots: Vec<Slot> = vec![];
+        while let Ok(mut more) = confirmed_slots_receiver.try_recv() {
+            slots.append(&mut more);
+        }
+        slots.sort();
+        if !slots.is_empty() {
+            Self::push_confirmed_epoch_slots(&slots, &cluster_info);
+        }
+    }
+
     fn update_completed_slots(
         completed_slots_receiver: &CompletedSlotsReceiver,
         cluster_info: &ClusterInfo,
@@ -545,7 +564,7 @@ impl RepairService {
         }
         slots.sort();
         if !slots.is_empty() {
-            cluster_info.push_epoch_slots(&slots);
+            Self::push_completed_epoch_slots(&slots, cluster_info);
         }
     }
 
@@ -576,8 +595,21 @@ impl RepairService {
         slots.sort();
         slots.dedup();
         if !slots.is_empty() {
-            cluster_info.push_epoch_slots(&slots);
+            Self::push_completed_epoch_slots(&slots, cluster_info);
         }
+        Self::push_confirmed_epoch_slots(&[root], &cluster_info);
+    }
+
+    fn push_confirmed_epoch_slots(slots: &[Slot], cluster_info: &ClusterInfo) {
+        cluster_info.push_epoch_slots(
+            slots,
+            MAX_COMPLETED_EPOCH_SLOTS,
+            MAX_EPOCH_SLOTS - MAX_COMPLETED_EPOCH_SLOTS + 1,
+        );
+    }
+
+    fn push_completed_epoch_slots(slots: &[Slot], cluster_info: &ClusterInfo) {
+        cluster_info.push_epoch_slots(slots, 0, MAX_COMPLETED_EPOCH_SLOTS);
     }
 
     pub fn join(self) -> thread::Result<()> {
@@ -946,11 +978,15 @@ mod test {
         let serve_repair = ServeRepair::new(cluster_info.clone());
         let valid_repair_peer = Node::new_localhost().info;
 
-        // Signal that this peer has completed the dead slot, and is thus
+        // Signal that this peer has confirmed the dead slot, and is thus
         // a valid target for repair
         let dead_slot = 9;
         let cluster_slots = ClusterSlots::default();
-        cluster_slots.insert_node_id(dead_slot, Arc::new(valid_repair_peer.id));
+        cluster_slots.insert_node_id(
+            dead_slot,
+            Arc::new(valid_repair_peer.id),
+            MAX_COMPLETED_EPOCH_SLOTS,
+        );
         cluster_info.insert_info(valid_repair_peer);
 
         // Not enough time has passed, should not update the
@@ -1105,9 +1141,9 @@ mod test {
         )
         .is_empty());
 
-        // If supermajority confirms the slot, then dead slot should be
+        // If superminority reports slot was confirmed, then dead slot should be
         // marked as a duplicate that needs to be repaired
-        cluster_slots.insert_node_id(dead_slot, only_node_id);
+        cluster_slots.insert_node_id(dead_slot, only_node_id, MAX_COMPLETED_EPOCH_SLOTS);
         assert_eq!(
             RepairService::find_new_duplicate_slots(
                 &duplicate_slot_repair_statuses,
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 97a7614f5c2586..a628a34fdd62ae 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -6,11 +6,11 @@ use crate::{
     cluster_info_vote_listener::VoteTracker,
     cluster_slots::ClusterSlots,
     commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData},
-    consensus::{StakeLockout, Tower},
+    consensus::{StakeLockout, Tower, SUPERMINORITY_THRESHOLD},
     poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
     progress_map::{ForkProgress, ForkStats, ProgressMap, PropagatedStats},
     pubkey_references::PubkeyReferences,
-    repair_service::DuplicateSlotsResetReceiver,
+    repair_service::{ConfirmedSlotsSender, DuplicateSlotsResetReceiver},
     result::Result,
     rewards_recorder_service::RewardsRecorderSender,
     rpc_subscriptions::RpcSubscriptions,
@@ -53,7 +53,6 @@ use std::{
 };
 
 pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
-pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
 pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
 
 #[derive(PartialEq, Debug)]
@@ -157,6 +156,7 @@ impl ReplayStage {
         cluster_slots: Arc<ClusterSlots>,
         retransmit_slots_sender: RetransmitSlotsSender,
         duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver,
+        confirmed_slots_sender: ConfirmedSlotsSender,
     ) -> Self {
         let ReplayStageConfig {
             my_pubkey,
@@ -301,12 +301,15 @@ impl ReplayStage {
                         &bank_forks,
                     );
 
-                    for slot in confirmed_forks {
-                        progress
-                            .get_mut(&slot)
-                            .unwrap()
-                            .fork_stats
-                            .confirmation_reported = true;
+                    if !confirmed_forks.is_empty() {
+                        for slot in &confirmed_forks {
+                            progress
+                                .get_mut(&slot)
+                                .unwrap()
+                                .fork_stats
+                                .confirmation_reported = true;
+                        }
+                        let _ = confirmed_slots_sender.send(confirmed_forks);
                     }
                 }
 
@@ -758,9 +761,8 @@ impl ReplayStage {
             // Signal retransmit
             if Self::should_retransmit(poh_slot, &mut skipped_slots_info.last_retransmit_slot) {
                 datapoint_info!("replay_stage-retransmit", ("slot", bank.slot(), i64),);
-                retransmit_slots_sender
-                    .send(vec![(bank.slot(), bank.clone())].into_iter().collect())
-                    .unwrap();
+                let _ = retransmit_slots_sender
+                    .send(vec![(bank.slot(), bank.clone())].into_iter().collect());
             }
             return;
         }
@@ -1250,7 +1252,7 @@ impl ReplayStage {
             .clone();
 
         if cluster_slot_pubkeys.is_none() {
-            cluster_slot_pubkeys = cluster_slots.lookup(slot);
+            cluster_slot_pubkeys = cluster_slots.lookup_completed(slot);
             progress
                 .get_propagated_stats_mut(slot)
                 .expect("All frozen banks must exist in the Progress map")
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index f3c0fbac816bda..53754ea933f7ba 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -3,8 +3,8 @@
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
     cluster_slots::ClusterSlots,
-    repair_service::DuplicateSlotsResetSender,
     repair_service::RepairInfo,
+    repair_service::{ConfirmedSlotsReceiver, DuplicateSlotsResetSender},
     result::{Error, Result},
     window_service::{should_retransmit_and_persist, WindowService},
 };
@@ -341,6 +341,7 @@ impl RetransmitStage {
         shred_version: u16,
         cluster_slots: Arc<ClusterSlots>,
         duplicate_slots_reset_sender: DuplicateSlotsResetSender,
+        confirmed_slots_receiver: ConfirmedSlotsReceiver,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
 
@@ -358,6 +359,7 @@ impl RetransmitStage {
             completed_slots_receiver,
             epoch_schedule,
             duplicate_slots_reset_sender,
+            confirmed_slots_receiver,
         };
         let leader_schedule_cache = leader_schedule_cache.clone();
         let window_service = WindowService::new(
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index cc03e609353443..aec11a427329e3 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -130,6 +130,7 @@ impl Tvu {
         );
 
         let cluster_slots = Arc::new(ClusterSlots::default());
+        let (confirmed_slots_sender, confirmed_slots_receiver) = unbounded();
         let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded();
         let retransmit_stage = RetransmitStage::new(
             bank_forks.clone(),
@@ -146,6 +147,7 @@ impl Tvu {
             tvu_config.shred_version,
             cluster_slots.clone(),
             duplicate_slots_reset_sender,
+            confirmed_slots_receiver,
         );
 
         let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
@@ -195,6 +197,7 @@ impl Tvu {
             cluster_slots,
             retransmit_slots_sender,
             duplicate_slots_reset_receiver,
+            confirmed_slots_sender,
         );
 
         let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index eb615c2bc98efa..cd463a01c682b4 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -74,7 +74,7 @@ const TIMESTAMP_SLOT_RANGE: usize = 16;
 // (32K shreds per slot * 4 TX per shred * 2.5 slots per sec)
 pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768;
 
-pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
+pub type CompletedSlotsReceiver = Receiver<Vec<Slot>>;
 
 // ledger window
 pub struct Blockstore {
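
Illustrative sketch (not taken from the patch itself): the diff splits the EpochSlotsIndex space so that completed slots are gossiped through the low indexes and confirmed slots through the high indexes. The standalone Rust below only mirrors the constants from crds_value.rs and the (start_index, num_epoch_slots) arguments used by RepairService::push_completed_epoch_slots and RepairService::push_confirmed_epoch_slots; the helper names completed_range and confirmed_range are invented here for illustration and do not exist in the codebase.

// Sketch of the index-space split, assuming the constants added in crds_value.rs.
type EpochSlotsIndex = u8;
const MAX_EPOCH_SLOTS: EpochSlotsIndex = 255;
const MAX_COMPLETED_EPOCH_SLOTS: EpochSlotsIndex = 128;

// (start_index, num_epoch_slots) as passed to ClusterInfo::push_epoch_slots
// for completed slots (see push_completed_epoch_slots).
fn completed_range() -> (u8, u8) {
    (0, MAX_COMPLETED_EPOCH_SLOTS)
}

// (start_index, num_epoch_slots) for confirmed slots (see push_confirmed_epoch_slots).
fn confirmed_range() -> (u8, u8) {
    (
        MAX_COMPLETED_EPOCH_SLOTS,
        MAX_EPOCH_SLOTS - MAX_COMPLETED_EPOCH_SLOTS + 1,
    )
}

fn main() {
    let (c_start, c_num) = completed_range();
    let (f_start, f_num) = confirmed_range();
    // Completed slots occupy indexes [0, 128); confirmed slots occupy [128, 255].
    assert_eq!((c_start, c_num), (0, 128));
    assert_eq!((f_start, f_num), (128, 128));
    // Both ranges satisfy the assertion added to push_epoch_slots:
    // start_index + num_epoch_slots - 1 <= MAX_EPOCH_SLOTS.
    assert!(c_start as u16 + c_num as u16 - 1 <= MAX_EPOCH_SLOTS as u16);
    assert!(f_start as u16 + f_num as u16 - 1 <= MAX_EPOCH_SLOTS as u16);
    println!(
        "completed: [{}, {}), confirmed: [{}, {}]",
        c_start,
        c_start as u16 + c_num as u16,
        f_start,
        f_start as u16 + f_num as u16 - 1
    );
}

Under these assumed constants, each kind of slot message gets 128 of the available u8 indexes, so a validator's completed-slot gossip can never overwrite its confirmed-slot gossip and vice versa.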