diff --git a/Cargo.toml b/Cargo.toml index 03b44201791a9c..f2bebece8a672e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,4 +72,7 @@ rand = "0.5.1" pnet_datalink = "0.21.0" tokio = "0.1" tokio-codec = "0.1" +tokio-core = "0.1.17" tokio-io = "0.1" +p2p = "0.5.2" +futures = "0.1.21" diff --git a/multinode-demo/client.sh b/multinode-demo/client.sh index 32e1313bbbf90c..79493abdc6d51f 100755 --- a/multinode-demo/client.sh +++ b/multinode-demo/client.sh @@ -20,5 +20,5 @@ rsync -vPz "$rsync_leader_url"/config/mint.json $SOLANA_CONFIG_DIR/ # shellcheck disable=SC2086 # $solana_client_demo should not be quoted exec $solana_client_demo \ - -n "$count" -l $SOLANA_CONFIG_DIR/leader.json -d \ + -n "$count" -l $SOLANA_CONFIG_DIR/leader.json \ < $SOLANA_CONFIG_DIR/mint.json diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index b819a17debf21c..1ba058defa06f4 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -8,7 +8,7 @@ extern crate solana; use atty::{is, Stream}; use getopts::Options; use rayon::prelude::*; -use solana::crdt::{get_ip_addr, Crdt, ReplicatedData}; +use solana::crdt::{Crdt, ReplicatedData}; use solana::hash::Hash; use solana::mint::Mint; use solana::ncp::Ncp; @@ -30,6 +30,8 @@ use std::thread::JoinHandle; use std::time::Duration; use std::time::Instant; +use solana::nat::udp_public_bind; + fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); brief += " Solana client demo creates a number of transactions and\n"; @@ -40,14 +42,13 @@ fn print_usage(program: &str, opts: Options) { } fn sample_tx_count( - thread_addr: Arc>, exit: Arc, maxes: Arc>>, first_count: u64, v: ReplicatedData, sample_period: u64, ) { - let mut client = mk_client(&thread_addr, &v); + let mut client = mk_client(&v); let mut now = Instant::now(); let mut initial_tx_count = client.transaction_count(); let mut max_tps = 0.0; @@ -149,9 +150,7 @@ fn main() { let mut opts = Options::new(); opts.optopt("l", "", "leader", "leader.json"); - opts.optopt("c", "", "client port", "port"); opts.optopt("t", "", "number of threads", &format!("{}", threads)); - opts.optflag("d", "dyn", "detect network address dynamically"); opts.optopt( "s", "", @@ -179,15 +178,6 @@ fn main() { print_usage(&program, opts); return; } - let mut addr: SocketAddr = "0.0.0.0:8100".parse().unwrap(); - if matches.opt_present("c") { - let port = matches.opt_str("c").unwrap().parse().unwrap(); - addr.set_port(port); - } - if matches.opt_present("d") { - addr.set_ip(get_ip_addr().unwrap()); - } - let client_addr: Arc> = Arc::new(RwLock::new(addr)); if matches.opt_present("t") { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } @@ -207,13 +197,7 @@ fn main() { let signal = Arc::new(AtomicBool::new(false)); let mut c_threads = vec![]; - let validators = converge( - &client_addr, - &leader, - signal.clone(), - num_nodes, - &mut c_threads, - ); + let validators = converge(&leader, signal.clone(), num_nodes, &mut c_threads); assert_eq!(validators.len(), num_nodes); if is(Stream::Stdin) { @@ -233,7 +217,7 @@ fn main() { eprintln!("failed to parse json: {}", e); exit(1); }); - let mut client = mk_client(&client_addr, &leader); + let mut client = mk_client(&leader); println!("Get last ID..."); let mut last_id = client.get_last_id(); @@ -260,20 +244,17 @@ fn main() { .into_iter() .map(|v| { let exit = signal.clone(); - let thread_addr = client_addr.clone(); let maxes = maxes.clone(); Builder::new() .name("solana-client-sample".to_string()) .spawn(move || { - sample_tx_count(thread_addr, exit, maxes, first_count, v, sample_period); + sample_tx_count(exit, maxes, first_count, v, sample_period); }) .unwrap() }) .collect(); - let clients = (0..threads) - .map(|_| mk_client(&client_addr, &leader)) - .collect(); + let clients = (0..threads).map(|_| mk_client(&leader)).collect(); // generate and send transactions for the specified duration let time = Duration::new(time_sec, 0); @@ -320,45 +301,41 @@ fn main() { } } -fn mk_client(locked_addr: &Arc>, r: &ReplicatedData) -> ThinClient { - let mut addr = locked_addr.write().unwrap(); - let port = addr.port(); - let transactions_socket = UdpSocket::bind(addr.clone()).unwrap(); - addr.set_port(port + 1); - let requests_socket = UdpSocket::bind(addr.clone()).unwrap(); - requests_socket +fn mk_client(r: &ReplicatedData) -> ThinClient { + let transactions_socket_pair = udp_public_bind("transactions"); + let requests_socket_pair = udp_public_bind("requests"); + + requests_socket_pair + .rx .set_read_timeout(Some(Duration::new(1, 0))) .unwrap(); - addr.set_port(port + 2); ThinClient::new( r.requests_addr, - requests_socket, + requests_socket_pair.tx, + requests_socket_pair.rx, r.transactions_addr, - transactions_socket, + transactions_socket_pair.tx, ) } -fn spy_node(client_addr: &Arc>) -> (ReplicatedData, UdpSocket) { - let mut addr = client_addr.write().unwrap(); - let port = addr.port(); - let gossip = UdpSocket::bind(addr.clone()).unwrap(); - addr.set_port(port + 1); - let daddr = "0.0.0.0:0".parse().unwrap(); +fn spy_node() -> (ReplicatedData, UdpSocket) { + let gossip_socket_pair = udp_public_bind("gossip"); let pubkey = KeyPair::new().pubkey(); + let daddr = "0.0.0.0:0".parse().unwrap(); let node = ReplicatedData::new( pubkey, - gossip.local_addr().unwrap(), + //gossip.local_addr().unwrap(), + gossip_socket_pair.addr, daddr, daddr, daddr, daddr, ); - (node, gossip) + (node, gossip_socket_pair.rx) } fn converge( - client_addr: &Arc>, leader: &ReplicatedData, exit: Arc, num_nodes: usize, @@ -366,7 +343,7 @@ fn converge( ) -> Vec { //lets spy on the network let daddr = "0.0.0.0:0".parse().unwrap(); - let (spy, spy_gossip) = spy_node(client_addr); + let (spy, spy_gossip) = spy_node(); let mut spy_crdt = Crdt::new(spy); spy_crdt.insert(&leader); spy_crdt.set_leader(leader.id); diff --git a/src/drone.rs b/src/drone.rs index edf10392cf6699..5c8d585acb1bec 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -99,6 +99,7 @@ impl Drone { let mut client = ThinClient::new( self.requests_addr, + requests_socket.try_clone().unwrap(), requests_socket, self.transactions_addr, transactions_socket, diff --git a/src/lib.rs b/src/lib.rs index da04911dee64a2..bef4ccee952457 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ pub mod hash; pub mod ledger; pub mod logger; pub mod mint; +pub mod nat; pub mod ncp; pub mod packet; pub mod payment_plan; diff --git a/src/nat.rs b/src/nat.rs new file mode 100644 index 00000000000000..d8ef5342d0170c --- /dev/null +++ b/src/nat.rs @@ -0,0 +1,75 @@ +//! Demonstrates how to bind to public address (using UPnP) and receive some data + +extern crate futures; +extern crate p2p; +extern crate tokio_core; + +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; + +use self::futures::Future; +use self::p2p::UdpSocketExt; + +pub struct UdpSocketPair { + pub addr: SocketAddr, // Public address of the socket + pub rx: UdpSocket, // Locally bound socket that can receive from the public address + pub tx: UdpSocket, // Locally bound socket to send via public address +} + +/// Binds a private Udp address to a public address using UPnP if possible +pub fn udp_public_bind(label: &str) -> UdpSocketPair { + //(SocketAddr, UdpSocket, UdpSocket) { + let private_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); + + let mut core = tokio_core::reactor::Core::new().unwrap(); + let handle = core.handle(); + let mc = p2p::P2p::default(); + let res = core.run({ + tokio_core::net::UdpSocket::bind_public(&private_addr, &handle, &mc) + .map_err(|e| { + info!("Failed to bind public socket for {}: {}", label, e); + }) + .and_then(|(socket, public_addr)| Ok((public_addr, socket.local_addr().unwrap()))) + }); + + match res { + Ok((public_addr, local_addr)) => { + info!( + "Using local address {} mapped to public address {} for {}", + local_addr, public_addr, label + ); + + // NAT should now be forwarding inbound packets directed at + // |public_addr| to the local |rx| socket... + let rx = UdpSocket::bind(local_addr).unwrap(); + + // ... however for outbound packets, the NAT *will not* rewrite the + // source port from |rx.local_addr().port()| to |public_addr.port()|. + // This is currently a problem when talking with a fullnode as it + // assumes it can send UDP packets back at the source. This hits the + // NAT as a datagram for |rx.local_addr().port()| on the NAT's public + // IP, which the NAT promptly discards. As a short term hack, create a + // local UDP socket, |tx|, with the same port as |public_addr.port()|. + // + // TODO: Remove the |tx| socket and deal with the downstream changes to + // the UDP signalling + let mut local_addr_tx = local_addr.clone(); + local_addr_tx.set_port(public_addr.port()); + let tx = UdpSocket::bind(local_addr_tx).unwrap(); + + UdpSocketPair { + addr: public_addr, + rx, + tx, + } + } + Err(_) => { + info!("Using to local address {} for {}", private_addr, label); + let tx = UdpSocket::bind(private_addr).unwrap(); + UdpSocketPair { + addr: private_addr, + rx: tx.try_clone().unwrap(), + tx, + } + } + } +} diff --git a/src/thin_client.rs b/src/thin_client.rs index f0a3da38d89172..d0d6dff7ebbb82 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -15,7 +15,8 @@ use transaction::Transaction; /// An object for querying and sending transactions to the network. pub struct ThinClient { requests_addr: SocketAddr, - requests_socket: UdpSocket, + requests_socket_tx: UdpSocket, + requests_socket_rx: UdpSocket, transactions_addr: SocketAddr, transactions_socket: UdpSocket, last_id: Option, @@ -29,13 +30,15 @@ impl ThinClient { /// to a public address before invoking ThinClient methods. pub fn new( requests_addr: SocketAddr, - requests_socket: UdpSocket, + requests_socket_tx: UdpSocket, + requests_socket_rx: UdpSocket, transactions_addr: SocketAddr, transactions_socket: UdpSocket, ) -> Self { let client = ThinClient { requests_addr, - requests_socket, + requests_socket_tx, + requests_socket_rx, transactions_addr, transactions_socket, last_id: None, @@ -48,7 +51,7 @@ impl ThinClient { pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; trace!("start recv_from"); - self.requests_socket.recv_from(&mut buf)?; + self.requests_socket_rx.recv_from(&mut buf)?; trace!("end recv_from"); let resp = deserialize(&buf).expect("deserialize balance in thin_client"); Ok(resp) @@ -99,7 +102,7 @@ impl ThinClient { trace!("get_balance"); let req = Request::GetBalance { key: *pubkey }; let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance"); - self.requests_socket + self.requests_socket_tx .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_balance"); let mut done = false; @@ -123,7 +126,7 @@ impl ThinClient { serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count"); let mut done = false; while !done { - self.requests_socket + self.requests_socket_tx .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn transaction_count"); @@ -146,7 +149,8 @@ impl ThinClient { let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id"); let mut done = false; while !done { - self.requests_socket + eprintln!("get_last_id send_to {}", &self.requests_addr); + self.requests_socket_tx .send_to(&data, &self.requests_addr) .expect("buffer error in pub fn get_last_id"); @@ -229,6 +233,7 @@ mod tests { let mut client = ThinClient::new( leader.data.requests_addr, requests_socket, + requests_socket, leader.data.transactions_addr, transactions_socket, ); @@ -276,6 +281,7 @@ mod tests { let mut client = ThinClient::new( leader.data.requests_addr, requests_socket, + requests_socket, leader.data.transactions_addr, transactions_socket, );