diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 66d7b2300f9748..d7cf3f58811b99 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -18,8 +18,8 @@ use crate::{
     crds_gossip_error::CrdsGossipError,
     crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
     crds_value::{
-        self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash,
-        Version, Vote, MAX_WALLCLOCK,
+        self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, NodeInstance,
+        SnapshotHash, Version, Vote, MAX_WALLCLOCK,
     },
     data_budget::DataBudget,
     epoch_slots::EpochSlots,
@@ -298,6 +298,7 @@ pub struct ClusterInfo {
     stats: GossipStats,
     socket: UdpSocket,
     local_message_pending_push_queue: RwLock<Vec<(CrdsValue, u64)>>,
+    instance: NodeInstance,
 }
 
 impl Default for ClusterInfo {
@@ -422,7 +423,7 @@ pub fn make_accounts_hashes_message(
 type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;
 
 // TODO These messages should go through the gpu pipeline for spam filtering
-#[frozen_abi(digest = "8L3mKuv292LTa3XFCGNVdaFihWnsgYE4hf941p9gqUxF")]
+#[frozen_abi(digest = "6PpTdBvyX37y5ERokb8DejgKobpsuTbFJC39f8Eqz7Vy")]
 #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
 #[allow(clippy::large_enum_variant)]
 enum Protocol {
@@ -553,6 +554,7 @@ impl ClusterInfo {
             stats: GossipStats::default(),
             socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
             local_message_pending_push_queue: RwLock::new(vec![]),
+            instance: NodeInstance::new(id, timestamp()),
         };
         {
             let mut gossip = me.gossip.write().unwrap();
@@ -586,6 +588,7 @@ impl ClusterInfo {
                     .unwrap()
                     .clone(),
             ),
+            instance: NodeInstance::new(*new_id, timestamp()),
         }
     }
 
@@ -606,16 +609,24 @@ impl ClusterInfo {
     ) {
         let now = timestamp();
         self.my_contact_info.write().unwrap().wallclock = now;
-        let entry =
-            CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair);
+        let entries: Vec<_> = vec![
+            CrdsData::ContactInfo(self.my_contact_info()),
+            CrdsData::NodeInstance(self.instance.with_wallclock(now)),
+        ]
+        .into_iter()
+        .map(|v| CrdsValue::new_signed(v, &self.keypair))
+        .collect();
+        {
+            let mut local_message_pending_push_queue =
+                self.local_message_pending_push_queue.write().unwrap();
+            for entry in entries {
+                local_message_pending_push_queue.push((entry, now));
+            }
+        }
         self.gossip
             .write()
             .unwrap()
             .refresh_push_active_set(stakes, gossip_validators);
-        self.local_message_pending_push_queue
-            .write()
-            .unwrap()
-            .push((entry, now));
     }
 
     // TODO kill insert_info, only used by tests
@@ -1789,9 +1800,14 @@ impl ClusterInfo {
                 let mut last_contact_info_trace = timestamp();
                 let mut adopt_shred_version = self.my_shred_version() == 0;
                 let recycler = PacketsRecycler::default();
-
-                let message = CrdsData::Version(Version::new(self.id()));
-                self.push_message(CrdsValue::new_signed(message, &self.keypair));
+                let crds_data = vec![
+                    CrdsData::Version(Version::new(self.id())),
+                    CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
+                ];
+                for value in crds_data {
+                    let value = CrdsValue::new_signed(value, &self.keypair);
+                    self.push_message(value);
+                }
                 let mut generate_pull_requests = true;
                 loop {
                     let start = timestamp();
@@ -2487,8 +2503,8 @@ impl ClusterInfo {
         stakes: HashMap<Pubkey, u64>,
         feature_set: Option<&FeatureSet>,
         epoch_time_ms: u64,
-    ) {
-        let mut timer = Measure::start("process_gossip_packets_time");
+    ) -> Result<()> {
+        let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
         let packets: Vec<_> = thread_pool.install(|| {
             packets
                 .into_par_iter()
@@ -2501,6 +2517,16 @@ impl ClusterInfo {
             })
             .collect()
         });
+        // Check if there is a duplicate instance of
+        // this node with more recent timestamp.
+        let check_duplicate_instance = |values: &[CrdsValue]| {
+            for value in values {
+                if self.instance.check_duplicate(value) {
+                    return Err(Error::DuplicateNodeInstance);
+                }
+            }
+            Ok(())
+        };
         // Split packets based on their types.
         let mut pull_requests = vec![];
         let mut pull_responses = vec![];
@@ -2513,8 +2539,14 @@ impl ClusterInfo {
                 Protocol::PullRequest(filter, caller) => {
                     pull_requests.push((from_addr, filter, caller))
                 }
-                Protocol::PullResponse(from, data) => pull_responses.push((from, data)),
-                Protocol::PushMessage(from, data) => push_messages.push((from, data)),
+                Protocol::PullResponse(from, data) => {
+                    check_duplicate_instance(&data)?;
+                    pull_responses.push((from, data));
+                }
+                Protocol::PushMessage(from, data) => {
+                    check_duplicate_instance(&data)?;
+                    push_messages.push((from, data));
+                }
                 Protocol::PruneMessage(from, data) => prune_messages.push((from, data)),
                 Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
                 Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
@@ -2539,9 +2571,7 @@ impl ClusterInfo {
             response_sender,
             feature_set,
         );
-        self.stats
-            .process_gossip_packets_time
-            .add_measure(&mut timer);
+        Ok(())
     }
 
     /// Process messages from the network
@@ -2588,7 +2618,7 @@ impl ClusterInfo {
             stakes,
             feature_set.as_deref(),
             epoch_time_ms,
-        );
+        )?;
 
         self.print_reset_stats(last_print);
 
@@ -2853,25 +2883,36 @@ impl ClusterInfo {
                     .build()
                     .unwrap();
                 let mut last_print = Instant::now();
-                loop {
-                    let e = self.run_listen(
+                while !exit.load(Ordering::Relaxed) {
+                    if let Err(err) = self.run_listen(
                         &recycler,
                         bank_forks.as_ref(),
                         &requests_receiver,
                         &response_sender,
                         &thread_pool,
                         &mut last_print,
-                    );
-                    if exit.load(Ordering::Relaxed) {
-                        return;
-                    }
-                    if e.is_err() {
-                        let r_gossip = self.gossip.read().unwrap();
-                        debug!(
-                            "{}: run_listen timeout, table size: {}",
-                            self.id(),
-                            r_gossip.crds.len()
-                        );
+                    ) {
+                        match err {
+                            Error::RecvTimeoutError(_) => {
+                                let table_size = self.gossip.read().unwrap().crds.len();
+                                debug!(
+                                    "{}: run_listen timeout, table size: {}",
+                                    self.id(),
+                                    table_size,
+                                );
+                            }
+                            Error::DuplicateNodeInstance => {
+                                error!(
+                                    "duplicate running instances of the same validator node: {}",
+                                    self.id()
+                                );
+                                exit.store(true, Ordering::Relaxed);
+                                // TODO: Pass through ValidatorExit here so
+                                // that this will exit cleanly.
+                                std::process::exit(1);
+                            }
+                            _ => error!("gossip run_listen failed: {}", err),
+                        }
                     }
                     thread_mem_usage::datapoint("solana-listen");
                 }
diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs
index edf9b028a9173d..d7552eef564222 100644
--- a/core/src/crds_gossip_push.rs
+++ b/core/src/crds_gossip_push.rs
@@ -247,7 +247,7 @@ impl CrdsGossipPush {
             for i in start..(start + push_fanout) {
                 let index = i % self.active_set.len();
                 let (peer, filter) = self.active_set.get_index(index).unwrap();
-                if !filter.contains(&origin) {
+                if !filter.contains(&origin) || value.should_force_push(peer) {
                     trace!("new_push_messages insert {} {:?}", *peer, value);
                     push_messages.entry(*peer).or_default().push(value.clone());
                     num_pushes += 1;
diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs
index e27abcd17db830..aec4911bceaec0 100644
--- a/core/src/crds_value.rs
+++ b/core/src/crds_value.rs
@@ -79,6 +79,7 @@ pub enum CrdsData {
     EpochSlots(EpochSlotsIndex, EpochSlots),
     LegacyVersion(LegacyVersion),
     Version(Version),
+    NodeInstance(NodeInstance),
 }
 
 impl Sanitize for CrdsData {
@@ -107,6 +108,7 @@ impl Sanitize for CrdsData {
             }
             CrdsData::LegacyVersion(version) => version.sanitize(),
             CrdsData::Version(version) => version.sanitize(),
+            CrdsData::NodeInstance(node) => node.sanitize(),
         }
     }
 }
@@ -323,6 +325,55 @@ impl Version {
     }
 }
 
+#[derive(Clone, Debug, PartialEq, AbiExample, Deserialize, Serialize)]
+pub struct NodeInstance {
+    from: Pubkey,
+    wallclock: u64,
+    timestamp: u64, // Timestamp when the instance was created.
+    token: u64,     // Randomly generated value at node instantiation.
+}
+
+impl NodeInstance {
+    pub fn new(pubkey: Pubkey, now: u64) -> Self {
+        Self {
+            from: pubkey,
+            wallclock: now,
+            timestamp: now,
+            token: rand::thread_rng().gen(),
+        }
+    }
+
+    // Clones the value with an updated wallclock.
+    pub fn with_wallclock(&self, now: u64) -> Self {
+        Self {
+            wallclock: now,
+            ..*self
+        }
+    }
+
+    // Returns true if the crds-value is a duplicate instance
+    // of this node, with a more recent timestamp.
+    pub fn check_duplicate(&self, other: &CrdsValue) -> bool {
+        match &other.data {
+            CrdsData::NodeInstance(other) => {
+                self.token != other.token
+                    && self.timestamp <= other.timestamp
+                    && self.from == other.from
+            }
+            _ => false,
+        }
+    }
+}
+
+impl Sanitize for NodeInstance {
+    fn sanitize(&self) -> Result<(), SanitizeError> {
+        if self.wallclock >= MAX_WALLCLOCK {
+            return Err(SanitizeError::ValueOutOfBounds);
+        }
+        self.from.sanitize()
+    }
+}
+
 /// Type of the replicated value
 /// These are labels for values in a record that is associated with `Pubkey`
 #[derive(PartialEq, Hash, Eq, Clone, Debug)]
@@ -335,6 +386,7 @@ pub enum CrdsValueLabel {
     AccountsHashes(Pubkey),
     LegacyVersion(Pubkey),
     Version(Pubkey),
+    NodeInstance(Pubkey),
 }
 
 impl fmt::Display for CrdsValueLabel {
@@ -348,6 +400,7 @@ impl fmt::Display for CrdsValueLabel {
             CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()),
             CrdsValueLabel::LegacyVersion(_) => write!(f, "LegacyVersion({})", self.pubkey()),
             CrdsValueLabel::Version(_) => write!(f, "Version({})", self.pubkey()),
+            CrdsValueLabel::NodeInstance(_) => write!(f, "NodeInstance({})", self.pubkey()),
         }
     }
 }
@@ -363,6 +416,7 @@ impl CrdsValueLabel {
             CrdsValueLabel::AccountsHashes(p) => *p,
             CrdsValueLabel::LegacyVersion(p) => *p,
             CrdsValueLabel::Version(p) => *p,
+            CrdsValueLabel::NodeInstance(p) => *p,
         }
     }
 }
@@ -409,6 +463,7 @@ impl CrdsValue {
             CrdsData::EpochSlots(_, p) => p.wallclock,
             CrdsData::LegacyVersion(version) => version.wallclock,
             CrdsData::Version(version) => version.wallclock,
+            CrdsData::NodeInstance(node) => node.wallclock,
         }
     }
     pub fn pubkey(&self) -> Pubkey {
@@ -421,6 +476,7 @@ impl CrdsValue {
             CrdsData::EpochSlots(_, p) => p.from,
             CrdsData::LegacyVersion(version) => version.from,
             CrdsData::Version(version) => version.from,
+            CrdsData::NodeInstance(node) => node.from,
         }
     }
     pub fn label(&self) -> CrdsValueLabel {
@@ -433,6 +489,7 @@ impl CrdsValue {
             CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()),
             CrdsData::LegacyVersion(_) => CrdsValueLabel::LegacyVersion(self.pubkey()),
             CrdsData::Version(_) => CrdsValueLabel::Version(self.pubkey()),
+            CrdsData::NodeInstance(_) => CrdsValueLabel::NodeInstance(self.pubkey()),
         }
     }
     pub fn contact_info(&self) -> Option<&ContactInfo> {
@@ -499,13 +556,14 @@ impl CrdsValue {
 
     /// Return all the possible labels for a record identified by Pubkey.
     pub fn record_labels(key: Pubkey) -> impl Iterator<Item = CrdsValueLabel> {
-        const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 6] = [
+        const CRDS_VALUE_LABEL_STUBS: [fn(Pubkey) -> CrdsValueLabel; 7] = [
             CrdsValueLabel::ContactInfo,
             CrdsValueLabel::LowestSlot,
             CrdsValueLabel::SnapshotHashes,
             CrdsValueLabel::AccountsHashes,
             CrdsValueLabel::LegacyVersion,
             CrdsValueLabel::Version,
+            CrdsValueLabel::NodeInstance,
         ];
         CRDS_VALUE_LABEL_STUBS
            .iter()
@@ -545,6 +603,15 @@ impl CrdsValue {
             .vote_index()
             .expect("all values must be votes")
     }
+
+    /// Returns true if, regardless of prunes, this crds-value
+    /// should be pushed to the receiving node.
+    pub fn should_force_push(&self, peer: &Pubkey) -> bool {
+        match &self.data {
+            CrdsData::NodeInstance(node) => node.from == *peer,
+            _ => false,
+        }
+    }
 }
 
 /// Filters out an iterator of crds values, returning
@@ -584,7 +651,7 @@ mod test {
 
     #[test]
     fn test_labels() {
-        let mut hits = [false; 6 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
+        let mut hits = [false; 7 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
         // this method should cover all the possible labels
         for v in CrdsValue::record_labels(Pubkey::default()) {
             match &v {
@@ -594,9 +661,10 @@ mod test {
                 CrdsValueLabel::AccountsHashes(_) => hits[3] = true,
                 CrdsValueLabel::LegacyVersion(_) => hits[4] = true,
                 CrdsValueLabel::Version(_) => hits[5] = true,
-                CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 6] = true,
+                CrdsValueLabel::NodeInstance(_) => hits[6] = true,
+                CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 7] = true,
                 CrdsValueLabel::EpochSlots(ix, _) => {
-                    hits[*ix as usize + MAX_VOTES as usize + 6] = true
+                    hits[*ix as usize + MAX_VOTES as usize + 7] = true
                 }
             }
         }
@@ -806,4 +874,80 @@ mod test {
         // cannot be more than 5 times number of keys.
         assert!(currents.len() <= keys.len() * 5);
     }
+
+    #[test]
+    fn test_check_duplicate_instance() {
+        fn make_crds_value(node: NodeInstance) -> CrdsValue {
+            CrdsValue::new_unsigned(CrdsData::NodeInstance(node))
+        }
+        let now = timestamp();
+        let mut rng = rand::thread_rng();
+        let pubkey = Pubkey::new_unique();
+        let node = NodeInstance::new(pubkey, now);
+        // Same token is not a duplicate.
+        assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
+            from: pubkey,
+            wallclock: now + 1,
+            timestamp: now + 1,
+            token: node.token,
+        })));
+        // Older timestamp is not a duplicate.
+        assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
+            from: pubkey,
+            wallclock: now + 1,
+            timestamp: now - 1,
+            token: rng.gen(),
+        })));
+        // Updated wallclock is not a duplicate.
+        let other = node.with_wallclock(now + 8);
+        assert_eq!(
+            other,
+            NodeInstance {
+                from: pubkey,
+                wallclock: now + 8,
+                timestamp: now,
+                token: node.token,
+            }
+        );
+        assert!(!node.check_duplicate(&make_crds_value(other)));
+        // Duplicate instance.
+        assert!(node.check_duplicate(&make_crds_value(NodeInstance {
+            from: pubkey,
+            wallclock: 0,
+            timestamp: now,
+            token: rng.gen(),
+        })));
+        // Different pubkey is not a duplicate.
+        assert!(!node.check_duplicate(&make_crds_value(NodeInstance {
+            from: Pubkey::new_unique(),
+            wallclock: now + 1,
+            timestamp: now + 1,
+            token: rng.gen(),
+        })));
+        // Different crds value is not a duplicate.
+        assert!(
+            !node.check_duplicate(&CrdsValue::new_unsigned(CrdsData::ContactInfo(
+                ContactInfo::new_rand(&mut rng, Some(pubkey))
+            )))
+        );
+    }
+
+    #[test]
+    fn test_should_force_push() {
+        let mut rng = rand::thread_rng();
+        let pubkey = Pubkey::new_unique();
+        assert!(
+            !CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_rand(
+                &mut rng,
+                Some(pubkey),
+            )))
+            .should_force_push(&pubkey)
+        );
+        let node = CrdsValue::new_unsigned(CrdsData::NodeInstance(NodeInstance::new(
+            pubkey,
+            timestamp(),
+        )));
+        assert!(node.should_force_push(&pubkey));
+        assert!(!node.should_force_push(&Pubkey::new_unique()));
+    }
 }
diff --git a/core/src/result.rs b/core/src/result.rs
index c633be554b04c7..f46ac16fbed38d 100644
--- a/core/src/result.rs
+++ b/core/src/result.rs
@@ -32,6 +32,7 @@ pub enum Error {
     FsExtra(fs_extra::error::Error),
     SnapshotError(snapshot_utils::SnapshotError),
     WeightedIndexError(rand::distributions::weighted::WeightedError),
+    DuplicateNodeInstance,
 }
 
 pub type Result<T> = std::result::Result<T, Error>;