Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Reserved peers, reserved-only flag #1347

Merged
merged 5 commits into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions parity/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ Networking Options:
--no-discovery Disable new peer discovery.
--node-key KEY Specify node secret key, either as 64-character hex
string or input to SHA3 operation.
--reserved-peers FILE Provide a file containing enodes, one per line.
These nodes will always have a reserved slot on top
of the normal maximum peers.
--reserved-only Connect only to reserved nodes.

API and Console Options:
--jsonrpc-off Disable the JSON-RPC API server.
Expand Down Expand Up @@ -236,6 +240,8 @@ pub struct Args {
pub flag_no_discovery: bool,
pub flag_nat: String,
pub flag_node_key: Option<String>,
pub flag_reserved_peers: Option<String>,
pub flag_reserved_only: bool,
pub flag_cache_pref_size: usize,
pub flag_cache_max_size: usize,
pub flag_queue_max_size: usize,
Expand Down
22 changes: 22 additions & 0 deletions parity/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,26 @@ impl Configuration {
}
}

pub fn init_reserved_nodes(&self) -> Vec<String> {
use std::fs::File;
use std::io::BufRead;

if let Some(ref path) = self.args.flag_reserved_peers {
let mut buffer = String::new();
let mut node_file = File::open(path).unwrap_or_else(|e| {
die!("Error opening reserved nodes file: {}", e);
});
node_file.read_to_string(&mut buffer).expect("Error reading reserved node file");
buffer.lines().map(|s| {
Self::normalize_enode(s).unwrap_or_else(|| {
die!("{}: Invalid node address format given for a reserved node.", s);
})
}).collect()
} else {
Vec::new()
}
}

pub fn net_addresses(&self) -> (Option<SocketAddr>, Option<SocketAddr>) {
let port = self.net_port();
let listen_address = Some(SocketAddr::new(IpAddr::from_str("0.0.0.0").unwrap(), port));
Expand All @@ -179,6 +199,8 @@ impl Configuration {
let mut net_path = PathBuf::from(&self.path());
net_path.push("network");
ret.config_path = Some(net_path.to_str().unwrap().to_owned());
ret.reserved_nodes = self.init_reserved_nodes();
ret.reserved_only = self.args.flag_reserved_only;
ret
}

Expand Down
6 changes: 4 additions & 2 deletions parity/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,11 @@ fn execute_export(conf: Configuration) {
udp_port: None,
nat_enabled: false,
discovery_enabled: false,
pin: true,
reserved_only: true,
boot_nodes: Vec::new(),
use_secret: None,
ideal_peers: 0,
reserved_nodes: Vec::new(),
};
let client_config = conf.client_config(&spec);

Expand Down Expand Up @@ -379,10 +380,11 @@ fn execute_import(conf: Configuration) {
udp_port: None,
nat_enabled: false,
discovery_enabled: false,
pin: true,
reserved_only: true,
boot_nodes: Vec::new(),
use_secret: None,
ideal_peers: 0,
reserved_nodes: Vec::new(),
};
let client_config = conf.client_config(&spec);

Expand Down
1 change: 0 additions & 1 deletion rpc/src/v1/impls/ethcore_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,4 @@ impl<M> EthcoreSet for EthcoreSetClient<M> where M: MinerService + 'static {
to_value(&true)
})
}

}
65 changes: 43 additions & 22 deletions util/src/network/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,16 @@ pub struct NetworkConfiguration {
pub nat_enabled: bool,
/// Enable discovery
pub discovery_enabled: bool,
/// Pin to boot nodes only
pub pin: bool,
/// Pin to reserved nodes only
pub reserved_only: bool,
/// List of initial node addresses
pub boot_nodes: Vec<String>,
/// Use provided node key instead of default
pub use_secret: Option<Secret>,
/// Number of connected peers to maintain
pub ideal_peers: u32,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
}

impl Default for NetworkConfiguration {
Expand All @@ -91,10 +93,11 @@ impl NetworkConfiguration {
udp_port: None,
nat_enabled: true,
discovery_enabled: true,
pin: false,
reserved_only: false,
boot_nodes: Vec::new(),
use_secret: None,
ideal_peers: 25,
reserved_nodes: Vec::new(),
}
}

