diff --git a/api/src/handlers/peers_api.rs b/api/src/handlers/peers_api.rs index 9469ef2735..65b8c61417 100644 --- a/api/src/handlers/peers_api.rs +++ b/api/src/handlers/peers_api.rs @@ -28,7 +28,7 @@ pub struct PeersAllHandler { impl Handler for PeersAllHandler { fn get(&self, _req: Request) -> ResponseFuture { - let peers = &w_fut!(&self.peers).all_peers(); + let peers = &w_fut!(&self.peers).all_peer_data(); json_response_pretty(&peers) } } @@ -40,8 +40,9 @@ pub struct PeersConnectedHandler { impl PeersConnectedHandler { pub fn get_connected_peers(&self) -> Result, Error> { let peers = w(&self.peers)? - .connected_peers() .iter() + .connected() + .into_iter() .map(|p| p.info.clone().into()) .collect::>(); Ok(peers) @@ -51,8 +52,9 @@ impl PeersConnectedHandler { impl Handler for PeersConnectedHandler { fn get(&self, _req: Request) -> ResponseFuture { let peers: Vec = w_fut!(&self.peers) - .connected_peers() .iter() + .connected() + .into_iter() .map(|p| p.info.clone().into()) .collect(); json_response(&peers) @@ -77,7 +79,7 @@ impl PeerHandler { })?; return Ok(vec![peer_data]); } - let peers = w(&self.peers)?.all_peers(); + let peers = w(&self.peers)?.all_peer_data(); Ok(peers) } diff --git a/api/src/handlers/server_api.rs b/api/src/handlers/server_api.rs index 5b14557d04..4802bf8390 100644 --- a/api/src/handlers/server_api.rs +++ b/api/src/handlers/server_api.rs @@ -21,6 +21,7 @@ use crate::types::*; use crate::web::*; use hyper::{Body, Request}; use serde_json::json; +use std::convert::TryInto; use std::sync::Weak; // RESTful index of available api endpoints @@ -54,7 +55,12 @@ impl StatusHandler { let (api_sync_status, api_sync_info) = sync_status_to_api(sync_status); Ok(Status::from_tip_and_peers( head, - w(&self.peers)?.peer_count(), + w(&self.peers)? + .iter() + .connected() + .count() + .try_into() + .unwrap(), api_sync_status, api_sync_info, )) diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index e4efa6ccae..9c6dd2c59d 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -18,8 +18,7 @@ use std::fs::File; use std::path::PathBuf; use std::sync::Arc; -use rand::seq::SliceRandom; -use rand::thread_rng; +use rand::prelude::*; use crate::chain; use crate::core::core; @@ -105,139 +104,32 @@ impl Peers { Ok(peers.contains_key(&addr)) } - /// Get vec of peers we are currently connected to. - pub fn connected_peers(&self) -> Vec> { + /// Iterator over our current peers. + /// This allows us to hide try_read_for() behind a cleaner interface. + /// PeersIter lets us chain various adaptors for convenience. + pub fn iter(&self) -> PeersIter>> { let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { - Some(peers) => peers, + Some(peers) => peers.values().cloned().collect(), None => { error!("connected_peers: failed to get peers lock"); - return vec![]; + vec![] } }; - let mut res = peers - .values() - .filter(|p| p.is_connected()) - .cloned() - .collect::>(); - res.shuffle(&mut thread_rng()); - res - } - - /// Get vec of peers we currently have an outgoing connection with. - pub fn outgoing_connected_peers(&self) -> Vec> { - self.connected_peers() - .into_iter() - .filter(|x| x.info.is_outbound()) - .collect() - } - - /// Get vec of peers we currently have an incoming connection with. - pub fn incoming_connected_peers(&self) -> Vec> { - self.connected_peers() - .into_iter() - .filter(|x| x.info.is_inbound()) - .collect() + PeersIter { + iter: peers.into_iter(), + } } /// Get a peer we're connected to by address. pub fn get_connected_peer(&self, addr: PeerAddr) -> Option> { - let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { - Some(peers) => peers, - None => { - error!("get_connected_peer: failed to get peers lock"); - return None; - } - }; - peers.get(&addr).cloned() + self.iter().connected().by_addr(addr) } - /// Number of peers currently connected to. - pub fn peer_count(&self) -> u32 { - self.connected_peers().len() as u32 - } - - /// Number of outbound peers currently connected to. - pub fn peer_outbound_count(&self) -> u32 { - self.outgoing_connected_peers().len() as u32 - } - - /// Number of inbound peers currently connected to. - pub fn peer_inbound_count(&self) -> u32 { - self.incoming_connected_peers().len() as u32 - } - - // Return vec of connected peers that currently advertise more work - // (total_difficulty) than we do. - pub fn more_work_peers(&self) -> Result>, chain::Error> { - let peers = self.connected_peers(); - if peers.is_empty() { - return Ok(vec![]); - } - - let total_difficulty = self.total_difficulty()?; - - let mut max_peers = peers - .into_iter() - .filter(|x| x.info.total_difficulty() > total_difficulty) - .collect::>(); - - max_peers.shuffle(&mut thread_rng()); - Ok(max_peers) - } - - // Return number of connected peers that currently advertise more/same work - // (total_difficulty) than/as we do. - pub fn more_or_same_work_peers(&self) -> Result { - let peers = self.connected_peers(); - if peers.is_empty() { - return Ok(0); - } - - let total_difficulty = self.total_difficulty()?; - - Ok(peers - .iter() - .filter(|x| x.info.total_difficulty() >= total_difficulty) - .count()) - } - - /// Returns single random peer with more work than us. - pub fn more_work_peer(&self) -> Option> { - match self.more_work_peers() { - Ok(mut peers) => peers.pop(), - Err(e) => { - error!("failed to get more work peers: {:?}", e); - None - } - } - } - - /// Return vec of connected peers that currently have the most worked - /// branch, showing the highest total difficulty. - pub fn most_work_peers(&self) -> Vec> { - let peers = self.connected_peers(); - if peers.is_empty() { - return vec![]; - } - - let max_total_difficulty = match peers.iter().map(|x| x.info.total_difficulty()).max() { - Some(v) => v, - None => return vec![], - }; - - let mut max_peers = peers - .into_iter() - .filter(|x| x.info.total_difficulty() == max_total_difficulty) - .collect::>(); - - max_peers.shuffle(&mut thread_rng()); - max_peers - } - - /// Returns single random peer with the most worked branch, showing the - /// highest total difficulty. - pub fn most_work_peer(&self) -> Option> { - self.most_work_peers().pop() + pub fn max_peer_difficulty(&self) -> Difficulty { + self.iter() + .connected() + .max_difficulty() + .unwrap_or(Difficulty::zero()) } pub fn is_banned(&self, peer_addr: PeerAddr) -> bool { @@ -286,7 +178,7 @@ impl Peers { { let mut count = 0; - for p in self.connected_peers().iter() { + for p in self.iter().connected() { match inner(&p) { Ok(true) => count += 1, Ok(false) => (), @@ -353,7 +245,7 @@ impl Peers { /// Ping all our connected peers. Always automatically expects a pong back /// or disconnects. This acts as a liveness test. pub fn check_all(&self, total_difficulty: Difficulty, height: u64) { - for p in self.connected_peers().iter() { + for p in self.iter().connected() { if let Err(e) = p.send_ping(total_difficulty, height) { debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e); let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { @@ -370,13 +262,13 @@ impl Peers { } /// Iterator over all peers we know about (stored in our db). - pub fn peers_iter(&self) -> Result, Error> { + pub fn peer_data_iter(&self) -> Result, Error> { self.store.peers_iter().map_err(From::from) } - /// Convenience for reading all peers. - pub fn all_peers(&self) -> Vec { - self.peers_iter() + /// Convenience for reading all peer data from the db. + pub fn all_peer_data(&self) -> Vec { + self.peer_data_iter() .map(|peers| peers.collect()) .unwrap_or(vec![]) } @@ -427,14 +319,8 @@ impl Peers { // build a list of peers to be cleaned up { - let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { - Some(peers) => peers, - None => { - error!("clean_peers: can't get peers lock"); - return; - } - }; - for peer in peers.values() { + for peer in self.iter() { + let ref peer: &Peer = peer.as_ref(); if peer.is_banned() { debug!("clean_peers {:?}, peer banned", peer.info.addr); rm.push(peer.info.addr.clone()); @@ -466,27 +352,34 @@ impl Peers { } } + // closure to build an iterator of our inbound peers + let outbound_peers = || self.iter().outbound().connected().into_iter(); + // check here to make sure we don't have too many outgoing connections - let excess_outgoing_count = - (self.peer_outbound_count() as usize).saturating_sub(max_outbound_count); + // Preferred peers are treated preferentially here. + // Also choose outbound peers with lowest total difficulty to drop. + let excess_outgoing_count = outbound_peers().count().saturating_sub(max_outbound_count); if excess_outgoing_count > 0 { - let mut addrs: Vec<_> = self - .outgoing_connected_peers() - .iter() - .filter(|x| !preferred_peers.contains(&x.info.addr)) + let mut peer_infos: Vec<_> = outbound_peers() + .map(|x| x.info.clone()) + .filter(|x| !preferred_peers.contains(&x.addr)) + .collect(); + peer_infos.sort_unstable_by_key(|x| x.total_difficulty()); + let mut addrs = peer_infos + .into_iter() + .map(|x| x.addr) .take(excess_outgoing_count) - .map(|x| x.info.addr) .collect(); rm.append(&mut addrs); } + // closure to build an iterator of our inbound peers + let inbound_peers = || self.iter().inbound().connected().into_iter(); + // check here to make sure we don't have too many incoming connections - let excess_incoming_count = - (self.peer_inbound_count() as usize).saturating_sub(max_inbound_count); + let excess_incoming_count = inbound_peers().count().saturating_sub(max_inbound_count); if excess_incoming_count > 0 { - let mut addrs: Vec<_> = self - .incoming_connected_peers() - .iter() + let mut addrs: Vec<_> = inbound_peers() .filter(|x| !preferred_peers.contains(&x.info.addr)) .take(excess_incoming_count) .map(|x| x.info.addr) @@ -522,7 +415,8 @@ impl Peers { /// We have enough outbound connected peers pub fn enough_outbound_peers(&self) -> bool { - self.peer_outbound_count() >= self.config.peer_min_preferred_outbound_count() + self.iter().outbound().connected().count() + >= self.config.peer_min_preferred_outbound_count() as usize } /// Removes those peers that seem to have expired @@ -780,3 +674,86 @@ impl NetAdapter for Peers { } } } + +pub struct PeersIter { + iter: I, +} + +impl IntoIterator for PeersIter { + type Item = I::Item; + type IntoIter = I; + + fn into_iter(self) -> Self::IntoIter { + self.iter.into_iter() + } +} + +impl>> PeersIter { + /// Filter peers that are currently connected. + /// Note: This adaptor takes a read lock internally. + /// So if we are chaining adaptors then defer this toward the end of the chain. + pub fn connected(self) -> PeersIter>> { + PeersIter { + iter: self.iter.filter(|p| p.is_connected()), + } + } + + /// Filter inbound peers. + pub fn inbound(self) -> PeersIter>> { + PeersIter { + iter: self.iter.filter(|p| p.info.is_inbound()), + } + } + + /// Filter outbound peers. + pub fn outbound(self) -> PeersIter>> { + PeersIter { + iter: self.iter.filter(|p| p.info.is_outbound()), + } + } + + /// Filter peers with the provided difficulty comparison fn. + /// + /// with_difficulty(|x| x > diff) + /// + /// Note: This adaptor takes a read lock internally for each peer. + /// So if we are chaining adaptors then put this toward later in the chain. + pub fn with_difficulty(self, f: F) -> PeersIter>> + where + F: Fn(Difficulty) -> bool, + { + PeersIter { + iter: self.iter.filter(move |p| f(p.info.total_difficulty())), + } + } + + /// Filter peers that support the provided capabilities. + pub fn with_capabilities( + self, + cap: Capabilities, + ) -> PeersIter>> { + PeersIter { + iter: self.iter.filter(move |p| p.info.capabilities.contains(cap)), + } + } + + pub fn by_addr(&mut self, addr: PeerAddr) -> Option> { + self.iter.find(|p| p.info.addr == addr) + } + + /// Choose a random peer from the current (filtered) peers. + pub fn choose_random(self) -> Option> { + let mut rng = rand::thread_rng(); + self.iter.choose(&mut rng) + } + + /// Find the max difficulty of the current (filtered) peers. + pub fn max_difficulty(self) -> Option { + self.iter.map(|p| p.info.total_difficulty()).max() + } + + /// Count the current (filtered) peers. + pub fn count(self) -> usize { + self.iter.count() + } +} diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index ff2a5c34c2..3211e285a8 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -233,7 +233,7 @@ impl Server { /// different sets of peers themselves. In addition, it prevent potential /// duplicate connections, malicious or not. fn check_undesirable(&self, stream: &TcpStream) -> bool { - if self.peers.peer_inbound_count() + if self.peers.iter().inbound().connected().count() as u32 >= self.config.peer_max_inbound_count() + self.config.peer_listener_buffer_count() { debug!("Accepting new connection will exceed peer limit, refusing connection."); diff --git a/p2p/src/types.rs b/p2p/src/types.rs index e7a2d33d34..f8929d5ce4 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -266,6 +266,7 @@ pub struct P2PConfig { /// The list of seed nodes, if using Seeding as a seed type pub seeds: Option, + /// TODO: Rethink this. We need to separate what *we* advertise vs. who we connect to. /// Capabilities expose by this node, also conditions which other peers this /// node will have an affinity toward when connection. pub capabilities: Capabilities, diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index d34e6cd91f..342613884c 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -97,5 +97,5 @@ fn peer_handshake() { let server_peer = server.peers.get_connected_peer(my_addr).unwrap(); assert_eq!(server_peer.info.total_difficulty(), Difficulty::min()); - assert!(server.peers.peer_count() > 0); + assert!(server.peers.iter().connected().count() > 0); } diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index 4f86d51cd8..5f6ff14796 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -345,11 +345,11 @@ impl DandelionEpoch { /// Choose a new outbound stem relay peer. pub fn next_epoch(&mut self, peers: &Arc) { self.start_time = Some(Utc::now().timestamp()); - self.relay_peer = peers.outgoing_connected_peers().first().cloned(); + self.relay_peer = peers.iter().outbound().connected().choose_random(); // If stem_probability == 90 then we stem 90% of the time. - let mut rng = rand::thread_rng(); let stem_probability = self.config.stem_probability; + let mut rng = rand::thread_rng(); self.is_stem = rng.gen_range(0, 100) < stem_probability; let addr = self.relay_peer.clone().map(|p| p.info.addr); @@ -386,7 +386,7 @@ impl DandelionEpoch { } if update_relay { - self.relay_peer = peers.outgoing_connected_peers().first().cloned(); + self.relay_peer = peers.iter().outbound().connected().choose_random(); info!( "DandelionEpoch: relay_peer: new peer chosen: {:?}", self.relay_peer.clone().map(|p| p.info.addr) diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 7caa7e7833..ed5a682e44 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -151,7 +151,7 @@ fn monitor_peers( let mut banned_count = 0; let mut defuncts = vec![]; - for x in peers.all_peers().into_iter() { + for x in peers.all_peer_data().into_iter() { match x.flags { p2p::State::Banned => { let interval = Utc::now().timestamp() - x.last_banned; @@ -174,13 +174,23 @@ fn monitor_peers( total_count += 1; } + let peers_count = peers.iter().connected().count(); + + let max_diff = peers.max_peer_difficulty(); + let most_work_count = peers + .iter() + .outbound() + .with_difficulty(|x| x >= max_diff) + .connected() + .count(); + debug!( "monitor_peers: on {}:{}, {} connected ({} most_work). \ all {} = {} healthy + {} banned + {} defunct", config.host, config.port, - peers.peer_count(), - peers.most_work_peers().len(), + peers_count, + most_work_count, total_count, healthy_count, banned_count, @@ -198,10 +208,14 @@ fn monitor_peers( return; } - // loop over connected peers + // loop over connected peers that can provide peer lists // ask them for their list of peers let mut connected_peers: Vec = vec![]; - for p in peers.connected_peers() { + for p in peers + .iter() + .with_capabilities(p2p::Capabilities::PEER_LIST) + .connected() + { trace!( "monitor_peers: {}:{} ask {} for more peers", config.host, @@ -331,9 +345,10 @@ fn listen_for_addrs( .name("peer_connect".to_string()) .spawn(move || match p2p_c.connect(addr) { Ok(p) => { - if p.send_peer_request(capab).is_ok() { - let _ = peers_c.update_state(addr, p2p::State::Healthy); + if p.info.capabilities.contains(p2p::Capabilities::PEER_LIST) { + let _ = p.send_peer_request(capab); } + let _ = peers_c.update_state(addr, p2p::State::Healthy); } Err(_) => { let _ = peers_c.update_state(addr, p2p::State::Defunct); diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index f6cf069cab..c5b1871302 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -16,11 +16,11 @@ //! the peer-to-peer server, the blockchain and the transaction pool) and acts //! as a facade. -use std::fs; use std::fs::File; use std::io::prelude::*; use std::path::Path; use std::sync::{mpsc, Arc}; +use std::{convert::TryInto, fs}; use std::{ thread::{self, JoinHandle}, time::{self, Duration}, @@ -356,7 +356,13 @@ impl Server { /// Number of peers pub fn peer_count(&self) -> u32 { - self.p2p.peers.peer_count() + self.p2p + .peers + .iter() + .connected() + .count() + .try_into() + .unwrap() } /// Start a minimal "stratum" mining service on a separate thread @@ -486,7 +492,8 @@ impl Server { let peer_stats = self .p2p .peers - .connected_peers() + .iter() + .connected() .into_iter() .map(|p| PeerStats::from_peer(&p)) .collect(); diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs index d77e634a01..65f7c417f0 100644 --- a/servers/src/grin/sync/body_sync.rs +++ b/servers/src/grin/sync/body_sync.rs @@ -14,6 +14,7 @@ use chrono::prelude::{DateTime, Utc}; use chrono::Duration; +use rand::prelude::*; use std::cmp; use std::sync::Arc; @@ -89,7 +90,17 @@ impl BodySync { hashes.reverse(); - let peers = self.peers.more_work_peers()?; + let head = self.chain.head()?; + + // Find connected peers with strictly greater difficulty than us. + let peers: Vec<_> = self + .peers + .iter() + .outbound() + .with_difficulty(|x| x > head.total_difficulty) + .connected() + .into_iter() + .collect(); // if we have 5 peers to sync from then ask for 50 blocks total (peer_count * // 10) max will be 80 if all 8 peers are advertising more work @@ -125,9 +136,9 @@ impl BodySync { self.blocks_requested = 0; self.receive_timeout = Utc::now() + Duration::seconds(6); - let mut peers_iter = peers.iter().cycle(); + let mut rng = rand::thread_rng(); for hash in hashes_to_get.clone() { - if let Some(peer) = peers_iter.next() { + if let Some(peer) = peers.choose(&mut rng) { if let Err(e) = peer.send_block_request(*hash, chain::Options::SYNC) { debug!("Skipped request to {}: {:?}", peer.info.addr, e); peer.stop(); diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs index 6fec87e134..a55c425ecf 100644 --- a/servers/src/grin/sync/header_sync.rs +++ b/servers/src/grin/sync/header_sync.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use crate::chain::{self, SyncState, SyncStatus}; use crate::common::types::Error; use crate::core::core::hash::{Hash, Hashed}; -use crate::p2p::{self, types::ReasonForBan, Peer}; +use crate::p2p::{self, types::ReasonForBan, Capabilities, Peer}; pub struct HeaderSync { sync_state: Arc, @@ -170,10 +170,17 @@ impl HeaderSync { fn header_sync(&mut self) -> Option> { if let Ok(header_head) = self.chain.header_head() { - let difficulty = header_head.total_difficulty; - - if let Some(peer) = self.peers.most_work_peer() { - if peer.info.total_difficulty() > difficulty { + let max_diff = self.peers.max_peer_difficulty(); + let peer = self + .peers + .iter() + .outbound() + .with_capabilities(Capabilities::HEADER_HIST) + .with_difficulty(|x| x >= max_diff) + .connected() + .choose_random(); + if let Some(peer) = peer { + if peer.info.total_difficulty() > header_head.total_difficulty { return self.request_headers(peer); } } diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index e5ab4d30d4..86bceca6df 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use crate::chain::{self, SyncState, SyncStatus}; use crate::core::core::hash::Hashed; use crate::core::global; -use crate::p2p::{self, Peer}; +use crate::p2p::{self, Capabilities, Peer}; /// Fast sync has 3 "states": /// * syncing headers @@ -158,7 +158,16 @@ impl StateSync { let mut txhashset_height = header_head.height.saturating_sub(threshold); txhashset_height = txhashset_height.saturating_sub(txhashset_height % archive_interval); - if let Some(peer) = self.peers.most_work_peer() { + let max_diff = self.peers.max_peer_difficulty(); + let peer = self + .peers + .iter() + .outbound() + .with_capabilities(Capabilities::TXHASHSET_HIST) + .with_difficulty(|x| x >= max_diff) + .connected() + .choose_random(); + if let Some(peer) = peer { // ask for txhashset at state_sync_threshold let mut txhashset_head = self .chain diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index a5e66fb956..4f71b48fd9 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -79,7 +79,15 @@ impl SyncRunner { if self.stop_state.is_stopped() { break; } - let wp = self.peers.more_or_same_work_peers()?; + // Count peers with at least our difficulty. + let wp = self + .peers + .iter() + .outbound() + .with_difficulty(|x| x >= head.total_difficulty) + .connected() + .count(); + // exit loop when: // * we have more than MIN_PEERS more_or_same_work peers // * we are synced already, e.g. grin was quickly restarted @@ -221,7 +229,16 @@ impl SyncRunner { fn needs_syncing(&self) -> Result<(bool, u64), chain::Error> { let local_diff = self.chain.head()?.total_difficulty; let mut is_syncing = self.sync_state.is_syncing(); - let peer = self.peers.most_work_peer(); + + // Find a peer with greatest known difficulty. + let max_diff = self.peers.max_peer_difficulty(); + let peer = self + .peers + .iter() + .outbound() + .with_difficulty(|x| x >= max_diff) + .connected() + .choose_random(); let peer_info = if let Some(p) = peer { p.info.clone()