diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index e6f4bf457c7437..a83b71713f6b8d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -67,11 +67,11 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100; /// the number of slots to respond with when responding to `Orphan` requests pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; /// The maximum size of a bloom filter -pub const MAX_BLOOM_SIZE: usize = 1030; +pub const MAX_BLOOM_SIZE: usize = 1028; /// The maximum size of a protocol payload const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE; /// The largest protocol header size -const MAX_PROTOCOL_HEADER_SIZE: u64 = 202; +const MAX_PROTOCOL_HEADER_SIZE: u64 = 204; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -272,7 +272,7 @@ impl ClusterInfo { let ip_addr = node.gossip.ip(); format!( - "{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5} | {:5}| {:5} | {:5}| {:5} | {:5}| {:5}\n", + "{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5} | {:5}| {:5} | {:5}| {:5} | {:5}| {:5}| v{}\n", if ContactInfo::is_valid_address(&node.gossip) { ip_addr.to_string() } else { @@ -290,15 +290,16 @@ impl ClusterInfo { addr_to_string(&ip_addr, &node.storage_addr), addr_to_string(&ip_addr, &node.rpc), addr_to_string(&ip_addr, &node.rpc_pubsub), + node.shred_version, ) }) .collect(); format!( "IP Address |Age(ms)| Node identifier \ - |Gossip| TPU |TPU fwd| TVU |TVU fwd|Repair|Storage| RPC |PubSub\n\ + |Gossip| TPU |TPU fwd| TVU |TVU fwd|Repair|Storage| RPC |PubSub|ShredVer\n\ ------------------+-------+----------------------------------------------+\ - ------+------+-------+------+-------+------+-------+------+------\n\ + ------+------+-------+------+-------+------+-------+------+------+--------\n\ {}\ Nodes: {}{}{}", nodes.join(""), @@ -405,13 +406,13 @@ impl ClusterInfo { } pub fn rpc_peers(&self) -> Vec { - let me = self.my_data().id; + let me = self.my_data(); self.gossip .crds .table .values() .filter_map(|x| x.value.contact_info()) - .filter(|x| x.id != me) + .filter(|x| x.id != me.id) .filter(|x| ContactInfo::is_valid_address(&x.rpc)) .cloned() .collect() @@ -446,7 +447,7 @@ impl ClusterInfo { /// all validators that have a valid tvu port. pub fn tvu_peers(&self) -> Vec { - let me = self.my_data().id; + let me = self.my_data(); self.gossip .crds .table @@ -454,34 +455,34 @@ impl ClusterInfo { .filter_map(|x| x.value.contact_info()) .filter(|x| ContactInfo::is_valid_address(&x.tvu)) .filter(|x| !ClusterInfo::is_archiver(x)) - .filter(|x| x.id != me) + .filter(|x| x.id != me.id) .cloned() .collect() } /// all peers that have a valid storage addr pub fn storage_peers(&self) -> Vec { - let me = self.my_data().id; + let me = self.my_data(); self.gossip .crds .table .values() .filter_map(|x| x.value.contact_info()) .filter(|x| ContactInfo::is_valid_address(&x.storage_addr)) - .filter(|x| x.id != me) + .filter(|x| x.id != me.id) .cloned() .collect() } /// all peers that have a valid tvu pub fn retransmit_peers(&self) -> Vec { - let me = self.my_data().id; + let me = self.my_data(); self.gossip .crds .table .values() .filter_map(|x| x.value.contact_info()) - .filter(|x| x.id != me) + .filter(|x| x.id != me.id) .filter(|x| ContactInfo::is_valid_address(&x.tvu)) .filter(|x| ContactInfo::is_valid_address(&x.tvu_forwards)) .cloned() @@ -490,10 +491,10 @@ impl ClusterInfo { /// all tvu peers with valid gossip addrs that likely have the slot being requested fn repair_peers(&self, slot: Slot) -> Vec { - let me = self.my_data().id; + let me = self.my_data(); ClusterInfo::tvu_peers(self) .into_iter() - .filter(|x| x.id != me) + .filter(|x| x.id != me.id) .filter(|x| ContactInfo::is_valid_address(&x.gossip)) .filter(|x| { self.get_epoch_state_for_node(&x.id, None) @@ -2575,7 +2576,7 @@ mod tests { #[test] fn test_split_messages_packet_size() { - // Test that if a value is smaller than payload size but too large to be wrappe in a vec + // Test that if a value is smaller than payload size but too large to be wrapped in a vec // that it is still dropped let payload: Vec = vec![]; let vec_size = serialized_size(&payload).unwrap(); @@ -2589,7 +2590,7 @@ mod tests { })); let mut i = 0; - while value.size() < desired_size { + while value.size() <= desired_size { let slots = (0..i).collect::>(); if slots.len() > 200 { panic!( diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index d64dfb04b4fec0..69efba1176160d 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -31,6 +31,8 @@ pub struct ContactInfo { pub rpc_pubsub: SocketAddr, /// latest wallclock picked pub wallclock: u64, + /// node shred version + pub shred_version: u16, } impl Ord for ContactInfo { @@ -84,6 +86,7 @@ impl Default for ContactInfo { rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), wallclock: 0, + shred_version: 0, } } } @@ -115,6 +118,7 @@ impl ContactInfo { rpc, rpc_pubsub, wallclock: now, + shred_version: 0, } } diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 81e235c435052b..119c28c09370b4 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -199,7 +199,6 @@ fn spy( .unwrap() .tvu_peers() .into_iter() - .filter(|node| !ClusterInfo::is_archiver(&node)) .collect::>(); archivers = spy_ref.read().unwrap().storage_peers(); if let Some(num) = num_nodes { diff --git a/core/src/validator.rs b/core/src/validator.rs index 856eda52b8ed8d..480a4232afd8a4 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -145,8 +145,6 @@ impl Validator { info!("entrypoint: {:?}", entrypoint_info_option); - Self::print_node_info(&node); - info!("Initializing sigverify, this could take a while..."); sigverify::init(); info!("Done."); @@ -177,8 +175,6 @@ impl Validator { let bank = bank_forks[bank_info.bank_slot].clone(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); - // The version used by shreds, derived from genesis - let shred_version = Shred::version_from_hash(&genesis_hash); let mut validator_exit = ValidatorExit::default(); let exit_ = exit.clone(); @@ -186,6 +182,9 @@ impl Validator { let validator_exit = Arc::new(RwLock::new(Some(validator_exit))); node.info.wallclock = timestamp(); + node.info.shred_version = Shred::version_from_hash(&genesis_hash); + Self::print_node_info(&node); + let cluster_info = Arc::new(RwLock::new(ClusterInfo::new( node.info.clone(), keypair.clone(), @@ -372,7 +371,7 @@ impl Validator { block_commitment_cache, config.dev_sigverify_disabled, config.partition_cfg.clone(), - shred_version, + node.info.shred_version, transaction_status_sender.clone(), ); @@ -392,7 +391,7 @@ impl Validator { &blockstore, &config.broadcast_stage_type, &exit, - shred_version, + node.info.shred_version, ); datapoint_info!("validator-new", ("id", id.to_string(), String)); @@ -615,6 +614,7 @@ fn get_stake_percent_in_gossip( let mut gossip_stake = 0; let mut total_activated_stake = 0; let tvu_peers = cluster_info.read().unwrap().tvu_peers(); + let me = cluster_info.read().unwrap().my_data(); for (activated_stake, vote_account) in bank.vote_accounts().values() { let vote_state = @@ -622,6 +622,7 @@ fn get_stake_percent_in_gossip( total_activated_stake += activated_stake; if tvu_peers .iter() + .filter(|peer| peer.shred_version == me.shred_version) .any(|peer| peer.id == vote_state.node_pubkey) { trace!(