From d15675ad7f41978ceb11ea6261b9b03fb204d156 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 12 Oct 2020 15:38:08 -0400 Subject: [PATCH] builds a predicate for checking pull requests --- core/src/cluster_info.rs | 51 ++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 83afa7f1927742..802b2d03474964 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -28,7 +28,7 @@ use crate::{ }; use rand::distributions::{Distribution, WeightedIndex}; -use rand::SeedableRng; +use rand::{CryptoRng, Rng, SeedableRng}; use rand_chacha::ChaChaRng; use solana_sdk::sanitize::{Sanitize, SanitizeError}; @@ -1812,20 +1812,22 @@ impl ClusterInfo { } } - // Filters out pull requests from invalid addresses, and those which have - // not passed the ping-pong check. Appends ping packets for the addresses - // which need to be (re)verified. - fn filter_pull_requests( - &self, - requests: Vec, - packets: &mut Packets, - ) -> Vec { - let mut rng = rand::thread_rng(); + // Returns a predicate checking if the pull request is from a valid + // address, and if the address have responded to a ping request. Also + // appends ping packets for the addresses which need to be (re)verified. + fn check_pull_request<'a, R>( + &'a self, + now: Instant, + mut rng: &'a mut R, + packets: &'a mut Packets, + ) -> impl FnMut(&PullData) -> bool + 'a + where + R: Rng + CryptoRng, + { let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new(); - let mut pingf = || Ping::new_rand(&mut rng, &self.keypair).ok(); + let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair).ok(); let mut ping_cache = self.ping_cache.write().unwrap(); - let now = Instant::now(); - let mut hard_check = |node| { + let mut hard_check = move |node| { let (_check, ping) = ping_cache.check(now, node, &mut pingf); if let Some(ping) = ping { let ping = Protocol::PingMessage(ping); @@ -1833,22 +1835,19 @@ impl ClusterInfo { packets.packets.push(ping); } // TODO: For backward compatibility, this unconditionally returns - // true for now. This has to return _check, once nodes start + // true for now. It has to return _check, once nodes start // responding to ping messages. true }; // Because pull-responses are sent back to packet.meta.addr() of // incoming pull-requests, pings are also sent to request.from_addr (as // opposed to caller.gossip address). - requests - .into_iter() - .filter(|request| { - ContactInfo::is_valid_address(&request.from_addr) && { - let node = (request.caller.pubkey(), request.from_addr); - *cache.entry(node).or_insert_with(|| hard_check(node)) - } - }) - .collect() + move |request| { + ContactInfo::is_valid_address(&request.from_addr) && { + let node = (request.caller.pubkey(), request.from_addr); + *cache.entry(node).or_insert_with(|| hard_check(node)) + } + } } // Pull requests take an incoming bloom filter of contained entries from a node @@ -1863,10 +1862,12 @@ impl ClusterInfo { self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) .process_pull_requests(requests.iter().map(|r| r.caller.clone()), timestamp()); self.update_data_budget(stakes); + let mut rng = rand::thread_rng(); let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); - let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = self - .filter_pull_requests(requests, &mut packets) + let check_pull_request = self.check_pull_request(Instant::now(), &mut rng, &mut packets); + let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = requests .into_iter() + .filter(check_pull_request) .map(|r| ((r.caller, r.filter), r.from_addr)) .unzip(); let now = timestamp();