From 006ac8feef9a476e1e4595118410e8faa78716c3 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 9 Aug 2018 21:35:38 +0000 Subject: [PATCH 1/2] Dynamically bind to available UDP ports in Fullnode --- src/bin/fullnode.rs | 13 +++++++-- src/crdt.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 76 insertions(+), 3 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 72ac52ccae0bfb..9ac23431724df6 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -72,11 +72,20 @@ fn main() -> () { } let leader_pubkey = keypair.pubkey(); - let repl_clone = repl_data.clone(); let ledger_path = matches.value_of("ledger").unwrap(); - let node = TestNode::new_with_bind_addr(repl_data, bind_addr); + let node = if let Some(_t) = matches.value_of("testnet") { + TestNode::new_with_external_ip(leader_pubkey, repl_data.contact_info.ncp.ip(), 0) + } else { + TestNode::new_with_external_ip( + leader_pubkey, + repl_data.contact_info.ncp.ip(), + repl_data.contact_info.ncp.port(), + ) + }; + let repl_clone = node.data.clone(); + let mut drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DRONE_PORT); let testnet_addr = matches.value_of("testnet").map(|addr_str| { let addr: SocketAddr = addr_str.parse().unwrap(); diff --git a/src/crdt.rs b/src/crdt.rs index ad0e7a3744bb7c..8276ebf7c57c9b 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -20,6 +20,7 @@ use counter::Counter; use hash::Hash; use ledger::LedgerWindow; use log::Level; +use nat::udp_random_bind; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use pnet_datalink as datalink; use rand::{thread_rng, RngCore}; @@ -30,7 +31,7 @@ use std; use std::collections::HashMap; use std::collections::VecDeque; use std::io::Cursor; -use std::net::{IpAddr, SocketAddr, UdpSocket}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; @@ -1354,6 +1355,69 @@ impl TestNode { }, } } + pub fn new_with_external_ip(pubkey: Pubkey, ip: IpAddr, ncp_port: u16) -> TestNode { + fn bind() -> (u16, UdpSocket) { + match udp_random_bind(8100, 10000, 5) { + Ok(socket) => (socket.local_addr().unwrap().port(), socket), + Err(err) => { + panic!("Failed to bind to {:?}", err); + } + } + }; + + fn bind_to(port: u16) -> UdpSocket { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port); + match UdpSocket::bind(addr) { + Ok(socket) => socket, + Err(err) => { + panic!("Failed to bind to {:?}: {:?}", addr, err); + } + } + }; + + let (gossip_port, gossip) = if ncp_port != 0 { + (ncp_port, bind_to(ncp_port)) + } else { + bind() + }; + let (replicate_port, replicate) = bind(); + let (requests_port, requests) = bind(); + let (transaction_port, transaction) = bind(); + let (repair_port, repair) = bind(); + + // Responses are sent from the same Udp port as requests are received + // from, in hopes that a NAT sitting in the middle will route the + // response Udp packet correctly back to the requester. + let respond = requests.try_clone().unwrap(); + + let gossip_send = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); + + let node_info = NodeInfo::new( + pubkey, + SocketAddr::new(ip, gossip_port), + SocketAddr::new(ip, replicate_port), + SocketAddr::new(ip, requests_port), + SocketAddr::new(ip, transaction_port), + SocketAddr::new(ip, repair_port), + ); + + TestNode { + data: node_info, + sockets: Sockets { + gossip, + gossip_send, + requests, + replicate, + transaction, + respond, + broadcast, + repair, + retransmit, + }, + } + } } fn report_time_spent(label: &str, time: &Duration, extra: &str) { From 7b5eb5620b80fcb52728ef1ad84ac9b7bd82b55b Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Fri, 24 Aug 2018 09:56:37 -0700 Subject: [PATCH 2/2] Added tests for dynamic port binding - Also removed hard coding of port range from CRDT --- src/bin/fullnode.rs | 9 +++- src/crdt.rs | 104 ++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 103 insertions(+), 10 deletions(-) diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 9ac23431724df6..ab0e4c918f8455 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -75,12 +75,19 @@ fn main() -> () { let ledger_path = matches.value_of("ledger").unwrap(); + let port_range = (8100, 10000); let node = if let Some(_t) = matches.value_of("testnet") { - TestNode::new_with_external_ip(leader_pubkey, repl_data.contact_info.ncp.ip(), 0) + TestNode::new_with_external_ip( + leader_pubkey, + repl_data.contact_info.ncp.ip(), + port_range, + 0, + ) } else { TestNode::new_with_external_ip( leader_pubkey, repl_data.contact_info.ncp.ip(), + port_range, repl_data.contact_info.ncp.port(), ) }; diff --git a/src/crdt.rs b/src/crdt.rs index 8276ebf7c57c9b..85b498b0550910 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -1355,9 +1355,14 @@ impl TestNode { }, } } - pub fn new_with_external_ip(pubkey: Pubkey, ip: IpAddr, ncp_port: u16) -> TestNode { - fn bind() -> (u16, UdpSocket) { - match udp_random_bind(8100, 10000, 5) { + pub fn new_with_external_ip( + pubkey: Pubkey, + ip: IpAddr, + port_range: (u16, u16), + ncp_port: u16, + ) -> TestNode { + fn bind(port_range: (u16, u16)) -> (u16, UdpSocket) { + match udp_random_bind(port_range.0, port_range.1, 5) { Ok(socket) => (socket.local_addr().unwrap().port(), socket), Err(err) => { panic!("Failed to bind to {:?}", err); @@ -1378,12 +1383,12 @@ impl TestNode { let (gossip_port, gossip) = if ncp_port != 0 { (ncp_port, bind_to(ncp_port)) } else { - bind() + bind(port_range) }; - let (replicate_port, replicate) = bind(); - let (requests_port, requests) = bind(); - let (transaction_port, transaction) = bind(); - let (repair_port, repair) = bind(); + let (replicate_port, replicate) = bind(port_range); + let (requests_port, requests) = bind(port_range); + let (transaction_port, transaction) = bind(port_range); + let (repair_port, repair) = bind(port_range); // Responses are sent from the same Udp port as requests are received // from, in hopes that a NAT sitting in the middle will route the @@ -1430,7 +1435,7 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { #[cfg(test)] mod tests { use crdt::{ - parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS, + parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, TestNode, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, }; use entry::Entry; @@ -1441,6 +1446,7 @@ mod tests { use result::Error; use signature::{Keypair, KeypairUtil, Pubkey}; use std::fs::remove_dir_all; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -2133,4 +2139,84 @@ mod tests { crdt.insert(&network_entry_point); assert!(crdt.leader_data().is_none()); } + + #[test] + fn new_with_external_ip_test_random() { + let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080); + let node = + TestNode::new_with_external_ip(Keypair::new().pubkey(), sockaddr.ip(), (8100, 8200), 0); + + assert_eq!( + node.sockets.gossip.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.replicate.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.requests.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.transaction.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.repair.local_addr().unwrap().ip(), + sockaddr.ip() + ); + + assert!(node.sockets.gossip.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.gossip.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.replicate.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.requests.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.transaction.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.repair.local_addr().unwrap().port() <= 8200); + } + + #[test] + fn new_with_external_ip_test_gossip() { + let sockaddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080); + let node = TestNode::new_with_external_ip( + Keypair::new().pubkey(), + sockaddr.ip(), + (8100, 8200), + 8050, + ); + assert_eq!( + node.sockets.gossip.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.replicate.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.requests.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.transaction.local_addr().unwrap().ip(), + sockaddr.ip() + ); + assert_eq!( + node.sockets.repair.local_addr().unwrap().ip(), + sockaddr.ip() + ); + + assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); + assert!(node.sockets.replicate.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.replicate.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.requests.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.requests.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.transaction.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.transaction.local_addr().unwrap().port() <= 8200); + assert!(node.sockets.repair.local_addr().unwrap().port() >= 8100); + assert!(node.sockets.repair.local_addr().unwrap().port() <= 8200); + } }