diff --git a/Cargo.lock b/Cargo.lock index ec1331eb7e5084..07996796f9a0ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1374,6 +1374,7 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91b62f79061a0bc2e046024cb7ba44b08419ed238ecbd9adbd787434b9e8c25" dependencies = [ + "ahash", "autocfg 1.0.0", ] @@ -1991,6 +1992,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "lru" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "111b945ac72ec09eb7bc62a0fbdc3cc6e80555a7245f52a69d3921a75b53b153" +dependencies = [ + "hashbrown", +] + [[package]] name = "matches" version = "0.1.8" @@ -3805,6 +3815,7 @@ dependencies = [ "jsonrpc-pubsub", "jsonrpc-ws-server", "log 0.4.8", + "lru", "matches", "num-traits", "num_cpus", diff --git a/core/Cargo.toml b/core/Cargo.toml index 6fec41d1113fd4..7767804ecc915c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -34,6 +34,7 @@ jsonrpc-http-server = "15.0.0" jsonrpc-pubsub = "15.0.0" jsonrpc-ws-server = "15.0.0" log = "0.4.8" +lru = "0.6.0" num_cpus = "1.13.0" num-traits = "0.2" rand = "0.7.0" diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 7b630a37476fee..51b08d56f943c4 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -23,12 +23,13 @@ use crate::{ }, data_budget::DataBudget, epoch_slots::EpochSlots, + ping_pong::{self, PingCache, Pong}, result::{Error, Result}, weighted_shuffle::weighted_shuffle, }; use rand::distributions::{Distribution, WeightedIndex}; -use rand::SeedableRng; +use rand::{CryptoRng, Rng, SeedableRng}; use rand_chacha::ChaChaRng; use solana_sdk::sanitize::{Sanitize, SanitizeError}; @@ -97,6 +98,10 @@ const MAX_GOSSIP_TRAFFIC: usize = 128_000_000 / PACKET_DATA_SIZE; /// Keep the number of snapshot hashes a node publishes under MAX_PROTOCOL_PAYLOAD_SIZE pub const MAX_SNAPSHOT_HASHES: usize = 16; +/// Number of bytes in the randomly generated token sent with ping messages. +const GOSSIP_PING_TOKEN_SIZE: usize = 32; +const GOSSIP_PING_CACHE_CAPACITY: usize = 16384; +const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640); #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -226,6 +231,7 @@ struct GossipStats { prune_received_cache: Counter, prune_message_count: Counter, prune_message_len: Counter, + pull_request_ping_pong_check_failed_count: Counter, purge: Counter, epoch_slots_lookup: Counter, epoch_slots_push: Counter, @@ -251,6 +257,7 @@ pub struct ClusterInfo { entrypoint: RwLock>, outbound_budget: DataBudget, my_contact_info: RwLock, + ping_cache: RwLock, id: Pubkey, stats: GossipStats, socket: UdpSocket, @@ -355,8 +362,10 @@ pub fn make_accounts_hashes_message( Some(CrdsValue::new_signed(message, keypair)) } +type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>; + // TODO These messages should go through the gpu pipeline for spam filtering -#[frozen_abi(digest = "21UweZ4WXK9RHypF97D1rEJQ4C8Bh4pw52SBSNAKxJvW")] +#[frozen_abi(digest = "3jHXixLRv6fuCykW47hBZSwFuwDjbZShR73GVQB6TjGr")] #[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)] #[allow(clippy::large_enum_variant)] enum Protocol { @@ -365,6 +374,8 @@ enum Protocol { PullResponse(Pubkey, Vec), PushMessage(Pubkey, Vec), PruneMessage(Pubkey, PruneData), + PingMessage(Ping), + PongMessage(Pong), } impl Protocol { @@ -416,6 +427,22 @@ impl Protocol { None } } + Protocol::PingMessage(ref ping) => { + if ping.verify() { + Some(self) + } else { + inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", 1); + None + } + } + Protocol::PongMessage(ref pong) => { + if pong.verify() { + Some(self) + } else { + inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", 1); + None + } + } } } } @@ -430,6 +457,8 @@ impl Sanitize for Protocol { Protocol::PullResponse(_, val) => val.sanitize(), Protocol::PushMessage(_, val) => val.sanitize(), Protocol::PruneMessage(_, val) => val.sanitize(), + Protocol::PingMessage(ping) => ping.sanitize(), + Protocol::PongMessage(pong) => pong.sanitize(), } } } @@ -459,6 +488,10 @@ impl ClusterInfo { entrypoint: RwLock::new(None), outbound_budget: DataBudget::default(), my_contact_info: RwLock::new(contact_info), + ping_cache: RwLock::new(PingCache::new( + GOSSIP_PING_CACHE_TTL, + GOSSIP_PING_CACHE_CAPACITY, + )), id, stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), @@ -486,6 +519,7 @@ impl ClusterInfo { entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()), outbound_budget: self.outbound_budget.clone_non_atomic(), my_contact_info: RwLock::new(my_contact_info), + ping_cache: RwLock::new(self.ping_cache.read().unwrap().mock_clone()), id: *new_id, stats: GossipStats::default(), socket: UdpSocket::bind("0.0.0.0:0").unwrap(), @@ -1790,6 +1824,8 @@ impl ClusterInfo { .read() .unwrap() .make_timeouts(&stakes, epoch_time_ms); + let mut ping_messages = vec![]; + let mut pong_messages = vec![]; let mut pull_responses = HashMap::new(); for (from_addr, packet) in packets { match packet { @@ -1865,9 +1901,15 @@ impl ClusterInfo { ("prune_message", (allocated.get() - start) as i64, i64), ); } + Protocol::PingMessage(ping) => ping_messages.push((ping, from_addr)), + Protocol::PongMessage(pong) => pong_messages.push((pong, from_addr)), } } + if let Some(response) = self.handle_ping_messages(ping_messages, recycler) { + let _ = response_sender.send(response); + } + self.handle_pong_messages(pong_messages, Instant::now()); for (from, data) in pull_responses { self.handle_pull_response(&from, data, &timeouts); } @@ -1878,7 +1920,7 @@ impl ClusterInfo { .pull_requests_count .add_relaxed(gossip_pull_data.len() as u64); let rsp = self.handle_pull_requests(recycler, gossip_pull_data, stakes, feature_set); - if let Some(rsp) = rsp { + if !rsp.is_empty() { let _ignore_disconnect = response_sender.send(rsp); } } @@ -1898,6 +1940,49 @@ impl ClusterInfo { }); } + // Returns a predicate checking if the pull request is from a valid + // address, and if the address have responded to a ping request. Also + // appends ping packets for the addresses which need to be (re)verified. + fn check_pull_request<'a, R>( + &'a self, + now: Instant, + mut rng: &'a mut R, + packets: &'a mut Packets, + feature_set: Option<&FeatureSet>, + ) -> impl FnMut(&PullData) -> bool + 'a + where + R: Rng + CryptoRng, + { + let check_enabled = matches!(feature_set, Some(feature_set) if + feature_set.is_active(&feature_set::pull_request_ping_pong_check::id())); + let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new(); + let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair).ok(); + let mut ping_cache = self.ping_cache.write().unwrap(); + let mut hard_check = move |node| { + let (check, ping) = ping_cache.check(now, node, &mut pingf); + if let Some(ping) = ping { + let ping = Protocol::PingMessage(ping); + let ping = Packet::from_data(&node.1, ping); + packets.packets.push(ping); + } + if !check { + self.stats + .pull_request_ping_pong_check_failed_count + .add_relaxed(1) + } + check || !check_enabled + }; + // Because pull-responses are sent back to packet.meta.addr() of + // incoming pull-requests, pings are also sent to request.from_addr (as + // opposed to caller.gossip address). + move |request| { + ContactInfo::is_valid_address(&request.from_addr) && { + let node = (request.caller.pubkey(), request.from_addr); + *cache.entry(node).or_insert_with(|| hard_check(node)) + } + } + } + // Pull requests take an incoming bloom filter of contained entries from a node // and tries to send back to them the values it detects are missing. fn handle_pull_requests( @@ -1906,21 +1991,22 @@ impl ClusterInfo { requests: Vec, stakes: &HashMap, feature_set: Option<&FeatureSet>, - ) -> Option { - if matches!(feature_set, Some(feature_set) if - feature_set.is_active(&feature_set::pull_request_ping_pong_check::id())) - { - // TODO: add ping-pong check on pull-request addresses. - } - // split the requests into addrs and filters - let mut caller_and_filters = vec![]; - let mut addrs = vec![]; + ) -> Packets { let mut time = Measure::start("handle_pull_requests"); + self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) + .process_pull_requests(requests.iter().map(|r| r.caller.clone()), timestamp()); self.update_data_budget(stakes.len()); - for pull_data in requests { - caller_and_filters.push((pull_data.caller, pull_data.filter)); - addrs.push(pull_data.from_addr); - } + let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); + let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = { + let mut rng = rand::thread_rng(); + let check_pull_request = + self.check_pull_request(Instant::now(), &mut rng, &mut packets, feature_set); + requests + .into_iter() + .filter(check_pull_request) + .map(|r| ((r.caller, r.filter), r.from_addr)) + .unzip() + }; let now = timestamp(); let self_id = self.id(); @@ -1931,27 +2017,14 @@ impl ClusterInfo { ) .generate_pull_responses(&caller_and_filters, now); - self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) - .process_pull_requests(caller_and_filters, now); - - // Filter bad to addresses let pull_responses: Vec<_> = pull_responses .into_iter() .zip(addrs.into_iter()) - .filter_map(|(responses, from_addr)| { - if !from_addr.ip().is_unspecified() - && from_addr.port() != 0 - && !responses.is_empty() - { - Some((responses, from_addr)) - } else { - None - } - }) + .filter(|(response, _)| !response.is_empty()) .collect(); if pull_responses.is_empty() { - return None; + return packets; } let mut stats: Vec<_> = pull_responses @@ -1983,7 +2056,6 @@ impl ClusterInfo { let rng = &mut ChaChaRng::from_seed(seed); let weighted_index = WeightedIndex::new(weights).unwrap(); - let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); let mut total_bytes = 0; let mut sent = HashSet::new(); while sent.len() < stats.len() { @@ -2018,10 +2090,7 @@ impl ClusterInfo { stats.len(), total_bytes ); - if packets.is_empty() { - return None; - } - Some(packets) + packets } // Returns (failed, timeout, success) @@ -2108,6 +2177,41 @@ impl ClusterInfo { } } + fn handle_ping_messages(&self, pings: I, recycler: &PacketsRecycler) -> Option + where + I: IntoIterator, + { + let packets: Vec<_> = pings + .into_iter() + .filter_map(|(ping, addr)| { + let pong = Pong::new(&ping, &self.keypair).ok()?; + let pong = Protocol::PongMessage(pong); + let packet = Packet::from_data(&addr, pong); + Some(packet) + }) + .collect(); + if packets.is_empty() { + None + } else { + let packets = + Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets); + Some(packets) + } + } + + fn handle_pong_messages(&self, pongs: I, now: Instant) + where + I: IntoIterator, + { + let mut pongs = pongs.into_iter().peekable(); + if pongs.peek().is_some() { + let mut ping_cache = self.ping_cache.write().unwrap(); + for (pong, addr) in pongs { + ping_cache.add(&pong, addr, now); + } + } + } + fn handle_push_message( &self, recycler: &PacketsRecycler, @@ -2412,6 +2516,11 @@ impl ClusterInfo { self.stats.process_pull_requests.clear(), i64 ), + ( + "pull_request_ping_pong_check_failed_count", + self.stats.pull_request_ping_pong_check_failed_count.clear(), + i64 + ), ( "generate_pull_responses", self.stats.generate_pull_responses.clear(), @@ -2796,12 +2905,14 @@ pub fn stake_weight_peers( mod tests { use super::*; use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote as CrdsVote}; + use itertools::izip; use rayon::prelude::*; use solana_perf::test_tx::test_tx; use solana_sdk::signature::{Keypair, Signer}; use solana_vote_program::{vote_instruction, vote_state::Vote}; use std::collections::HashSet; - use std::net::{IpAddr, Ipv4Addr}; + use std::iter::repeat_with; + use std::net::{IpAddr, Ipv4Addr, SocketAddrV4}; use std::sync::Arc; #[test] @@ -2843,6 +2954,117 @@ mod tests { ); } + fn new_rand_remote_node(rng: &mut R) -> (Keypair, SocketAddr) + where + R: Rng, + { + let keypair = Keypair::new(); + let socket = SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), + rng.gen(), + )); + (keypair, socket) + } + + #[test] + fn test_handle_pong_messages() { + let now = Instant::now(); + let mut rng = rand::thread_rng(); + let this_node = Arc::new(Keypair::new()); + let cluster_info = ClusterInfo::new( + ContactInfo::new_localhost(&this_node.pubkey(), timestamp()), + this_node.clone(), + ); + let remote_nodes: Vec<(Keypair, SocketAddr)> = + repeat_with(|| new_rand_remote_node(&mut rng)) + .take(128) + .collect(); + let pings: Vec<_> = { + let mut ping_cache = cluster_info.ping_cache.write().unwrap(); + let mut pingf = || Ping::new_rand(&mut rng, &this_node).ok(); + remote_nodes + .iter() + .map(|(keypair, socket)| { + let node = (keypair.pubkey(), *socket); + let (check, ping) = ping_cache.check(now, node, &mut pingf); + // Assert that initially remote nodes will not pass the + // ping/pong check. + assert!(!check); + ping.unwrap() + }) + .collect() + }; + let pongs: Vec<(Pong, SocketAddr)> = pings + .iter() + .zip(&remote_nodes) + .map(|(ping, (keypair, socket))| (Pong::new(ping, keypair).unwrap(), *socket)) + .collect(); + let now = now + Duration::from_millis(1); + cluster_info.handle_pong_messages(pongs, now); + // Assert that remote nodes now pass the ping/pong check. + { + let mut ping_cache = cluster_info.ping_cache.write().unwrap(); + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let (check, _) = ping_cache.check(now, node, || -> Option { None }); + assert!(check); + } + } + // Assert that a new random remote node still will not pass the check. + { + let mut ping_cache = cluster_info.ping_cache.write().unwrap(); + let (keypair, socket) = new_rand_remote_node(&mut rng); + let node = (keypair.pubkey(), socket); + let (check, _) = ping_cache.check(now, node, || -> Option { None }); + assert!(!check); + } + } + + #[test] + fn test_handle_ping_messages() { + let mut rng = rand::thread_rng(); + let this_node = Arc::new(Keypair::new()); + let cluster_info = ClusterInfo::new( + ContactInfo::new_localhost(&this_node.pubkey(), timestamp()), + this_node.clone(), + ); + let remote_nodes: Vec<(Keypair, SocketAddr)> = + repeat_with(|| new_rand_remote_node(&mut rng)) + .take(128) + .collect(); + let pings: Vec<_> = remote_nodes + .iter() + .map(|(keypair, _)| Ping::new_rand(&mut rng, keypair).unwrap()) + .collect(); + let pongs: Vec<_> = pings + .iter() + .map(|ping| Pong::new(ping, &this_node).unwrap()) + .collect(); + let recycler = PacketsRecycler::default(); + let packets = cluster_info + .handle_ping_messages( + pings + .into_iter() + .zip(remote_nodes.iter().map(|(_, socket)| *socket)), + &recycler, + ) + .unwrap() + .packets; + assert_eq!(remote_nodes.len(), packets.len()); + for (packet, (_, socket), pong) in izip!( + packets.into_iter(), + remote_nodes.into_iter(), + pongs.into_iter() + ) { + assert_eq!(packet.meta.addr(), socket); + let bytes = serialize(&pong).unwrap(); + match limited_deserialize(&packet.data[..packet.meta.size]).unwrap() { + Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes), + _ => panic!("invalid packet!"), + } + } + } + fn test_crds_values(pubkey: Pubkey) -> Vec { let entrypoint = ContactInfo::new_localhost(&pubkey, timestamp()); let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint)); diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index ad95478f7ef0e7..d0a437876b5942 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -174,9 +174,12 @@ impl CrdsGossip { self.pull.mark_pull_request_creation_time(from, now) } /// process a pull request and create a response - pub fn process_pull_requests(&mut self, filters: Vec<(CrdsValue, CrdsFilter)>, now: u64) { + pub fn process_pull_requests(&mut self, callers: I, now: u64) + where + I: IntoIterator, + { self.pull - .process_pull_requests(&mut self.crds, filters, now); + .process_pull_requests(&mut self.crds, callers, now); } pub fn generate_pull_responses( diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index d895a8b7296905..c28a6ad5374034 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -273,20 +273,18 @@ impl CrdsGossipPull { } /// process a pull request - pub fn process_pull_requests( - &mut self, - crds: &mut Crds, - requests: Vec<(CrdsValue, CrdsFilter)>, - now: u64, - ) { - requests.into_iter().for_each(|(caller, _)| { + pub fn process_pull_requests(&mut self, crds: &mut Crds, callers: I, now: u64) + where + I: IntoIterator, + { + for caller in callers { let key = caller.label().pubkey(); if let Ok(Some(val)) = crds.insert(caller, now) { self.purged_values .push_back((val.value_hash, val.local_timestamp)); } crds.update_record_timestamp(&key, now); - }); + } } /// Create gossip responses to pull requests @@ -1087,7 +1085,11 @@ mod test { let (_, filters, caller) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); - dest.process_pull_requests(&mut dest_crds, filters, 1); + dest.process_pull_requests( + &mut dest_crds, + filters.into_iter().map(|(caller, _)| caller), + 1, + ); assert!(rsp.iter().all(|rsp| rsp.is_empty())); assert!(dest_crds.lookup(&caller.label()).is_some()); assert_eq!( @@ -1161,7 +1163,11 @@ mod test { let (_, filters, caller) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); - dest.process_pull_requests(&mut dest_crds, filters, 0); + dest.process_pull_requests( + &mut dest_crds, + filters.into_iter().map(|(caller, _)| caller), + 0, + ); // if there is a false positive this is empty // prob should be around 0.1 per iteration if rsp.is_empty() { diff --git a/core/src/lib.rs b/core/src/lib.rs index 30737faa35485d..736aa6a96694ae 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -43,6 +43,7 @@ pub mod local_vote_signer_service; pub mod non_circulating_supply; pub mod optimistic_confirmation_verifier; pub mod optimistically_confirmed_bank_tracker; +pub mod ping_pong; pub mod poh_recorder; pub mod poh_service; pub mod progress_map; diff --git a/core/src/ping_pong.rs b/core/src/ping_pong.rs new file mode 100644 index 00000000000000..c1ad136a0d547a --- /dev/null +++ b/core/src/ping_pong.rs @@ -0,0 +1,400 @@ +use bincode::{serialize, Error}; +use lru::LruCache; +use rand::{AsByteSliceMut, CryptoRng, Rng}; +use serde::Serialize; +use solana_sdk::hash::{self, Hash}; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::sanitize::{Sanitize, SanitizeError}; +use solana_sdk::signature::{Keypair, Signable, Signature, Signer}; +use std::borrow::Cow; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +#[derive(AbiExample, Debug, Deserialize, Serialize)] +pub struct Ping { + from: Pubkey, + token: T, + signature: Signature, +} + +#[derive(AbiExample, Debug, Deserialize, Serialize)] +pub struct Pong { + from: Pubkey, + hash: Hash, // Hash of received ping token. + signature: Signature, +} + +/// Maintains records of remote nodes which have returned a valid response to a +/// ping message, and on-the-fly ping messages pending a pong response from the +/// remote node. +pub struct PingCache { + // Time-to-live of received pong messages. + ttl: Duration, + // Timestamp of last ping message sent to a remote node. + // Used to rate limit pings to remote nodes. + pings: LruCache<(Pubkey, SocketAddr), Instant>, + // Verified pong responses from remote nodes. + pongs: LruCache<(Pubkey, SocketAddr), Instant>, + // Hash of ping tokens sent out to remote nodes, + // pending a pong response back. + pending_cache: LruCache, +} + +impl Ping { + pub fn new(token: T, keypair: &Keypair) -> Result { + let signature = keypair.sign_message(&serialize(&token)?); + let ping = Ping { + from: keypair.pubkey(), + token, + signature, + }; + Ok(ping) + } +} + +impl Ping +where + T: Serialize + AsByteSliceMut + Default, +{ + pub fn new_rand(rng: &mut R, keypair: &Keypair) -> Result + where + R: Rng + CryptoRng, + { + let mut token = T::default(); + rng.fill(&mut token); + Ping::new(token, keypair) + } +} + +impl Sanitize for Ping { + fn sanitize(&self) -> Result<(), SanitizeError> { + self.from.sanitize()?; + // TODO Add self.token.sanitize()?; when rust's + // specialization feature becomes stable. + self.signature.sanitize() + } +} + +impl Signable for Ping { + fn pubkey(&self) -> Pubkey { + self.from + } + + fn signable_data(&self) -> Cow<[u8]> { + Cow::Owned(serialize(&self.token).unwrap()) + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature; + } +} + +impl Pong { + pub fn new(ping: &Ping, keypair: &Keypair) -> Result { + let hash = hash::hash(&serialize(&ping.token)?); + let pong = Pong { + from: keypair.pubkey(), + hash, + signature: keypair.sign_message(hash.as_ref()), + }; + Ok(pong) + } +} + +impl Sanitize for Pong { + fn sanitize(&self) -> Result<(), SanitizeError> { + self.from.sanitize()?; + self.hash.sanitize()?; + self.signature.sanitize() + } +} + +impl Signable for Pong { + fn pubkey(&self) -> Pubkey { + self.from + } + + fn signable_data(&self) -> Cow<[u8]> { + Cow::Owned(self.hash.as_ref().into()) + } + + fn get_signature(&self) -> Signature { + self.signature + } + + fn set_signature(&mut self, signature: Signature) { + self.signature = signature; + } +} + +impl PingCache { + pub fn new(ttl: Duration, cap: usize) -> Self { + Self { + ttl, + pings: LruCache::new(cap), + pongs: LruCache::new(cap), + pending_cache: LruCache::new(cap), + } + } + + /// Checks if the pong hash, pubkey and socket match a ping message sent + /// out previously. If so records current timestamp for the remote node and + /// returns true. + /// Note: Does not verify the signature. + pub fn add(&mut self, pong: &Pong, socket: SocketAddr, now: Instant) -> bool { + let node = (pong.pubkey(), socket); + match self.pending_cache.peek(&pong.hash) { + Some(value) if *value == node => { + self.pings.pop(&node); + self.pongs.put(node, now); + self.pending_cache.pop(&pong.hash); + true + } + _ => false, + } + } + + /// Checks if the remote node has been pinged recently. If not, calls the + /// given function to generates a new ping message, records current + /// timestamp and hash of ping token, and returns the ping message. + fn maybe_ping( + &mut self, + now: Instant, + node: (Pubkey, SocketAddr), + mut pingf: F, + ) -> Option> + where + T: Serialize, + F: FnMut() -> Option>, + { + // Rate limit consecutive pings sent to a remote node. + let delay = self.ttl / 64; + match self.pings.peek(&node) { + Some(t) if now.saturating_duration_since(*t) < delay => None, + _ => { + let ping = pingf()?; + let hash = hash::hash(&serialize(&ping.token).ok()?); + self.pings.put(node, now); + self.pending_cache.put(hash, node); + Some(ping) + } + } + } + + /// Returns true if the remote node has responded to a ping message. + /// Removes expired pong messages. In order to extend verifications before + /// expiration, if the pong message is not too recent, and the node has not + /// been pinged recently, calls the given function to generates a new ping + /// message, records current timestamp and hash of ping token, and returns + /// the ping message. + /// Caller should verify if the socket address is valid. (e.g. by using + /// ContactInfo::is_valid_address). + pub fn check( + &mut self, + now: Instant, + node: (Pubkey, SocketAddr), + pingf: F, + ) -> (bool, Option>) + where + T: Serialize, + F: FnMut() -> Option>, + { + let (check, should_ping) = match self.pongs.get(&node) { + None => (false, true), + Some(t) => { + let age = now.saturating_duration_since(*t); + // Pop if the pong message has expired. + if age > self.ttl { + self.pongs.pop(&node); + } + // If the pong message is not too recent, generate a new ping + // message to extend remote node verification. + (true, age > self.ttl / 8) + } + }; + let ping = if should_ping { + self.maybe_ping(now, node, pingf) + } else { + None + }; + (check, ping) + } + + // Only for tests and simulations. + pub(crate) fn mock_clone(&self) -> Self { + let mut clone = Self { + ttl: self.ttl, + pings: LruCache::new(self.pings.cap()), + pongs: LruCache::new(self.pongs.cap()), + pending_cache: LruCache::new(self.pending_cache.cap()), + }; + for (k, v) in self.pongs.iter().rev() { + clone.pings.put(*k, *v); + } + for (k, v) in self.pongs.iter().rev() { + clone.pongs.put(*k, *v); + } + for (k, v) in self.pending_cache.iter().rev() { + clone.pending_cache.put(*k, *v); + } + clone + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashSet; + use std::iter::repeat_with; + use std::net::{Ipv4Addr, SocketAddrV4}; + + type Token = [u8; 32]; + + #[test] + fn test_ping_pong() { + let mut rng = rand::thread_rng(); + let keypair = Keypair::new(); + let ping = Ping::::new_rand(&mut rng, &keypair).unwrap(); + assert!(ping.verify()); + assert!(ping.sanitize().is_ok()); + + let pong = Pong::new(&ping, &keypair).unwrap(); + assert!(pong.verify()); + assert!(pong.sanitize().is_ok()); + assert_eq!(hash::hash(&ping.token), pong.hash); + } + + #[test] + fn test_ping_cache() { + let now = Instant::now(); + let mut rng = rand::thread_rng(); + let ttl = Duration::from_millis(256); + let mut cache = PingCache::new(ttl, /*cap=*/ 1000); + let this_node = Keypair::new(); + let keypairs: Vec<_> = repeat_with(Keypair::new).take(8).collect(); + let sockets: Vec<_> = repeat_with(|| { + SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), + rng.gen(), + )) + }) + .take(8) + .collect(); + let remote_nodes: Vec<(&Keypair, SocketAddr)> = repeat_with(|| { + let keypair = &keypairs[rng.gen_range(0, keypairs.len())]; + let socket = sockets[rng.gen_range(0, sockets.len())]; + (keypair, socket) + }) + .take(128) + .collect(); + + // Initially all checks should fail. The first observation of each node + // should create a ping packet. + let mut seen_nodes = HashSet::<(Pubkey, SocketAddr)>::new(); + let pings: Vec>> = remote_nodes + .iter() + .map(|(keypair, socket)| { + let node = (keypair.pubkey(), *socket); + let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); + let (check, ping) = cache.check(now, node, pingf); + assert!(!check); + assert_eq!(seen_nodes.insert(node), ping.is_some()); + ping + }) + .collect(); + + let now = now + Duration::from_millis(1); + let panic_ping = || -> Option> { panic!("this should not happen!") }; + for ((keypair, socket), ping) in remote_nodes.iter().zip(&pings) { + match ping { + None => { + // Already have a recent ping packets for nodes, so no new + // ping packet will be generated. + let node = (keypair.pubkey(), *socket); + let (check, ping) = cache.check(now, node, panic_ping); + assert!(check); + assert!(ping.is_none()); + } + Some(ping) => { + let pong = Pong::new(ping, keypair).unwrap(); + assert!(cache.add(&pong, *socket, now)); + } + } + } + + let now = now + Duration::from_millis(1); + // All nodes now have a recent pong packet. + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let (check, ping) = cache.check(now, node, panic_ping); + assert!(check); + assert!(ping.is_none()); + } + + let now = now + ttl / 8; + // All nodes still have a valid pong packet, but the cache will create + // a new ping packet to extend verification. + seen_nodes.clear(); + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); + let (check, ping) = cache.check(now, node, pingf); + assert!(check); + assert_eq!(seen_nodes.insert(node), ping.is_some()); + } + + let now = now + Duration::from_millis(1); + // All nodes still have a valid pong packet, and a very recent ping + // packet pending response. So no new ping packet will be created. + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let (check, ping) = cache.check(now, node, panic_ping); + assert!(check); + assert!(ping.is_none()); + } + + let now = now + ttl; + // Pong packets are still valid but expired. The first observation of + // each node will remove the pong packet from cache and create a new + // ping packet. + seen_nodes.clear(); + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); + let (check, ping) = cache.check(now, node, pingf); + if seen_nodes.insert(node) { + assert!(check); + assert!(ping.is_some()); + } else { + assert!(!check); + assert!(ping.is_none()); + } + } + + let now = now + Duration::from_millis(1); + // No valid pong packet in the cache. A recent ping packet already + // created, so no new one will be created. + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let (check, ping) = cache.check(now, node, panic_ping); + assert!(!check); + assert!(ping.is_none()); + } + + let now = now + ttl / 64; + // No valid pong packet in the cache. Another ping packet will be + // created for the first observation of each node. + seen_nodes.clear(); + for (keypair, socket) in &remote_nodes { + let node = (keypair.pubkey(), *socket); + let pingf = || Ping::::new_rand(&mut rng, &this_node).ok(); + let (check, ping) = cache.check(now, node, pingf); + assert!(!check); + assert_eq!(seen_nodes.insert(node), ping.is_some()); + } + } +} diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 2540e041bb0440..c1011f6ef76f76 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -462,7 +462,10 @@ fn network_run_pull( .into_iter() .flatten() .collect(); - node.lock().unwrap().process_pull_requests(filters, now); + node.lock().unwrap().process_pull_requests( + filters.into_iter().map(|(caller, _)| caller), + now, + ); rsp }) .unwrap();