diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 113b387512608d..8bfe628da8c441 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -267,7 +267,7 @@ pub fn make_accounts_hashes_message( pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")] +#[frozen_abi(digest = "CVvKB495YW6JN4w1rWwajyZmG5wvNhmD97V99rSv9fGw")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] pub(crate) enum Protocol { @@ -393,7 +393,8 @@ fn retain_staked(values: &mut Vec, stakes: &HashMap) { CrdsData::AccountsHashes(_) => true, CrdsData::LowestSlot(_, _) | CrdsData::LegacyVersion(_) - | CrdsData::DuplicateShred(_, _) => { + | CrdsData::DuplicateShred(_, _) + | CrdsData::RestartLastVotedForkSlots(_) => { let stake = stakes.get(&value.pubkey()).copied(); stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP } @@ -4020,7 +4021,7 @@ mod tests { ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, values.clone()) .collect(); let self_pubkey = solana_sdk::pubkey::new_rand(); - assert!(splits.len() * 3 < NUM_CRDS_VALUES); + assert!(splits.len() * 2 < NUM_CRDS_VALUES); // Assert that all messages are included in the splits. assert_eq!(NUM_CRDS_VALUES, splits.iter().map(Vec::len).sum::()); splits diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 095848fd2932ca..fbb7365387aad9 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -627,6 +627,16 @@ pub(crate) fn submit_gossip_stats( ("SnapshotHashes-pull", crds_stats.pull.counts[10], i64), ("ContactInfo-push", crds_stats.push.counts[11], i64), ("ContactInfo-pull", crds_stats.pull.counts[11], i64), + ( + "RestartLastVotedForkSlots-push", + crds_stats.push.counts[12], + i64 + ), + ( + "RestartLastVotedForkSlots-pull", + crds_stats.pull.counts[12], + i64 + ), ( "all-push", crds_stats.push.counts.iter().sum::(), @@ -664,6 +674,16 @@ pub(crate) fn submit_gossip_stats( ("SnapshotHashes-pull", crds_stats.pull.fails[10], i64), ("ContactInfo-push", crds_stats.push.fails[11], i64), ("ContactInfo-pull", crds_stats.pull.fails[11], i64), + ( + "RestartLastVotedForkSlots-push", + crds_stats.push.fails[12], + i64 + ), + ( + "RestartLastVotedForkSlots-pull", + crds_stats.pull.fails[12], + i64 + ), ("all-push", crds_stats.push.fails.iter().sum::(), i64), ("all-pull", crds_stats.pull.fails.iter().sum::(), i64), ); diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index b20ba9dfb15647..5ce3cf5ec56065 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -103,7 +103,7 @@ pub enum GossipRoute<'a> { PushMessage(/*from:*/ &'a Pubkey), } -type CrdsCountsArray = [usize; 12]; +type CrdsCountsArray = [usize; 13]; pub(crate) struct CrdsDataStats { pub(crate) counts: CrdsCountsArray, @@ -721,6 +721,7 @@ impl CrdsDataStats { CrdsData::DuplicateShred(_, _) => 9, CrdsData::SnapshotHashes(_) => 10, CrdsData::ContactInfo(_) => 11, + CrdsData::RestartLastVotedForkSlots(_) => 12, // Update CrdsCountsArray if new items are added here. } } diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 125555ea51eeb4..63efa141bdf129 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -1,10 +1,10 @@ use { crate::{ - cluster_info::MAX_ACCOUNTS_HASHES, + cluster_info::{MAX_ACCOUNTS_HASHES, MAX_CRDS_OBJECT_SIZE}, contact_info::ContactInfo, deprecated, duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS}, - epoch_slots::EpochSlots, + epoch_slots::{CompressedSlots, EpochSlots, MAX_SLOTS_PER_ENTRY}, legacy_contact_info::LegacyContactInfo, }, bincode::{serialize, serialized_size}, @@ -94,6 +94,7 @@ pub enum CrdsData { DuplicateShred(DuplicateShredIndex, DuplicateShred), SnapshotHashes(SnapshotHashes), ContactInfo(ContactInfo), + RestartLastVotedForkSlots(RestartLastVotedForkSlots), } impl Sanitize for CrdsData { @@ -132,6 +133,7 @@ impl Sanitize for CrdsData { } CrdsData::SnapshotHashes(val) => val.sanitize(), CrdsData::ContactInfo(node) => node.sanitize(), + CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(), } } } @@ -145,7 +147,7 @@ pub(crate) fn new_rand_timestamp(rng: &mut R) -> u64 { impl CrdsData { /// New random CrdsData for tests and benchmarks. fn new_rand(rng: &mut R, pubkey: Option) -> CrdsData { - let kind = rng.gen_range(0..7); + let kind = rng.gen_range(0..8); // TODO: Implement other kinds of CrdsData here. // TODO: Assign ranges to each arm proportional to their frequency in // the mainnet crds table. @@ -157,6 +159,9 @@ impl CrdsData { 3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)), 4 => CrdsData::Version(Version::new_rand(rng, pubkey)), 5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)), + 6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand( + rng, pubkey, + )), _ => CrdsData::EpochSlots( rng.gen_range(0..MAX_EPOCH_SLOTS), EpochSlots::new_rand(rng, pubkey), @@ -485,6 +490,87 @@ impl Sanitize for NodeInstance { } } +#[derive(Serialize, Deserialize, Clone, Default, PartialEq, Eq, AbiExample, Debug)] +pub struct RestartLastVotedForkSlots { + pub from: Pubkey, + pub wallclock: u64, + pub slots: Vec, + pub last_voted_hash: Hash, + pub shred_version: u16, +} + +impl Sanitize for RestartLastVotedForkSlots { + fn sanitize(&self) -> std::result::Result<(), SanitizeError> { + if self.slots.is_empty() { + return Err(SanitizeError::InvalidValue); + } + self.slots.sanitize()?; + self.last_voted_hash.sanitize() + } +} + +impl RestartLastVotedForkSlots { + pub fn new(from: Pubkey, now: u64, last_voted_hash: Hash, shred_version: u16) -> Self { + Self { + from, + wallclock: now, + slots: Vec::new(), + last_voted_hash, + shred_version, + } + } + + /// New random Version for tests and benchmarks. + pub fn new_rand(rng: &mut R, pubkey: Option) -> Self { + let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand); + let mut result = + RestartLastVotedForkSlots::new(pubkey, new_rand_timestamp(rng), Hash::new_unique(), 1); + let num_slots = rng.gen_range(2..20); + let mut slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512)) + .take(num_slots) + .collect::>(); + slots.sort(); + result.fill(&slots); + result + } + + pub fn fill(&mut self, slots: &[Slot]) -> usize { + let slots = &slots[slots.len().saturating_sub(MAX_SLOTS_PER_ENTRY)..]; + let mut num = 0; + let space = self.max_compressed_slot_size(); + if space == 0 { + return 0; + } + while num < slots.len() { + let mut cslot = CompressedSlots::new(space as usize); + num += cslot.add(&slots[num..]); + self.slots.push(cslot); + } + num + } + + pub fn deflate(&mut self) { + for s in self.slots.iter_mut() { + let _ = s.deflate(); + } + } + + pub fn max_compressed_slot_size(&self) -> isize { + let len_header = serialized_size(self).unwrap(); + let len_slot = serialized_size(&CompressedSlots::default()).unwrap(); + MAX_CRDS_OBJECT_SIZE as isize - (len_header + len_slot) as isize + } + + pub fn to_slots(&self, min_slot: Slot) -> Vec { + self.slots + .iter() + .filter(|s| min_slot < s.first_slot() + s.num_slots() as u64) + .filter_map(|s| s.to_slots(min_slot).ok()) + .flatten() + .collect() + } +} + /// Type of the replicated value /// These are labels for values in a record that is associated with `Pubkey` #[derive(PartialEq, Hash, Eq, Clone, Debug)] @@ -501,6 +587,7 @@ pub enum CrdsValueLabel { DuplicateShred(DuplicateShredIndex, Pubkey), SnapshotHashes(Pubkey), ContactInfo(Pubkey), + RestartLastVotedForkSlots(Pubkey), } impl fmt::Display for CrdsValueLabel { @@ -524,6 +611,9 @@ impl fmt::Display for CrdsValueLabel { write!(f, "SnapshotHashes({})", self.pubkey()) } CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), + CrdsValueLabel::RestartLastVotedForkSlots(_) => { + write!(f, "RestartLastVotedForkSlots({})", self.pubkey()) + } } } } @@ -543,6 +633,7 @@ impl CrdsValueLabel { CrdsValueLabel::DuplicateShred(_, p) => *p, CrdsValueLabel::SnapshotHashes(p) => *p, CrdsValueLabel::ContactInfo(pubkey) => *pubkey, + CrdsValueLabel::RestartLastVotedForkSlots(p) => *p, } } } @@ -593,6 +684,7 @@ impl CrdsValue { CrdsData::DuplicateShred(_, shred) => shred.wallclock, CrdsData::SnapshotHashes(hash) => hash.wallclock, CrdsData::ContactInfo(node) => node.wallclock(), + CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock, } } pub fn pubkey(&self) -> Pubkey { @@ -609,6 +701,7 @@ impl CrdsValue { CrdsData::DuplicateShred(_, shred) => shred.from, CrdsData::SnapshotHashes(hash) => hash.from, CrdsData::ContactInfo(node) => *node.pubkey(), + CrdsData::RestartLastVotedForkSlots(slots) => slots.from, } } pub fn label(&self) -> CrdsValueLabel { @@ -627,6 +720,9 @@ impl CrdsValue { CrdsData::DuplicateShred(ix, shred) => CrdsValueLabel::DuplicateShred(*ix, shred.from), CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()), CrdsData::ContactInfo(node) => CrdsValueLabel::ContactInfo(*node.pubkey()), + CrdsData::RestartLastVotedForkSlots(_) => { + CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey()) + } } } pub fn contact_info(&self) -> Option<&LegacyContactInfo> { @@ -1073,4 +1169,58 @@ mod test { assert!(node.should_force_push(&pubkey)); assert!(!node.should_force_push(&Pubkey::new_unique())); } + + #[test] + fn test_restart_last_voted_fork_slots() { + let keypair = Keypair::new(); + let slot = 53; + let slot_parent = slot - 5; + let shred_version = 21; + let mut slots = RestartLastVotedForkSlots::new( + keypair.pubkey(), + timestamp(), + Hash::default(), + shred_version, + ); + let original_slots_vec = [slot_parent, slot]; + slots.fill(&original_slots_vec); + let value = + CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(slots.clone()), &keypair); + assert_eq!(value.sanitize(), Ok(())); + let label = value.label(); + assert_eq!( + label, + CrdsValueLabel::RestartLastVotedForkSlots(keypair.pubkey()) + ); + assert_eq!(label.pubkey(), keypair.pubkey()); + assert_eq!(value.wallclock(), slots.wallclock); + let retrived_slots = slots.to_slots(0); + assert_eq!(retrived_slots.len(), 2); + assert_eq!(retrived_slots[0], slot_parent); + assert_eq!(retrived_slots[1], slot); + + let empty_slots = RestartLastVotedForkSlots::new( + keypair.pubkey(), + timestamp(), + Hash::default(), + shred_version, + ); + let bad_value = + CrdsValue::new_signed(CrdsData::RestartLastVotedForkSlots(empty_slots), &keypair); + assert_eq!(bad_value.sanitize(), Err(SanitizeError::InvalidValue)); + + let last_slot: Slot = (MAX_SLOTS_PER_ENTRY + 10).try_into().unwrap(); + let mut large_slots = RestartLastVotedForkSlots::new( + keypair.pubkey(), + timestamp(), + Hash::default(), + shred_version, + ); + let large_slots_vec: Vec = (0..last_slot + 1).collect(); + large_slots.fill(&large_slots_vec); + let retrived_slots = large_slots.to_slots(0); + assert_eq!(retrived_slots.len(), MAX_SLOTS_PER_ENTRY); + assert_eq!(retrived_slots.first(), Some(&11)); + assert_eq!(retrived_slots.last(), Some(&last_slot)); + } } diff --git a/gossip/src/epoch_slots.rs b/gossip/src/epoch_slots.rs index 186a17aa6ec255..c589e348143f7d 100644 --- a/gossip/src/epoch_slots.rs +++ b/gossip/src/epoch_slots.rs @@ -178,7 +178,7 @@ impl Default for CompressedSlots { } impl CompressedSlots { - fn new(max_size: usize) -> Self { + pub(crate) fn new(max_size: usize) -> Self { CompressedSlots::Uncompressed(Uncompressed::new(max_size)) }