Expand Down Expand Up @@ -216,7 +219,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
let session = self.resolve_session(peer);
if let Some(session) = session {
try!(session.lock().unwrap().deref_mut().send_packet(self.io, self.protocol, packet_id as u8, &data));
try!(session.lock().unwrap().send_packet(self.io, self.protocol, packet_id as u8, &data));
} else {
trace!(target: "network", "Send: Peer no longer exist")
}
Expand Down Expand Up @@ -365,6 +368,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let udp_port = config.udp_port.unwrap_or(listen_address.port());
let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port };

let boot_nodes = config.boot_nodes.clone();
let reserved_nodes = config.reserved_nodes.clone();

let mut host = Host::<Message> {
info: RwLock::new(HostInfo {
keys: keys,
Expand All @@ -389,21 +395,26 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
stopping: AtomicBool::new(false),
};

let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
for n in boot_nodes {
host.add_node(&n);
// don't pin boot nodes.
host.add_node(&n, false);
}

for n in reserved_nodes {
host.add_node(&n, true);
}
Ok(host)
}

pub fn add_node(&mut self, id: &str) {
pub fn add_node(&mut self, id: &str, pin: bool) {
match Node::from_str(id) {
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
Ok(n) => {
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.pinned_nodes.push(n.id.clone());
if pin { self.pinned_nodes.push(n.id.clone()) }

self.nodes.write().unwrap().add_node(n);
if let Some(ref mut discovery) = *self.discovery.lock().unwrap().deref_mut() {
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
discovery.add_node(entry);
}
}
Expand Down Expand Up @@ -472,7 +483,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// Initialize discovery.
let discovery = {
let info = self.info.read().unwrap();
if info.config.discovery_enabled && !info.config.pin {
if info.config.discovery_enabled && !info.config.reserved_only {
Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY))
} else { None }
};
Expand All @@ -482,7 +493,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
for n in self.nodes.read().unwrap().unordered_entries() {
discovery.add_node(n.clone());
}
*self.discovery.lock().unwrap().deref_mut() = Some(discovery);
*self.discovery.lock().unwrap() = Some(discovery);
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
Expand Down Expand Up @@ -529,14 +540,17 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}

fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
if self.info.read().unwrap().deref().capabilities.is_empty() {
if self.info.read().unwrap().capabilities.is_empty() {
return;
}
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
let pin = { self.info.read().unwrap().deref().config.pin };
let ideal_peers = { self.info.read().unwrap().config.ideal_peers };
let pin = { self.info.read().unwrap().config.reserved_only };
let session_count = self.session_count();
if session_count >= ideal_peers as usize {
return;
if session_count >= ideal_peers as usize + self.pinned_nodes.len() {
// check if all pinned nodes are connected.
if self.pinned_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
return;
}
}

let handshake_count = self.handshake_count();
Expand All @@ -546,9 +560,16 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
return;
}

let nodes = if pin { self.pinned_nodes.clone() } else { self.nodes.read().unwrap().nodes() };
// iterate over all nodes, reserved ones coming first.
// if we are pinned to only reserved nodes, ignore all others.
let nodes = self.pinned_nodes.iter().cloned().chain(if !pin {
self.nodes.read().unwrap().nodes()
} else {
Vec::new()
});

let mut started: usize = 0;
for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
for id in nodes.filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) {
self.connect_peer(&id, io);
started += 1;
Expand Down Expand Up @@ -678,7 +699,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
if !s.info.originated {
let session_count = self.session_count();
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
let ideal_peers = { self.info.read().unwrap().config.ideal_peers };
if session_count >= ideal_peers as usize {
s.disconnect(io, DisconnectReason::TooManyPeers);
return;
Expand Down Expand Up @@ -900,7 +921,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
} => {
let handler_token = {
let mut timer_counter = self.timer_counter.write().unwrap();
let counter = timer_counter.deref_mut();
let counter = &mut *timer_counter;
let handler_token = *counter;
*counter += 1;
handler_token
Expand Down Expand Up @@ -944,7 +965,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
}
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration")
}
}
Expand Down Expand Up @@ -972,7 +993,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
}
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update")
}
}
Expand Down