From f9b37ae1ceb9cb6974504767d0da788168adf083 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 6 Oct 2020 15:46:00 -0400 Subject: [PATCH 01/15] implements ping-pong packets between nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit https://hackerone.com/reports/991106 > It’s possible to use UDP gossip protocol to amplify DDoS attacks. An attacker > can spoof IP address in UDP packet when sending PullRequest to the node. > There's no any validation if provided source IP address is not spoofed and > the node can send much larger PullResponse to victim's IP. As I checked, > PullRequest is about 290 bytes, while PullResponse is about 10 kB. It means > that amplification is about 34x. This way an attacker can easily perform DDoS > attack both on Solana node and third-party server. > > To prevent it, need for example to implement ping-pong mechanism similar as > in Ethereum: Before accepting requests from remote client needs to validate > his IP. Local node sends Ping packet to the remote node and it needs to reply > with Pong packet that contains hash of matching Ping packet. Content of Ping > packet is unpredictable. If hash from Pong packet matches, local node can > remember IP where Ping packet was sent as correct and allow further > communication. > > More info: > https://github.com/ethereum/devp2p/blob/master/discv4.md#endpoint-proof > https://github.com/ethereum/devp2p/blob/master/discv4.md#wire-protocol The commit adds a PingCache, which maintains records of remote nodes which have returned a valid response to a ping message, and on-the-fly ping messages pending a pong response from the remote node. When handling pull-requests, those from addresses which have not passed the ping-pong check are filtered out, and additionally ping packets are added for addresses which need to be (re)verified. --- Cargo.lock | 11 ++ core/Cargo.toml | 1 + core/src/cluster_info.rs | 163 +++++++++++++++++---- core/src/crds_gossip.rs | 7 +- core/src/crds_gossip_pull.rs | 26 ++-- core/src/lib.rs | 1 + core/src/ping_pong.rs | 269 +++++++++++++++++++++++++++++++++++ core/tests/crds_gossip.rs | 5 +- 8 files changed, 440 insertions(+), 43 deletions(-) create mode 100644 core/src/ping_pong.rs diff --git a/Cargo.lock b/Cargo.lock index ec1331eb7e5084..07996796f9a0ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1374,6 +1374,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91b62f79061a0bc2e046024cb7ba44b08419ed238ecbd9adbd787434b9e8c25" dependencies = [ + "ahash", "autocfg 1.0.0", ] @@ -1991,6 +1992,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "lru" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "111b945ac72ec09eb7bc62a0fbdc3cc6e80555a7245f52a69d3921a75b53b153" +dependencies = [ + "hashbrown", +] + [[package]] name = "matches" version = "0.1.8" @@ -3805,6 +3815,7 @@ dependencies = [ "jsonrpc-pubsub", "jsonrpc-ws-server", "log 0.4.8", + "lru", "matches", "num-traits", "num_cpus", diff --git a/core/Cargo.toml b/core/Cargo.toml index 6fec41d1113fd4..7767804ecc915c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -34,6 +34,7 @@ jsonrpc-http-server = "15.0.0" jsonrpc-pubsub = "15.0.0" jsonrpc-ws-server = "15.0.0" log = "0.4.8" +lru = "0.6.0" num_cpus = "1.13.0" num-traits = "0.2" rand = "0.7.0" diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 0c83fb7a2f4596..b6da4e2d0090c4 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -23,6 +23,7 @@ use crate::{ }, data_budget::DataBudget, epoch_slots::EpochSlots, + ping_pong::{self, PingCache, Pong}, result::{Error, Result}, weighted_shuffle::weighted_shuffle, }; @@ -97,6 +98,9 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; /// Keep the number of snapshot hashes a node publishes under MAX_PROTOCOL_PAYLOAD_SIZE pub const MAX_SNAPSHOT_HASHES: usize = 16; +const GOSSIP_PING_TOKEN_SIZE: usize = 32; +const GOSSIP_PING_CACHE_CAPACITY: usize = 16384; +const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640); #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -251,6 +255,7 @@ pub struct ClusterInfo { entrypoint: RwLock>, outbound_budget: DataBudget, my_contact_info: RwLock, + ping_cache: RwLock, id: Pubkey, stats: GossipStats, socket: UdpSocket, @@ -355,8 +360,10 @@ pub fn make_accounts_hashes_message( Some(CrdsValue::new_signed(message, keypair)) } +type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; + // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "21UweZ4WXK9RHypF97D1rEJQ4C8Bh4pw52SBSNAKxJvW")] +#[frozen_abi(digest = "3jHXixLRv6fuCykW47hBZSwFuwDjbZShR73GVQB6TjGr")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] enum Protocol { @@ -365,6 +372,8 @@ enum Protocol { PullResponse(Pubkey, Vec), PushMessage(Pubkey, Vec), PruneMessage(Pubkey, PruneData), + PingMessage(Ping), + PongMessage(Pong), } impl Protocol { @@ -416,6 +425,22 @@ impl Protocol { None } } + Protocol::PingMessage(ref ping) => { + if ping.verify() { + Some(self) + } else { + inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", 1); + None + } + } + Protocol::PongMessage(ref pong) => { + if pong.verify() { + Some(self) + } else { + inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", 1); + None + } + } } } } @@ -430,6 +455,8 @@ impl Sanitize for Protocol { Protocol::PullResponse(_, val) => val.sanitize(), Protocol::PushMessage(_, val) => val.sanitize(), Protocol::PruneMessage(_, val) => val.sanitize(), + Protocol::PingMessage(ping) => ping.sanitize(), + Protocol::PongMessage(pong) => pong.sanitize(), } } } @@ -459,6 +486,10 @@ impl ClusterInfo { entrypoint: RwLock::new(None), outbound_budget: DataBudget::default(), my_contact_info: RwLock::new(contact_info), + ping_cache: RwLock::new(PingCache::new( + GOSSIP_PING_CACHE_TTL, + GOSSIP_PING_CACHE_CAPACITY, + )), id, stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), @@ -486,6 +517,7 @@ impl ClusterInfo { entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()), outbound_budget: self.outbound_budget.clone_non_atomic(), my_contact_info: RwLock::new(my_contact_info), + ping_cache: RwLock::new(self.ping_cache.read().unwrap().clone()), id: *new_id, stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), @@ -1857,6 +1889,31 @@ impl ClusterInfo { ("prune_message", (allocated.get() - start) as i64, i64), ); } + Protocol::PingMessage(ping) => { + let start = allocated.get(); + if !ping.verify() { + inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", 1); + } else if let Some(rsp) = self.handle_ping_message(&ping, &from_addr, recycler) + { + let _ = response_sender.send(rsp); + } + datapoint_debug!( + "solana-gossip-listen-memory", + ("ping_message", (allocated.get() - start) as i64, i64), + ); + } + Protocol::PongMessage(pong) => { + let start = allocated.get(); + if pong.verify() { + self.handle_pong_message(&pong, from_addr); + } else { + inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", 1); + } + datapoint_debug!( + "solana-gossip-listen-memory", + ("pong_message", (allocated.get() - start) as i64, i64), + ); + } } } @@ -1870,7 +1927,7 @@ impl ClusterInfo { .pull_requests_count .add_relaxed(gossip_pull_data.len() as u64); let rsp = self.handle_pull_requests(recycler, gossip_pull_data, stakes, feature_set); - if let Some(rsp) = rsp { + if !rsp.is_empty() { let _ignore_disconnect = response_sender.send(rsp); } } @@ -1890,6 +1947,45 @@ impl ClusterInfo { }); } + // Filters out pull requests from invalid addresses, and those which have + // not passed the ping-pong check. Appends ping packets for the addresses + // which need to be (re)verified. + fn filter_pull_requests( + &self, + requests: Vec, + packets: &mut Packets, + ) -> Vec { + let mut rng = rand::thread_rng(); + let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new(); + let mut pingf = || Ping::new_rand(&mut rng, &self.keypair).ok(); + let mut ping_cache = self.ping_cache.write().unwrap(); + let now = Instant::now(); + let mut hard_check = |node| { + let (_check, ping) = ping_cache.check(now, node, &mut pingf); + if let Some(ping) = ping { + let ping = Protocol::PingMessage(ping); + let ping = Packet::from_data(&node.1, ping); + packets.packets.push(ping); + } + // TODO: For backward compatibility, this unconditionally returns + // true for now. This has to return _check, once nodes start + // responding to ping messages. + true + }; + // Because pull-responses are sent back to packet.meta.addr() of + // incoming pull-requests, pings are also sent to request.from_addr (as + // opposed to caller.gossip address). + requests + .into_iter() + .filter(|request| { + ContactInfo::is_valid_address(&request.from_addr) && { + let node = (request.caller.pubkey(), request.from_addr); + *cache.entry(node).or_insert_with(|| hard_check(node)) + } + }) + .collect() + } + // Pull requests take an incoming bloom filter of contained entries from a node // and tries to send back to them the values it detects are missing. fn handle_pull_requests( @@ -1898,21 +1994,22 @@ impl ClusterInfo { requests: Vec, stakes: &HashMap, feature_set: Option<&FeatureSet>, - ) -> Option { + ) -> Packets { if matches!(feature_set, Some(feature_set) if feature_set.is_active(&feature_set::pull_request_ping_pong_check::id())) { // TODO: add ping-pong check on pull-request addresses. } - // split the requests into addrs and filters - let mut caller_and_filters = vec![]; - let mut addrs = vec![]; let mut time = Measure::start("handle_pull_requests"); + self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) + .process_pull_requests(requests.iter().map(|r| r.caller.clone()), timestamp()); self.update_data_budget(stakes.len()); - for pull_data in requests { - caller_and_filters.push((pull_data.caller, pull_data.filter)); - addrs.push(pull_data.from_addr); - } + let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); + let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = self + .filter_pull_requests(requests, &mut packets) + .into_iter() + .map(|r| ((r.caller, r.filter), r.from_addr)) + .unzip(); let now = timestamp(); let self_id = self.id(); @@ -1923,27 +2020,14 @@ impl ClusterInfo { ) .generate_pull_responses(&caller_and_filters, now); - self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) - .process_pull_requests(caller_and_filters, now); - - // Filter bad to addresses let pull_responses: Vec<_> = pull_responses .into_iter() .zip(addrs.into_iter()) - .filter_map(|(responses, from_addr)| { - if !from_addr.ip().is_unspecified() - && from_addr.port() != 0 - && !responses.is_empty() - { - Some((responses, from_addr)) - } else { - None - } - }) + .filter(|(response, _)| !response.is_empty()) .collect(); if pull_responses.is_empty() { - return None; + return packets; } let mut stats: Vec<_> = pull_responses @@ -1975,7 +2059,6 @@ impl ClusterInfo { let rng = &mut ChaChaRng::from_seed(seed); let weighted_index = WeightedIndex::new(weights).unwrap(); - let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); let mut total_bytes = 0; let mut sent = HashSet::new(); while sent.len() < stats.len() { @@ -2010,10 +2093,7 @@ impl ClusterInfo { stats.len(), total_bytes ); - if packets.is_empty() { - return None; - } - Some(packets) + packets } // Returns (failed, timeout, success) @@ -2100,6 +2180,29 @@ impl ClusterInfo { } } + fn handle_ping_message( + &self, + ping: &Ping, + addr: &SocketAddr, + recycler: &PacketsRecycler, + ) -> Option { + let pong = Pong::new(ping, &self.keypair).ok()?; + let pong = Protocol::PongMessage(pong); + let packets = Packets::new_with_recycler_data( + recycler, + "handle_ping_message", + vec![Packet::from_data(addr, pong)], + ); + Some(packets) + } + + fn handle_pong_message(&self, pong: &Pong, addr: SocketAddr) { + self.ping_cache + .write() + .unwrap() + .add(pong, addr, Instant::now()); + } + fn handle_push_message( &self, recycler: &PacketsRecycler, diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index ad95478f7ef0e7..e35d8d1eb72775 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -174,9 +174,12 @@ impl CrdsGossip { self.pull.mark_pull_request_creation_time(from, now) } /// process a pull request and create a response - pub fn process_pull_requests(&mut self, filters: Vec<(CrdsValue, CrdsFilter)>, now: u64) { + pub fn process_pull_requests(&mut self, callers: I, now: u64) + where + I: Iterator, + { self.pull - .process_pull_requests(&mut self.crds, filters, now); + .process_pull_requests(&mut self.crds, callers, now); } pub fn generate_pull_responses( diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index d895a8b7296905..036c1e42d086ac 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -273,20 +273,18 @@ impl CrdsGossipPull { } /// process a pull request - pub fn process_pull_requests( - &mut self, - crds: &mut Crds, - requests: Vec<(CrdsValue, CrdsFilter)>, - now: u64, - ) { - requests.into_iter().for_each(|(caller, _)| { + pub fn process_pull_requests(&mut self, crds: &mut Crds, callers: I, now: u64) + where + I: Iterator, + { + for caller in callers { let key = caller.label().pubkey(); if let Ok(Some(val)) = crds.insert(caller, now) { self.purged_values .push_back((val.value_hash, val.local_timestamp)); } crds.update_record_timestamp(&key, now); - }); + } } /// Create gossip responses to pull requests @@ -1087,7 +1085,11 @@ mod test { let (_, filters, caller) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); - dest.process_pull_requests(&mut dest_crds, filters, 1); + dest.process_pull_requests( + &mut dest_crds, + filters.into_iter().map(|(caller, _)| caller), + 1, + ); assert!(rsp.iter().all(|rsp| rsp.is_empty())); assert!(dest_crds.lookup(&caller.label()).is_some()); assert_eq!( @@ -1161,7 +1163,11 @@ mod test { let (_, filters, caller) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); - dest.process_pull_requests(&mut dest_crds, filters, 0); + dest.process_pull_requests( + &mut dest_crds, + filters.into_iter().map(|(caller, _)| caller), + 0, + ); // if there is a false positive this is empty // prob should be around 0.1 per iteration if rsp.is_empty() { diff --git a/core/src/lib.rs b/core/src/lib.rs index 30737faa35485d..736aa6a96694ae 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -43,6 +43,7 @@ pub mod local_vote_signer_service; pub mod non_circulating_supply; pub mod optimistic_confirmation_verifier; pub mod optimistically_confirmed_bank_tracker; +pub mod ping_pong; pub mod poh_recorder; pub mod poh_service; pub mod progress_map; diff --git a/core/src/ping_pong.rs b/core/src/ping_pong.rs new file mode 100644 index 00000000000000..85db5bc340b681 --- /dev/null +++ b/core/src/ping_pong.rs @@ -0,0 +1,269 @@ +use bincode::{serialize, Error}; +use lru::LruCache; +use rand::{AsByteSliceMut, CryptoRng, Rng}; +use serde::Serialize; +use solana_sdk::hash::{self, Hash}; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::sanitize::{Sanitize, SanitizeError}; +use solana_sdk::signature::{Keypair, Signable, Signature, Signer}; +use std::borrow::Cow; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +#[derive(AbiExample, Debug, Deserialize, Serialize)] +pub struct Ping { + from: Pubkey, + token: T, + signature: Signature, +} + +#[derive(AbiExample, Debug, Deserialize, Serialize)] +pub struct Pong { + from: Pubkey, + hash: Hash, // Hash of received ping token. + signature: Signature, +} + +/// Maintains records of remote nodes which have returned a valid response to a +/// ping message, and on-the-fly ping messages pending a pong response from the +/// remote node. +pub struct PingCache { + // Time-to-live of received pong messages. + ttl: Duration, + // Timestamp of last ping message sent to a remote node. + // Used to rate limit pings to remote nodes. + pings: LruCache<(Pubkey, SocketAddr), Instant>, + // Verified pong responses from remote nodes. + pongs: LruCache<(Pubkey, SocketAddr), Instant>, + // Hash of ping tokens sent out to remote nodes, + // pending a pong response back. + cache: LruCache, +} + +impl Ping { + pub fn new(token: T, keypair: &Keypair) -> Result { + let signature = keypair.sign_message(&serialize(&token)?); + let ping = Ping { + from: keypair.pubkey(), + token, + signature, + }; + Ok(ping) + } +} + +impl Ping +where + T: Serialize + AsByteSliceMut + Default, +{ + pub fn new_rand(rng: &mut R, keypair: &Keypair) -> Result + where + R: Rng + CryptoRng, + { + let mut token = T::default(); + rng.fill(&mut token); + Ping::new(token, keypair) + } +} + +impl Sanitize for Ping { + default fn sanitize(&self) -> Result<(), SanitizeError> { + self.from.sanitize()?; + self.signature.sanitize() + } +} + +impl Sanitize for Ping { + fn sanitize(&self) -> Result<(), SanitizeError> { + self.from.sanitize()?; + self.token.sanitize()?; + self.signature.sanitize() + } +} + +impl Signable for Ping { + fn pubkey(&self) -> Pubkey { + self.from + } + + fn signable_data(&self) -> Cow<[u8]> { + Cow::Owned(serialize(&self.token).unwrap()) + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature; + } +} + +impl Pong { + pub fn new(ping: &Ping, keypair: &Keypair) -> Result { + let hash = hash::hash(&serialize(&ping.token)?); + let pong = Pong { + from: keypair.pubkey(), + hash, + signature: keypair.sign_message(hash.as_ref()), + }; + Ok(pong) + } +} + +impl Sanitize for Pong { + fn sanitize(&self) -> Result<(), SanitizeError> { + self.from.sanitize()?; + self.hash.sanitize()?; + self.signature.sanitize() + } +} + +impl Signable for Pong { + fn pubkey(&self) -> Pubkey { + self.from + } + + fn signable_data(&self) -> Cow<[u8]> { + Cow::Owned(self.hash.as_ref().into()) + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature; + } +} + +impl PingCache { + pub fn new(ttl: Duration, cap: usize) -> Self { + Self { + ttl, + pings: LruCache::new(cap), + pongs: LruCache::new(cap), + cache: LruCache::new(cap), + } + } + + /// Checks if the pong hash, pubkey and socket match a ping message sent + /// out previously. If so records current timestamp for the remote node and + /// returns true. + /// Note: Does not verify the signature. + pub fn add(&mut self, pong: &Pong, socket: SocketAddr, now: Instant) -> bool { + let node = (pong.pubkey(), socket); + match self.cache.peek(&pong.hash) { + Some(value) if *value == node => { + self.pings.pop(&node); + self.pongs.put(node, now); + self.cache.pop(&pong.hash); + true + } + _ => false, + } + } + + /// Checks if the remote node has been pinged recently. If not, calls the + /// given function to generates a new ping message, records current + /// timestamp and hash of ping token, and returns the ping message. + fn maybe_ping( + &mut self, + now: Instant, + node: (Pubkey, SocketAddr), + mut pingf: F, + ) -> Option> + where + T: Serialize, + F: FnMut() -> Option>, + { + // Rate limit consecutive pings sent to a remote node. + let delay = self.ttl / 64; + match self.pings.get(&node) { + Some(t) if now.saturating_duration_since(*t) < delay => None, + _ => { + let ping = pingf()?; + let hash = hash::hash(&serialize(&ping.token).ok()?); + self.pings.put(node, now); + self.cache.put(hash, node); + Some(ping) + } + } + } + + /// Returns true if the remote node has responded to a ping message. + /// Removes expired pong messages. In order to extend verifications before + /// expiration, if the pong message is not too recent, and the node has not + /// been pinged recently, calls the given function to generates a new ping + /// message, records current timestamp and hash of ping token, and returns + /// the ping message. + /// Caller should verify if the socket address is valid. (e.g. by using + /// ContactInfo::is_valid_address). + pub fn check( + &mut self, + now: Instant, + node: (Pubkey, SocketAddr), + pingf: F, + ) -> (bool, Option>) + where + T: Serialize, + F: FnMut() -> Option>, + { + let (check, ping) = match self.pongs.get(&node) { + None => (false, true), + Some(t) => { + let age = now.saturating_duration_since(*t); + // Pop if the pong message has expired. + if age > self.ttl { + self.pongs.pop(&node); + } + // If the pong message is not too recent, generate a new ping + // message to extend remote node verification. + (true, age > self.ttl / 8) + } + }; + let ping = if ping { + self.maybe_ping(now, node, pingf) + } else { + None + }; + (check, ping) + } +} + +impl Clone for PingCache { + fn clone(&self) -> Self { + let mut clone = Self { + ttl: self.ttl, + pings: LruCache::new(self.pings.cap()), + pongs: LruCache::new(self.pongs.cap()), + cache: LruCache::new(self.cache.cap()), + }; + for (k, v) in self.pongs.iter() { + clone.pings.put(*k, *v); + } + for (k, v) in self.pongs.iter() { + clone.pongs.put(*k, *v); + } + for (k, v) in self.cache.iter() { + clone.cache.put(*k, *v); + } + clone + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_new_rand() { + let mut rng = rand::thread_rng(); + let keypair = Keypair::new(); + let ping = Ping::<[u8; 32]>::new_rand(&mut rng, &keypair).unwrap(); + assert!(ping.verify()); + let pong = Pong::new(&ping, &keypair).unwrap(); + assert!(pong.verify()); + assert_eq!(hash::hash(&ping.token), pong.hash); + } +} diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 2540e041bb0440..c1011f6ef76f76 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -462,7 +462,10 @@ fn network_run_pull( .into_iter() .flatten() .collect(); - node.lock().unwrap().process_pull_requests(filters, now); + node.lock().unwrap().process_pull_requests( + filters.into_iter().map(|(caller, _)| caller), + now, + ); rsp }) .unwrap(); From 20ae8e8d6fad473d2181c8e1373dd6495c3e6f51 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sat, 10 Oct 2020 14:23:35 -0400 Subject: [PATCH 02/15] removes specialization for Sanitize impl of Ping --- core/src/ping_pong.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/core/src/ping_pong.rs b/core/src/ping_pong.rs index 85db5bc340b681..3e62d44779f0f6 100644 --- a/core/src/ping_pong.rs +++ b/core/src/ping_pong.rs @@ -67,16 +67,10 @@ where } impl Sanitize for Ping { - default fn sanitize(&self) -> Result<(), SanitizeError> { - self.from.sanitize()?; - self.signature.sanitize() - } -} - -impl Sanitize for Ping { fn sanitize(&self) -> Result<(), SanitizeError> { self.from.sanitize()?; - self.token.sanitize()?; + // TODO Add self.token.sanitize()?; when rust's + // specialization feature becomes stable. self.signature.sanitize() } } From 503e65f3def3ffbce9bd686ed8a0ade29b674215 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 11 Oct 2020 09:58:23 -0400 Subject: [PATCH 03/15] handles ping/pong packets in batch --- core/src/cluster_info.rs | 90 ++++++++++++++++++++++------------------ core/src/ping_pong.rs | 2 +- 2 files changed, 50 insertions(+), 42 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index b6da4e2d0090c4..87edd7db250e75 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1814,6 +1814,8 @@ impl ClusterInfo { .read() .unwrap() .make_timeouts(&stakes, epoch_time_ms); + let mut ping_messages = vec![]; + let mut pong_messages = vec![]; let mut pull_responses = HashMap::new(); for (from_addr, packet) in packets { match packet { @@ -1889,34 +1891,15 @@ impl ClusterInfo { ("prune_message", (allocated.get() - start) as i64, i64), ); } - Protocol::PingMessage(ping) => { - let start = allocated.get(); - if !ping.verify() { - inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", 1); - } else if let Some(rsp) = self.handle_ping_message(&ping, &from_addr, recycler) - { - let _ = response_sender.send(rsp); - } - datapoint_debug!( - "solana-gossip-listen-memory", - ("ping_message", (allocated.get() - start) as i64, i64), - ); - } - Protocol::PongMessage(pong) => { - let start = allocated.get(); - if pong.verify() { - self.handle_pong_message(&pong, from_addr); - } else { - inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", 1); - } - datapoint_debug!( - "solana-gossip-listen-memory", - ("pong_message", (allocated.get() - start) as i64, i64), - ); - } + Protocol::PingMessage(ping) => ping_messages.push((ping, from_addr)), + Protocol::PongMessage(pong) => pong_messages.push((pong, from_addr)), } } + if let Some(response) = self.handle_ping_messages(ping_messages, recycler) { + let _ = response_sender.send(response); + } + self.handle_pong_messages(pong_messages, Instant::now()); for (from, data) in pull_responses { self.handle_pull_response(&from, data, &timeouts); } @@ -2180,27 +2163,52 @@ impl ClusterInfo { } } - fn handle_ping_message( + fn handle_ping_messages( &self, - ping: &Ping, - addr: &SocketAddr, + msgs: Vec<(Ping, SocketAddr)>, recycler: &PacketsRecycler, ) -> Option { - let pong = Pong::new(ping, &self.keypair).ok()?; - let pong = Protocol::PongMessage(pong); - let packets = Packets::new_with_recycler_data( - recycler, - "handle_ping_message", - vec![Packet::from_data(addr, pong)], - ); - Some(packets) + let num_msgs = msgs.len(); + let msgs: Vec<_> = msgs.into_iter().filter(|(ping, _)| ping.verify()).collect(); + if msgs.len() != num_msgs { + inc_new_counter_info!( + "cluster_info-gossip_ping_msg_verify_fail", + num_msgs - msgs.len() + ); + } + let packets: Vec<_> = msgs + .into_iter() + .filter_map(|(ping, addr)| { + let pong = Pong::new(&ping, &self.keypair).ok()?; + let pong = Protocol::PongMessage(pong); + let packet = Packet::from_data(&addr, pong); + Some(packet) + }) + .collect(); + if packets.is_empty() { + None + } else { + let packets = + Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets); + Some(packets) + } } - fn handle_pong_message(&self, pong: &Pong, addr: SocketAddr) { - self.ping_cache - .write() - .unwrap() - .add(pong, addr, Instant::now()); + fn handle_pong_messages(&self, msgs: Vec<(Pong, SocketAddr)>, now: Instant) { + let num_msgs = msgs.len(); + let msgs: Vec<_> = msgs.into_iter().filter(|(pong, _)| pong.verify()).collect(); + if msgs.len() != num_msgs { + inc_new_counter_info!( + "cluster_info-gossip_pong_msg_verify_fail", + num_msgs - msgs.len() + ); + } + if !msgs.is_empty() { + let mut ping_cache = self.ping_cache.write().unwrap(); + for (pong, addr) in msgs { + ping_cache.add(&pong, addr, now); + } + } } fn handle_push_message( diff --git a/core/src/ping_pong.rs b/core/src/ping_pong.rs index 3e62d44779f0f6..cf103aa33bfdc4 100644 --- a/core/src/ping_pong.rs +++ b/core/src/ping_pong.rs @@ -173,7 +173,7 @@ impl PingCache { { // Rate limit consecutive pings sent to a remote node. let delay = self.ttl / 64; - match self.pings.get(&node) { + match self.pings.peek(&node) { Some(t) if now.saturating_duration_since(*t) < delay => None, _ => { let ping = pingf()?; From 3ba15c717c68d39f8db0fd6fcb39b7f1ebc990a7 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 11 Oct 2020 20:28:02 -0400 Subject: [PATCH 04/15] skips counter.inc when count == 0 --- core/src/cluster_info.rs | 39 ++++++++++++++++++--------------------- metrics/src/counter.rs | 2 +- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 87edd7db250e75..4979c47eab6a92 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2168,23 +2168,22 @@ impl ClusterInfo { msgs: Vec<(Ping, SocketAddr)>, recycler: &PacketsRecycler, ) -> Option { - let num_msgs = msgs.len(); - let msgs: Vec<_> = msgs.into_iter().filter(|(ping, _)| ping.verify()).collect(); - if msgs.len() != num_msgs { - inc_new_counter_info!( - "cluster_info-gossip_ping_msg_verify_fail", - num_msgs - msgs.len() - ); - } + let mut verify_failed = 0; let packets: Vec<_> = msgs .into_iter() .filter_map(|(ping, addr)| { - let pong = Pong::new(&ping, &self.keypair).ok()?; - let pong = Protocol::PongMessage(pong); - let packet = Packet::from_data(&addr, pong); - Some(packet) + if ping.verify() { + let pong = Pong::new(&ping, &self.keypair).ok()?; + let pong = Protocol::PongMessage(pong); + let packet = Packet::from_data(&addr, pong); + Some(packet) + } else { + verify_failed += 1; + None + } }) .collect(); + inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", verify_failed); if packets.is_empty() { None } else { @@ -2195,19 +2194,17 @@ impl ClusterInfo { } fn handle_pong_messages(&self, msgs: Vec<(Pong, SocketAddr)>, now: Instant) { - let num_msgs = msgs.len(); - let msgs: Vec<_> = msgs.into_iter().filter(|(pong, _)| pong.verify()).collect(); - if msgs.len() != num_msgs { - inc_new_counter_info!( - "cluster_info-gossip_pong_msg_verify_fail", - num_msgs - msgs.len() - ); - } if !msgs.is_empty() { + let mut verify_failed = 0; let mut ping_cache = self.ping_cache.write().unwrap(); for (pong, addr) in msgs { - ping_cache.add(&pong, addr, now); + if pong.verify() { + ping_cache.add(&pong, addr, now); + } else { + verify_failed += 1; + } } + inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", verify_failed); } } diff --git a/metrics/src/counter.rs b/metrics/src/counter.rs index 661d00be4b18de..7f80eb11df5925 100644 --- a/metrics/src/counter.rs +++ b/metrics/src/counter.rs @@ -76,7 +76,7 @@ macro_rules! inc_counter_info { #[macro_export] macro_rules! inc_new_counter { ($name:expr, $count:expr, $level:expr, $lograte:expr, $metricsrate:expr) => {{ - if log_enabled!($level) { + if $count != 0 && log_enabled!($level) { static mut INC_NEW_COUNTER: $crate::counter::Counter = create_counter!($name, $lograte, $metricsrate); static INIT_HOOK: std::sync::Once = std::sync::Once::new(); From 00e20e66615ac64908cd4cec981e9dd09f89d450 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 12 Oct 2020 16:13:01 -0400 Subject: [PATCH 05/15] uses iterators for handling ping/pong messages --- core/src/cluster_info.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4979c47eab6a92..73c23804c44bc0 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1896,10 +1896,10 @@ impl ClusterInfo { } } - if let Some(response) = self.handle_ping_messages(ping_messages, recycler) { + if let Some(response) = self.handle_ping_messages(ping_messages.into_iter(), recycler) { let _ = response_sender.send(response); } - self.handle_pong_messages(pong_messages, Instant::now()); + self.handle_pong_messages(pong_messages.into_iter(), Instant::now()); for (from, data) in pull_responses { self.handle_pull_response(&from, data, &timeouts); } @@ -2163,14 +2163,12 @@ impl ClusterInfo { } } - fn handle_ping_messages( - &self, - msgs: Vec<(Ping, SocketAddr)>, - recycler: &PacketsRecycler, - ) -> Option { + fn handle_ping_messages(&self, pings: I, recycler: &PacketsRecycler) -> Option + where + I: Iterator, + { let mut verify_failed = 0; - let packets: Vec<_> = msgs - .into_iter() + let packets: Vec<_> = pings .filter_map(|(ping, addr)| { if ping.verify() { let pong = Pong::new(&ping, &self.keypair).ok()?; @@ -2193,11 +2191,15 @@ impl ClusterInfo { } } - fn handle_pong_messages(&self, msgs: Vec<(Pong, SocketAddr)>, now: Instant) { - if !msgs.is_empty() { + fn handle_pong_messages(&self, pongs: I, now: Instant) + where + I: Iterator, + { + let mut pongs = pongs.peekable(); + if pongs.peek().is_some() { let mut verify_failed = 0; let mut ping_cache = self.ping_cache.write().unwrap(); - for (pong, addr) in msgs { + for (pong, addr) in pongs { if pong.verify() { ping_cache.add(&pong, addr, now); } else { From 65d73066c6b9fc67a7a5afb63be033ee1d4814e0 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 12 Oct 2020 15:38:08 -0400 Subject: [PATCH 06/15] builds a predicate for checking pull requests --- core/src/cluster_info.rs | 60 +++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 73c23804c44bc0..185882e9f36651 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -29,7 +29,7 @@ use crate::{ }; use rand::distributions::{Distribution, WeightedIndex}; -use rand::SeedableRng; +use rand::{CryptoRng, Rng, SeedableRng}; use rand_chacha::ChaChaRng; use solana_sdk::sanitize::{Sanitize, SanitizeError}; @@ -1930,20 +1930,22 @@ impl ClusterInfo { }); } - // Filters out pull requests from invalid addresses, and those which have - // not passed the ping-pong check. Appends ping packets for the addresses - // which need to be (re)verified. - fn filter_pull_requests( - &self, - requests: Vec, - packets: &mut Packets, - ) -> Vec { - let mut rng = rand::thread_rng(); + // Returns a predicate checking if the pull request is from a valid + // address, and if the address have responded to a ping request. Also + // appends ping packets for the addresses which need to be (re)verified. + fn check_pull_request<'a, R>( + &'a self, + now: Instant, + mut rng: &'a mut R, + packets: &'a mut Packets, + ) -> impl FnMut(&PullData) -> bool + 'a + where + R: Rng + CryptoRng, + { let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new(); - let mut pingf = || Ping::new_rand(&mut rng, &self.keypair).ok(); + let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair).ok(); let mut ping_cache = self.ping_cache.write().unwrap(); - let now = Instant::now(); - let mut hard_check = |node| { + let mut hard_check = move |node| { let (_check, ping) = ping_cache.check(now, node, &mut pingf); if let Some(ping) = ping { let ping = Protocol::PingMessage(ping); @@ -1951,22 +1953,19 @@ impl ClusterInfo { packets.packets.push(ping); } // TODO: For backward compatibility, this unconditionally returns - // true for now. This has to return _check, once nodes start + // true for now. It has to return _check, once nodes start // responding to ping messages. true }; // Because pull-responses are sent back to packet.meta.addr() of // incoming pull-requests, pings are also sent to request.from_addr (as // opposed to caller.gossip address). - requests - .into_iter() - .filter(|request| { - ContactInfo::is_valid_address(&request.from_addr) && { - let node = (request.caller.pubkey(), request.from_addr); - *cache.entry(node).or_insert_with(|| hard_check(node)) - } - }) - .collect() + move |request| { + ContactInfo::is_valid_address(&request.from_addr) && { + let node = (request.caller.pubkey(), request.from_addr); + *cache.entry(node).or_insert_with(|| hard_check(node)) + } + } } // Pull requests take an incoming bloom filter of contained entries from a node @@ -1988,11 +1987,16 @@ impl ClusterInfo { .process_pull_requests(requests.iter().map(|r| r.caller.clone()), timestamp()); self.update_data_budget(stakes.len()); let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); - let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = self - .filter_pull_requests(requests, &mut packets) - .into_iter() - .map(|r| ((r.caller, r.filter), r.from_addr)) - .unzip(); + let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = { + let mut rng = rand::thread_rng(); + let check_pull_request = + self.check_pull_request(Instant::now(), &mut rng, &mut packets); + requests + .into_iter() + .filter(check_pull_request) + .map(|r| ((r.caller, r.filter), r.from_addr)) + .unzip() + }; let now = timestamp(); let self_id = self.id(); From 8960e5c42d82726098c6a005221392ed752495bc Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 13 Oct 2020 11:09:19 -0400 Subject: [PATCH 07/15] reverts change to metrics/src/counter.rs --- core/src/cluster_info.rs | 8 ++++++-- metrics/src/counter.rs | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 185882e9f36651..e63d133d4d48d4 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2185,7 +2185,9 @@ impl ClusterInfo { } }) .collect(); - inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", verify_failed); + if verify_failed != 0 { + inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", verify_failed); + } if packets.is_empty() { None } else { @@ -2210,7 +2212,9 @@ impl ClusterInfo { verify_failed += 1; } } - inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", verify_failed); + if verify_failed != 0 { + inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", verify_failed); + } } } diff --git a/metrics/src/counter.rs b/metrics/src/counter.rs index 7f80eb11df5925..661d00be4b18de 100644 --- a/metrics/src/counter.rs +++ b/metrics/src/counter.rs @@ -76,7 +76,7 @@ macro_rules! inc_counter_info { #[macro_export] macro_rules! inc_new_counter { ($name:expr, $count:expr, $level:expr, $lograte:expr, $metricsrate:expr) => {{ - if $count != 0 && log_enabled!($level) { + if log_enabled!($level) { static mut INC_NEW_COUNTER: $crate::counter::Counter = create_counter!($name, $lograte, $metricsrate); static INIT_HOOK: std::sync::Once = std::sync::Once::new(); From 7a634e08dabf0e39a8b527cb1ea45c3673339def Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 13 Oct 2020 11:27:24 -0400 Subject: [PATCH 08/15] adds a stats counter for pull requests which fail ping/pong check --- core/src/cluster_info.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index e63d133d4d48d4..8a29b50e70b63f 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -98,6 +98,7 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; /// Keep the number of snapshot hashes a node publishes under MAX_PROTOCOL_PAYLOAD_SIZE pub const MAX_SNAPSHOT_HASHES: usize = 16; +/// Number of bytes in the randomly generated token sent with ping messages. const GOSSIP_PING_TOKEN_SIZE: usize = 32; const GOSSIP_PING_CACHE_CAPACITY: usize = 16384; const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640); @@ -230,6 +231,7 @@ struct GossipStats { prune_received_cache: Counter, prune_message_count: Counter, prune_message_len: Counter, + pull_request_ping_pong_check_failed_count: Counter, purge: Counter, epoch_slots_lookup: Counter, epoch_slots_push: Counter, @@ -1952,6 +1954,11 @@ impl ClusterInfo { let ping = Packet::from_data(&node.1, ping); packets.packets.push(ping); } + if !_check { + self.stats + .pull_request_ping_pong_check_failed_count + .add_relaxed(1) + } // TODO: For backward compatibility, this unconditionally returns // true for now. It has to return _check, once nodes start // responding to ping messages. @@ -2522,6 +2529,11 @@ impl ClusterInfo { self.stats.process_pull_requests.clear(), i64 ), + ( + "pull_request_ping_pong_check_failed_count", + self.stats.pull_request_ping_pong_check_failed_count.clear(), + i64 + ), ( "generate_pull_responses", self.stats.generate_pull_responses.clear(), From 4a16435814c24474e7268f881e37a3e199a98f0c Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 13 Oct 2020 11:47:02 -0400 Subject: [PATCH 09/15] minor renames --- core/src/ping_pong.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/ping_pong.rs b/core/src/ping_pong.rs index cf103aa33bfdc4..10e0884568707f 100644 --- a/core/src/ping_pong.rs +++ b/core/src/ping_pong.rs @@ -37,7 +37,7 @@ pub struct PingCache { pongs: LruCache<(Pubkey, SocketAddr), Instant>, // Hash of ping tokens sent out to remote nodes, // pending a pong response back. - cache: LruCache, + pending_cache: LruCache, } impl Ping { @@ -137,7 +137,7 @@ impl PingCache { ttl, pings: LruCache::new(cap), pongs: LruCache::new(cap), - cache: LruCache::new(cap), + pending_cache: LruCache::new(cap), } } @@ -147,11 +147,11 @@ impl PingCache { /// Note: Does not verify the signature. pub fn add(&mut self, pong: &Pong, socket: SocketAddr, now: Instant) -> bool { let node = (pong.pubkey(), socket); - match self.cache.peek(&pong.hash) { + match self.pending_cache.peek(&pong.hash) { Some(value) if *value == node => { self.pings.pop(&node); self.pongs.put(node, now); - self.cache.pop(&pong.hash); + self.pending_cache.pop(&pong.hash); true } _ => false, @@ -179,7 +179,7 @@ impl PingCache { let ping = pingf()?; let hash = hash::hash(&serialize(&ping.token).ok()?); self.pings.put(node, now); - self.cache.put(hash, node); + self.pending_cache.put(hash, node); Some(ping) } } @@ -203,7 +203,7 @@ impl PingCache { T: Serialize, F: FnMut() -> Option>, { - let (check, ping) = match self.pongs.get(&node) { + let (check, should_ping) = match self.pongs.get(&node) { None => (false, true), Some(t) => { let age = now.saturating_duration_since(*t); @@ -216,7 +216,7 @@ impl PingCache { (true, age > self.ttl / 8) } }; - let ping = if ping { + let ping = if should_ping { self.maybe_ping(now, node, pingf) } else { None @@ -231,7 +231,7 @@ impl Clone for PingCache { ttl: self.ttl, pings: LruCache::new(self.pings.cap()), pongs: LruCache::new(self.pongs.cap()), - cache: LruCache::new(self.cache.cap()), + pending_cache: LruCache::new(self.pending_cache.cap()), }; for (k, v) in self.pongs.iter() { clone.pings.put(*k, *v); @@ -239,8 +239,8 @@ impl Clone for PingCache { for (k, v) in self.pongs.iter() { clone.pongs.put(*k, *v); } - for (k, v) in self.cache.iter() { - clone.cache.put(*k, *v); + for (k, v) in self.pending_cache.iter() { + clone.pending_cache.put(*k, *v); } clone } From 31be432a100963d24efa41d254bdba864c9906b2 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 14 Oct 2020 18:06:15 -0400 Subject: [PATCH 10/15] adds more tests --- core/src/cluster_info.rs | 126 +++++++++++++++++++++++++++++-- core/src/crds_gossip.rs | 2 +- core/src/crds_gossip_pull.rs | 2 +- core/src/ping_pong.rs | 142 ++++++++++++++++++++++++++++++++++- 4 files changed, 262 insertions(+), 10 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 8a29b50e70b63f..d3c2cb0adb6c1b 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1898,10 +1898,10 @@ impl ClusterInfo { } } - if let Some(response) = self.handle_ping_messages(ping_messages.into_iter(), recycler) { + if let Some(response) = self.handle_ping_messages(ping_messages, recycler) { let _ = response_sender.send(response); } - self.handle_pong_messages(pong_messages.into_iter(), Instant::now()); + self.handle_pong_messages(pong_messages, Instant::now()); for (from, data) in pull_responses { self.handle_pull_response(&from, data, &timeouts); } @@ -2176,10 +2176,11 @@ impl ClusterInfo { fn handle_ping_messages(&self, pings: I, recycler: &PacketsRecycler) -> Option where - I: Iterator, + I: IntoIterator, { let mut verify_failed = 0; let packets: Vec<_> = pings + .into_iter() .filter_map(|(ping, addr)| { if ping.verify() { let pong = Pong::new(&ping, &self.keypair).ok()?; @@ -2206,9 +2207,9 @@ impl ClusterInfo { fn handle_pong_messages(&self, pongs: I, now: Instant) where - I: Iterator, + I: IntoIterator, { - let mut pongs = pongs.peekable(); + let mut pongs = pongs.into_iter().peekable(); if pongs.peek().is_some() { let mut verify_failed = 0; let mut ping_cache = self.ping_cache.write().unwrap(); @@ -2918,12 +2919,14 @@ pub fn stake_weight_peers( mod tests { use super::*; use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote}; + use itertools::izip; use rayon::prelude::*; use solana_perf::test_tx::test_tx; use solana_sdk::signature::{Keypair, Signer}; use solana_vote_program::{vote_instruction, vote_state::Vote}; use std::collections::HashSet; - use std::net::{IpAddr, Ipv4Addr}; + use std::iter::repeat_with; + use std::net::{IpAddr, Ipv4Addr, SocketAddrV4}; use std::sync::Arc; #[test] @@ -2965,6 +2968,117 @@ mod tests { ); } + fn new_rand_remote_node(rng: &mut R) -> (Keypair, SocketAddr) + where + R: Rng, + { + let keypair = Keypair::new(); + let socket = SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), + rng.gen(), + )); + (keypair, socket) + } + + #[test] + fn test_handle_pong_messages() { + let now = Instant::now(); + let mut rng = rand::thread_rng(); + let this_node = Arc::new(Keypair::new()); + let cluster_info = ClusterInfo::new( + ContactInfo::new_localhost(&this_node.pubkey(), timestamp()), + this_node.clone(), + ); + let remote_nodes: Vec<(Keypair, SocketAddr)> = + repeat_with(|| new_rand_remote_node(&mut rng)) + .take(128) + .collect(); + let pings: Vec<_> = { + let mut ping_cache = cluster_info.ping_cache.write().unwrap(); + let mut pingf = || Ping::new_rand(&mut rng, &this_node).ok(); + remote_nodes + .iter() + .map(|(keypair, socket)| { + let node = (keypair.pubkey(), *socket); + let (check, ping) = ping_cache.check(now, node, &mut pingf); + // Assert that initially remote nodes will not pass the + // ping/pong check. + assert!(!check); + ping.unwrap() + }) + .collect() + }; + let pongs: Vec<(Pong, SocketAddr)> = pings + .iter() + .zip(&remote_nodes) + .map(|(ping, (keypair, socket))| (Pong::new(ping, keypair).unwrap(), *socket)) + .collect(); + let now = now + Duration::from_millis(1); + cluster_info.handle_pong_messages(pongs, now); + // Assert that remote nodes now pass the ping/pong check. + { + let mut ping_cache = cluster_info.ping_cache.write().unwrap(); + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let (check, _) = ping_cache.check(now, node, || -> Option { None }); + assert!(check); + } + } + // Assert that a new random remote node still will not pass the check. + { + let mut ping_cache = cluster_info.ping_cache.write().unwrap(); + let (keypair, socket) = new_rand_remote_node(&mut rng); + let node = (keypair.pubkey(), socket); + let (check, _) = ping_cache.check(now, node, || -> Option { None }); + assert!(!check); + } + } + + #[test] + fn test_handle_ping_messages() { + let mut rng = rand::thread_rng(); + let this_node = Arc::new(Keypair::new()); + let cluster_info = ClusterInfo::new( + ContactInfo::new_localhost(&this_node.pubkey(), timestamp()), + this_node.clone(), + ); + let remote_nodes: Vec<(Keypair, SocketAddr)> = + repeat_with(|| new_rand_remote_node(&mut rng)) + .take(128) + .collect(); + let pings: Vec<_> = remote_nodes + .iter() + .map(|(keypair, _)| Ping::new_rand(&mut rng, keypair).unwrap()) + .collect(); + let pongs: Vec<_> = pings + .iter() + .map(|ping| Pong::new(ping, &this_node).unwrap()) + .collect(); + let recycler = PacketsRecycler::default(); + let packets = cluster_info + .handle_ping_messages( + pings + .into_iter() + .zip(remote_nodes.iter().map(|(_, socket)| *socket)), + &recycler, + ) + .unwrap() + .packets; + assert_eq!(remote_nodes.len(), packets.len()); + for (packet, (_, socket), pong) in izip!( + packets.into_iter(), + remote_nodes.into_iter(), + pongs.into_iter() + ) { + assert_eq!(packet.meta.addr(), socket); + let bytes = serialize(&pong).unwrap(); + match limited_deserialize(&packet.data[..packet.meta.size]).unwrap() { + Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes), + _ => panic!("invalid packet!"), + } + } + } + fn test_crds_values(pubkey: Pubkey) -> Vec { let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp()); let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint)); diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index e35d8d1eb72775..d0a437876b5942 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -176,7 +176,7 @@ impl CrdsGossip { /// process a pull request and create a response pub fn process_pull_requests(&mut self, callers: I, now: u64) where - I: Iterator, + I: IntoIterator, { self.pull .process_pull_requests(&mut self.crds, callers, now); diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 036c1e42d086ac..c28a6ad5374034 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -275,7 +275,7 @@ impl CrdsGossipPull { /// process a pull request pub fn process_pull_requests(&mut self, crds: &mut Crds, callers: I, now: u64) where - I: Iterator, + I: IntoIterator, { for caller in callers { let key = caller.label().pubkey(); diff --git a/core/src/ping_pong.rs b/core/src/ping_pong.rs index 10e0884568707f..093da736b6f06e 100644 --- a/core/src/ping_pong.rs +++ b/core/src/ping_pong.rs @@ -249,15 +249,153 @@ impl Clone for PingCache { #[cfg(test)] mod tests { use super::*; + use std::collections::HashSet; + use std::iter::repeat_with; + use std::net::{Ipv4Addr, SocketAddrV4}; + + type Token = [u8; 32]; #[test] - fn test_new_rand() { + fn test_ping_pong() { let mut rng = rand::thread_rng(); let keypair = Keypair::new(); - let ping = Ping::<[u8; 32]>::new_rand(&mut rng, &keypair).unwrap(); + let ping = Ping::::new_rand(&mut rng, &keypair).unwrap(); assert!(ping.verify()); + assert!(ping.sanitize().is_ok()); + let pong = Pong::new(&ping, &keypair).unwrap(); assert!(pong.verify()); + assert!(pong.sanitize().is_ok()); assert_eq!(hash::hash(&ping.token), pong.hash); } + + #[test] + fn test_ping_cache() { + let now = Instant::now(); + let mut rng = rand::thread_rng(); + let ttl = Duration::from_millis(256); + let mut cache = PingCache::new(ttl, /*cap=*/ 1000); + let this_node = Keypair::new(); + let keypairs: Vec<_> = repeat_with(Keypair::new).take(8).collect(); + let sockets: Vec<_> = repeat_with(|| { + SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), + rng.gen(), + )) + }) + .take(8) + .collect(); + let remote_nodes: Vec<(&Keypair, SocketAddr)> = repeat_with(|| { + let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; + let socket = sockets[rng.gen_range(0, sockets.len())]; + (keypair, socket) + }) + .take(128) + .collect(); + + // Initially all checks should fail. The first observation of each node + // should create a ping packet. + let mut seen_nodes = HashSet::<(Pubkey, SocketAddr)>::new(); + let pings: Vec>> = remote_nodes + .iter() + .map(|(keypair, socket)| { + let node = (keypair.pubkey(), *socket); + let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); + let (check, ping) = cache.check(now, node, pingf); + assert!(!check); + assert_eq!(seen_nodes.insert(node), ping.is_some()); + ping + }) + .collect(); + + let now = now + Duration::from_millis(1); + let panic_ping = || -> Option> { panic!("this should not happen!") }; + for ((keypair, socket), ping) in remote_nodes.iter().zip(&pings) { + match ping { + None => { + // Already have a recent ping packets for nodes, so no new + // ping packet will be generated. + let node = (keypair.pubkey(), *socket); + let (check, ping) = cache.check(now, node, panic_ping); + assert!(check); + assert!(ping.is_none()); + } + Some(ping) => { + let pong = Pong::new(ping, keypair).unwrap(); + assert!(cache.add(&pong, *socket, now)); + } + } + } + + let now = now + Duration::from_millis(1); + // All nodes now have a recent pong packet. + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let (check, ping) = cache.check(now, node, panic_ping); + assert!(check); + assert!(ping.is_none()); + } + + let now = now + ttl / 8; + // All nodes still have a valid pong packet, but the cache will create + // a new ping packet to extend verification. + seen_nodes.clear(); + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); + let (check, ping) = cache.check(now, node, pingf); + assert!(check); + assert_eq!(seen_nodes.insert(node), ping.is_some()); + } + + let now = now + Duration::from_millis(1); + // All nodes still have a valid pong packet, and a very recent ping + // packet pending response. So no new ping packet will be created. + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let (check, ping) = cache.check(now, node, panic_ping); + assert!(check); + assert!(ping.is_none()); + } + + let now = now + ttl; + // Pong packets are still valid but expired. The first observation of + // each node will remove the pong packet from cache and create a new + // ping packet. + seen_nodes.clear(); + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); + let (check, ping) = cache.check(now, node, pingf); + if seen_nodes.insert(node) { + assert!(check); + assert!(ping.is_some()); + } else { + assert!(!check); + assert!(ping.is_none()); + } + } + + let now = now + Duration::from_millis(1); + // No valid pong packet in the cache. A recent ping packet already + // created, so no new one will be created. + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let (check, ping) = cache.check(now, node, panic_ping); + assert!(!check); + assert!(ping.is_none()); + } + + let now = now + ttl / 64; + // No valid pong packet in the cache. Another ping packet will be + // created for the first observation of each node. + seen_nodes.clear(); + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); + let (check, ping) = cache.check(now, node, pingf); + assert!(!check); + assert_eq!(seen_nodes.insert(node), ping.is_some()); + } + } } From 2010d476a4bb127f7bbbdaa53c8764eaf0e0f10c Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 15 Oct 2020 18:03:47 -0400 Subject: [PATCH 11/15] enables ping/pong check using feature-set --- core/src/cluster_info.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index d3c2cb0adb6c1b..a47643c9ab677d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1940,29 +1940,29 @@ impl ClusterInfo { now: Instant, mut rng: &'a mut R, packets: &'a mut Packets, + feature_set: Option<&FeatureSet>, ) -> impl FnMut(&PullData) -> bool + 'a where R: Rng + CryptoRng, { + let check_enabled = matches!(feature_set, Some(feature_set) if + feature_set.is_active(&feature_set::pull_request_ping_pong_check::id())); let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new(); let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair).ok(); let mut ping_cache = self.ping_cache.write().unwrap(); let mut hard_check = move |node| { - let (_check, ping) = ping_cache.check(now, node, &mut pingf); + let (check, ping) = ping_cache.check(now, node, &mut pingf); if let Some(ping) = ping { let ping = Protocol::PingMessage(ping); let ping = Packet::from_data(&node.1, ping); packets.packets.push(ping); } - if !_check { + if !check { self.stats .pull_request_ping_pong_check_failed_count .add_relaxed(1) } - // TODO: For backward compatibility, this unconditionally returns - // true for now. It has to return _check, once nodes start - // responding to ping messages. - true + check || !check_enabled }; // Because pull-responses are sent back to packet.meta.addr() of // incoming pull-requests, pings are also sent to request.from_addr (as @@ -1984,11 +1984,6 @@ impl ClusterInfo { stakes: &HashMap, feature_set: Option<&FeatureSet>, ) -> Packets { - if matches!(feature_set, Some(feature_set) if - feature_set.is_active(&feature_set::pull_request_ping_pong_check::id())) - { - // TODO: add ping-pong check on pull-request addresses. - } let mut time = Measure::start("handle_pull_requests"); self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) .process_pull_requests(requests.iter().map(|r| r.caller.clone()), timestamp()); @@ -1997,7 +1992,7 @@ impl ClusterInfo { let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = { let mut rng = rand::thread_rng(); let check_pull_request = - self.check_pull_request(Instant::now(), &mut rng, &mut packets); + self.check_pull_request(Instant::now(), &mut rng, &mut packets, feature_set); requests .into_iter() .filter(check_pull_request) From 2e45ff6fbad40cfa17bed9582656d9513de39e7b Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 16 Oct 2020 05:38:40 -0400 Subject: [PATCH 12/15] verifies packets outside the lock --- core/src/cluster_info.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index a47643c9ab677d..a1299fc4b795fa 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2204,19 +2204,25 @@ impl ClusterInfo { where I: IntoIterator, { - let mut pongs = pongs.into_iter().peekable(); - if pongs.peek().is_some() { - let mut verify_failed = 0; - let mut ping_cache = self.ping_cache.write().unwrap(); - for (pong, addr) in pongs { + let mut verify_failed = 0; + let pongs: Vec<_> = pongs + .into_iter() + .filter(|(pong, _)| { if pong.verify() { - ping_cache.add(&pong, addr, now); + true } else { verify_failed += 1; + false } - } - if verify_failed != 0 { - inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", verify_failed); + }) + .collect(); + if verify_failed != 0 { + inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", verify_failed); + } + if !pongs.is_empty() { + let mut ping_cache = self.ping_cache.write().unwrap(); + for (pong, addr) in pongs { + ping_cache.add(&pong, addr, now); } } } From 23a42c2f7e133d494c9b96a29b37df0303543efa Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 21 Oct 2020 11:49:26 -0400 Subject: [PATCH 13/15] Revert "verifies packets outside the lock" This reverts commit bae619553916c20189ed179412e50db5a0a90b38. --- core/src/cluster_info.rs | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index a1299fc4b795fa..a47643c9ab677d 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2204,25 +2204,19 @@ impl ClusterInfo { where I: IntoIterator, { - let mut verify_failed = 0; - let pongs: Vec<_> = pongs - .into_iter() - .filter(|(pong, _)| { + let mut pongs = pongs.into_iter().peekable(); + if pongs.peek().is_some() { + let mut verify_failed = 0; + let mut ping_cache = self.ping_cache.write().unwrap(); + for (pong, addr) in pongs { if pong.verify() { - true + ping_cache.add(&pong, addr, now); } else { verify_failed += 1; - false } - }) - .collect(); - if verify_failed != 0 { - inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", verify_failed); - } - if !pongs.is_empty() { - let mut ping_cache = self.ping_cache.write().unwrap(); - for (pong, addr) in pongs { - ping_cache.add(&pong, addr, now); + } + if verify_failed != 0 { + inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", verify_failed); } } } From 4f1791ceec08803157c26d513afaf904c221afb0 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 21 Oct 2020 12:04:54 -0400 Subject: [PATCH 14/15] removes signature verification since it is already done --- core/src/cluster_info.rs | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index a47643c9ab677d..e4dfc26d93eb63 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2173,24 +2173,15 @@ impl ClusterInfo { where I: IntoIterator, { - let mut verify_failed = 0; let packets: Vec<_> = pings .into_iter() .filter_map(|(ping, addr)| { - if ping.verify() { - let pong = Pong::new(&ping, &self.keypair).ok()?; - let pong = Protocol::PongMessage(pong); - let packet = Packet::from_data(&addr, pong); - Some(packet) - } else { - verify_failed += 1; - None - } + let pong = Pong::new(&ping, &self.keypair).ok()?; + let pong = Protocol::PongMessage(pong); + let packet = Packet::from_data(&addr, pong); + Some(packet) }) .collect(); - if verify_failed != 0 { - inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", verify_failed); - } if packets.is_empty() { None } else { @@ -2206,17 +2197,9 @@ impl ClusterInfo { { let mut pongs = pongs.into_iter().peekable(); if pongs.peek().is_some() { - let mut verify_failed = 0; let mut ping_cache = self.ping_cache.write().unwrap(); for (pong, addr) in pongs { - if pong.verify() { - ping_cache.add(&pong, addr, now); - } else { - verify_failed += 1; - } - } - if verify_failed != 0 { - inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", verify_failed); + ping_cache.add(&pong, addr, now); } } } From 7b356546a2d72608206b5c6068f56f1893ac9fa9 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Tue, 27 Oct 2020 14:10:48 -0400 Subject: [PATCH 15/15] removes Clone impl for PingCache --- core/src/cluster_info.rs | 2 +- core/src/ping_pong.rs | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index e4dfc26d93eb63..577a2740085acb 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -519,7 +519,7 @@ impl ClusterInfo { entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()), outbound_budget: self.outbound_budget.clone_non_atomic(), my_contact_info: RwLock::new(my_contact_info), - ping_cache: RwLock::new(self.ping_cache.read().unwrap().clone()), + ping_cache: RwLock::new(self.ping_cache.read().unwrap().mock_clone()), id: *new_id, stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), diff --git a/core/src/ping_pong.rs b/core/src/ping_pong.rs index 093da736b6f06e..c1ad136a0d547a 100644 --- a/core/src/ping_pong.rs +++ b/core/src/ping_pong.rs @@ -223,23 +223,22 @@ impl PingCache { }; (check, ping) } -} -impl Clone for PingCache { - fn clone(&self) -> Self { + // Only for tests and simulations. + pub(crate) fn mock_clone(&self) -> Self { let mut clone = Self { ttl: self.ttl, pings: LruCache::new(self.pings.cap()), pongs: LruCache::new(self.pongs.cap()), pending_cache: LruCache::new(self.pending_cache.cap()), }; - for (k, v) in self.pongs.iter() { + for (k, v) in self.pongs.iter().rev() { clone.pings.put(*k, *v); } - for (k, v) in self.pongs.iter() { + for (k, v) in self.pongs.iter().rev() { clone.pongs.put(*k, *v); } - for (k, v) in self.pending_cache.iter() { + for (k, v) in self.pending_cache.iter().rev() { clone.pending_cache.put(*k, *v); } clone