This repository has been archived by the owner on Nov 6, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Limit incoming connections. #8060
Merged
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,9 +18,9 @@ use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr}; | |
use std::collections::{HashMap, HashSet}; | ||
use std::str::FromStr; | ||
use std::sync::Arc; | ||
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; | ||
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; | ||
use std::ops::*; | ||
use std::cmp::min; | ||
use std::cmp::{min, max}; | ||
use std::path::{Path, PathBuf}; | ||
use std::io::{Read, Write, self}; | ||
use std::fs; | ||
|
@@ -247,7 +247,6 @@ pub struct Host { | |
timer_counter: RwLock<usize>, | ||
stats: Arc<NetworkStats>, | ||
reserved_nodes: RwLock<HashSet<NodeId>>, | ||
num_sessions: AtomicUsize, | ||
stopping: AtomicBool, | ||
filter: Option<Arc<ConnectionFilter>>, | ||
} | ||
|
@@ -304,7 +303,6 @@ impl Host { | |
timer_counter: RwLock::new(USER_TIMER), | ||
stats: stats, | ||
reserved_nodes: RwLock::new(HashSet::new()), | ||
num_sessions: AtomicUsize::new(0), | ||
stopping: AtomicBool::new(false), | ||
filter: filter, | ||
}; | ||
|
@@ -359,7 +357,7 @@ impl Host { | |
// disconnect all non-reserved peers here. | ||
let reserved: HashSet<NodeId> = self.reserved_nodes.read().clone(); | ||
let mut to_kill = Vec::new(); | ||
for e in self.sessions.write().iter_mut() { | ||
for e in self.sessions.read().iter() { | ||
let mut s = e.lock(); | ||
{ | ||
let id = s.id(); | ||
|
@@ -399,7 +397,7 @@ impl Host { | |
pub fn stop(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> { | ||
self.stopping.store(true, AtomicOrdering::Release); | ||
let mut to_kill = Vec::new(); | ||
for e in self.sessions.write().iter_mut() { | ||
for e in self.sessions.read().iter() { | ||
let mut s = e.lock(); | ||
s.disconnect(io, DisconnectReason::ClientQuit); | ||
to_kill.push(s.token()); | ||
|
@@ -490,22 +488,33 @@ impl Host { | |
self.sessions.read().iter().any(|e| e.lock().info.id == Some(id.clone())) | ||
} | ||
|
||
fn session_count(&self) -> usize { | ||
self.num_sessions.load(AtomicOrdering::Relaxed) | ||
// returns (handshakes, egress, ingress) | ||
fn session_count(&self) -> (usize, usize, usize) { | ||
let mut handshakes = 0; | ||
let mut egress = 0; | ||
let mut ingress = 0; | ||
for s in self.sessions.read().iter() { | ||
match s.try_lock() { | ||
Some(ref s) if s.is_ready() && s.info.originated => egress += 1, | ||
Some(ref s) if s.is_ready() && !s.info.originated => ingress += 1, | ||
_ => handshakes +=1, | ||
} | ||
} | ||
(handshakes, egress, ingress) | ||
} | ||
|
||
fn connecting_to(&self, id: &NodeId) -> bool { | ||
self.sessions.read().iter().any(|e| e.lock().id() == Some(id)) | ||
} | ||
|
||
fn handshake_count(&self) -> usize { | ||
// session_count < total_count is possible because of the data race. | ||
self.sessions.read().count().saturating_sub(self.session_count()) | ||
let (handshakes, ..) = self.session_count(); | ||
handshakes | ||
} | ||
|
||
fn keep_alive(&self, io: &IoContext<NetworkIoMessage>) { | ||
let mut to_kill = Vec::new(); | ||
for e in self.sessions.write().iter_mut() { | ||
for e in self.sessions.read().iter() { | ||
let mut s = e.lock(); | ||
if !s.keep_alive(io) { | ||
s.disconnect(io, DisconnectReason::PingTimeout); | ||
|
@@ -529,9 +538,9 @@ impl Host { | |
(config.min_peers, config.non_reserved_mode == NonReservedPeerMode::Deny, config.max_handshakes as usize, config.ip_filter.clone(), info.id().clone()) | ||
}; | ||
|
||
let session_count = self.session_count(); | ||
let (handshake_count, egress_count, ingress_count) = self.session_count(); | ||
let reserved_nodes = self.reserved_nodes.read(); | ||
if session_count >= min_peers as usize + reserved_nodes.len() { | ||
if egress_count + ingress_count >= min_peers as usize + reserved_nodes.len() { | ||
// check if all pinned nodes are connected. | ||
if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { | ||
return; | ||
|
@@ -541,7 +550,6 @@ impl Host { | |
pin = true; | ||
} | ||
|
||
let handshake_count = self.handshake_count(); | ||
// allow 16 slots for incoming connections | ||
if handshake_count >= max_handshakes { | ||
return; | ||
|
@@ -566,7 +574,7 @@ impl Host { | |
self.connect_peer(&id, io); | ||
started += 1; | ||
} | ||
debug!(target: "network", "Connecting peers: {} sessions, {} pending, {} started", self.session_count(), self.handshake_count(), started); | ||
debug!(target: "network", "Connecting peers: {} sessions, {} pending, {} started", egress_count + ingress_count, self.handshake_count(), started); | ||
} | ||
|
||
fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage>) { | ||
|
@@ -676,11 +684,11 @@ impl Host { | |
let mut ready_id = None; | ||
if let Some(session) = session.clone() { | ||
{ | ||
let mut s = session.lock(); | ||
loop { | ||
let session_result = s.readable(io, &self.info.read()); | ||
let session_result = session.lock().readable(io, &self.info.read()); | ||
match session_result { | ||
Err(e) => { | ||
let s = session.lock(); | ||
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e); | ||
if let ErrorKind::Disconnect(DisconnectReason::IncompatibleProtocol) = *e.kind() { | ||
if let Some(id) = s.id() { | ||
|
@@ -693,9 +701,9 @@ impl Host { | |
break; | ||
}, | ||
Ok(SessionData::Ready) => { | ||
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); | ||
let session_count = self.session_count(); | ||
let (min_peers, max_peers, reserved_only, self_id) = { | ||
let (_, egress_count, ingress_count) = self.session_count(); | ||
let mut s = session.lock(); | ||
let (min_peers, mut max_peers, reserved_only, self_id) = { | ||
let info = self.info.read(); | ||
let mut max_peers = info.config.max_peers; | ||
for cap in s.info.capabilities.iter() { | ||
|
@@ -707,12 +715,17 @@ impl Host { | |
(info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny, info.id().clone()) | ||
}; | ||
|
||
max_peers = max(max_peers, min_peers); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this needed? (assuming we validate in the config that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just another sanity check, so that this code does not rely on config. |
||
|
||
let id = s.id().expect("Ready session always has id").clone(); | ||
|
||
// Check for the session limit. session_counts accounts for the new session. | ||
// Check for the session limit. | ||
// Outgoing connections are allowed as long as their count is <= min_peers | ||
// Incoming connections are allowed to take all of the max_peers reserve, or at most half of the slots. | ||
let max_ingress = max(max_peers - min_peers, min_peers / 2); | ||
if reserved_only || | ||
(s.info.originated && session_count > min_peers) || | ||
(!s.info.originated && session_count > max_peers) { | ||
(s.info.originated && egress_count > min_peers) || | ||
(!s.info.originated && ingress_count > max_ingress) { | ||
// only proceed if the connecting peer is reserved. | ||
if !self.reserved_nodes.read().contains(&id) { | ||
s.disconnect(io, DisconnectReason::TooManyPeers); | ||
|
@@ -816,13 +829,12 @@ impl Host { | |
let mut deregister = false; | ||
let mut expired_session = None; | ||
if let FIRST_SESSION ... LAST_SESSION = token { | ||
let sessions = self.sessions.write(); | ||
let sessions = self.sessions.read(); | ||
if let Some(session) = sessions.get(token).cloned() { | ||
expired_session = Some(session.clone()); | ||
let mut s = session.lock(); | ||
if !s.expired() { | ||
if s.is_ready() { | ||
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst); | ||
for (p, _) in self.handlers.read().iter() { | ||
if s.have_capability(*p) { | ||
to_disconnect.push(*p); | ||
|
@@ -854,7 +866,7 @@ impl Host { | |
fn update_nodes(&self, _io: &IoContext<NetworkIoMessage>, node_changes: TableUpdates) { | ||
let mut to_remove: Vec<PeerId> = Vec::new(); | ||
{ | ||
let sessions = self.sessions.write(); | ||
let sessions = self.sessions.read(); | ||
for c in sessions.iter() { | ||
let s = c.lock(); | ||
if let Some(id) = s.id() { | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not calling a variable
handshake_count
? Calling a method initiates another iteration ofsession_count()
.