Skip to content

Commit

Permalink
Client NAT traversal 0.1
Browse files Browse the repository at this point in the history
UPnP is now used to request a port on the NAT be forwarded to the local machine.
This obviously only works for NATs that support UPnP, and thus is not a panacea
for all NAT-related connectivity issues.

Notable hacks in this patch include a transmit/receive UDP socket pair to work
around current protocol limitations whereby the full node assumes its peer can
receive on the same UDP port it transmitted from.
  • Loading branch information
mvines authored and solana-grimes committed Jun 30, 2018
1 parent 4ffb5d1 commit 0b56d60
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 85 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ rand = "0.5.1"
pnet_datalink = "0.21.0"
tokio = "0.1"
tokio-codec = "0.1"
tokio-core = "0.1.17"
tokio-io = "0.1"
itertools = "0.7.8"
bs58 = "0.2.0"
p2p = "0.5.2"
futures = "0.1.21"
2 changes: 1 addition & 1 deletion multinode-demo/client.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ rsync -vPz "$rsync_leader_url"/config/mint.json $SOLANA_CONFIG_DIR/

# shellcheck disable=SC2086 # $solana_client_demo should not be quoted
exec $solana_client_demo \
-n "$count" -l $SOLANA_CONFIG_DIR/leader.json -d \
-n "$count" -l $SOLANA_CONFIG_DIR/leader.json \
< $SOLANA_CONFIG_DIR/mint.json
3 changes: 2 additions & 1 deletion multinode-demo/wallet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ source "$here"/common.sh
SOLANA_CONFIG_DIR=config-client-demo

leader=${1:-${here}/..} # Default to local solana repo
shift

rsync_leader_url=$(rsync_url "$leader")

Expand All @@ -19,4 +20,4 @@ rsync -vPz "$rsync_leader_url"/config/mint.json $SOLANA_CONFIG_DIR/

# shellcheck disable=SC2086 # $solana_wallet should not be quoted
exec $solana_wallet \
-l $SOLANA_CONFIG_DIR/leader.json -m $SOLANA_CONFIG_DIR/mint.json -d
-l $SOLANA_CONFIG_DIR/leader.json -m $SOLANA_CONFIG_DIR/mint.json "$@"
70 changes: 23 additions & 47 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ extern crate solana;
use atty::{is, Stream};
use getopts::Options;
use rayon::prelude::*;
use solana::crdt::{get_ip_addr, Crdt, ReplicatedData};
use solana::crdt::{Crdt, ReplicatedData};
use solana::hash::Hash;
use solana::mint::Mint;
use solana::nat::udp_public_bind;
use solana::ncp::Ncp;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window;
Expand Down Expand Up @@ -40,14 +41,13 @@ fn print_usage(program: &str, opts: Options) {
}

fn sample_tx_count(
thread_addr: Arc<RwLock<SocketAddr>>,
exit: Arc<AtomicBool>,
maxes: Arc<RwLock<Vec<(f64, u64)>>>,
first_count: u64,
v: ReplicatedData,
sample_period: u64,
) {
let mut client = mk_client(&thread_addr, &v);
let mut client = mk_client(&v);
let mut now = Instant::now();
let mut initial_tx_count = client.transaction_count();
let mut max_tps = 0.0;
Expand Down Expand Up @@ -149,9 +149,7 @@ fn main() {

let mut opts = Options::new();
opts.optopt("l", "", "leader", "leader.json");
opts.optopt("c", "", "client port", "port");
opts.optopt("t", "", "number of threads", &format!("{}", threads));
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optopt(
"s",
"",
Expand Down Expand Up @@ -179,15 +177,6 @@ fn main() {
print_usage(&program, opts);
return;
}
let mut addr: SocketAddr = "0.0.0.0:8100".parse().unwrap();
if matches.opt_present("c") {
let port = matches.opt_str("c").unwrap().parse().unwrap();
addr.set_port(port);
}
if matches.opt_present("d") {
addr.set_ip(get_ip_addr().unwrap());
}
let client_addr: Arc<RwLock<SocketAddr>> = Arc::new(RwLock::new(addr));
if matches.opt_present("t") {
threads = matches.opt_str("t").unwrap().parse().expect("integer");
}
Expand All @@ -207,13 +196,7 @@ fn main() {

let signal = Arc::new(AtomicBool::new(false));
let mut c_threads = vec![];
let validators = converge(
&client_addr,
&leader,
signal.clone(),
num_nodes,
&mut c_threads,
);
let validators = converge(&leader, signal.clone(), num_nodes, &mut c_threads);
assert_eq!(validators.len(), num_nodes);

if is(Stream::Stdin) {
Expand All @@ -233,7 +216,7 @@ fn main() {
eprintln!("failed to parse json: {}", e);
exit(1);
});
let mut client = mk_client(&client_addr, &leader);
let mut client = mk_client(&leader);

println!("Get last ID...");
let mut last_id = client.get_last_id();
Expand All @@ -260,20 +243,17 @@ fn main() {
.into_iter()
.map(|v| {
let exit = signal.clone();
let thread_addr = client_addr.clone();
let maxes = maxes.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_tx_count(thread_addr, exit, maxes, first_count, v, sample_period);
sample_tx_count(exit, maxes, first_count, v, sample_period);
})
.unwrap()
})
.collect();

let clients = (0..threads)
.map(|_| mk_client(&client_addr, &leader))
.collect();
let clients = (0..threads).map(|_| mk_client(&leader)).collect();

// generate and send transactions for the specified duration
let time = Duration::new(time_sec, 0);
Expand Down Expand Up @@ -320,53 +300,49 @@ fn main() {
}
}

fn mk_client(locked_addr: &Arc<RwLock<SocketAddr>>, r: &ReplicatedData) -> ThinClient {
let mut addr = locked_addr.write().unwrap();
let port = addr.port();
let transactions_socket = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 1);
let requests_socket = UdpSocket::bind(addr.clone()).unwrap();
requests_socket
fn mk_client(r: &ReplicatedData) -> ThinClient {
let transactions_socket_pair = udp_public_bind("transactions");
let requests_socket_pair = udp_public_bind("requests");

requests_socket_pair
.receiver
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();

addr.set_port(port + 2);
ThinClient::new(
r.requests_addr,
requests_socket,
requests_socket_pair.sender,
requests_socket_pair.receiver,
r.transactions_addr,
transactions_socket,
transactions_socket_pair.sender,
)
}

fn spy_node(client_addr: &Arc<RwLock<SocketAddr>>) -> (ReplicatedData, UdpSocket) {
let mut addr = client_addr.write().unwrap();
let port = addr.port();
let gossip = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 1);
let daddr = "0.0.0.0:0".parse().unwrap();
fn spy_node() -> (ReplicatedData, UdpSocket) {
let gossip_socket_pair = udp_public_bind("gossip");
let pubkey = KeyPair::new().pubkey();
let daddr = "0.0.0.0:0".parse().unwrap();
let node = ReplicatedData::new(
pubkey,
gossip.local_addr().unwrap(),
//gossip.local_addr().unwrap(),
gossip_socket_pair.addr,
daddr,
daddr,
daddr,
daddr,
);
(node, gossip)
(node, gossip_socket_pair.receiver)
}

fn converge(
client_addr: &Arc<RwLock<SocketAddr>>,
leader: &ReplicatedData,
exit: Arc<AtomicBool>,
num_nodes: usize,
threads: &mut Vec<JoinHandle<()>>,
) -> Vec<ReplicatedData> {
//lets spy on the network
let daddr = "0.0.0.0:0".parse().unwrap();
let (spy, spy_gossip) = spy_node(client_addr);
let (spy, spy_gossip) = spy_node();
let mut spy_crdt = Crdt::new(spy);
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);
Expand Down
42 changes: 14 additions & 28 deletions src/bin/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ extern crate solana;

use bincode::serialize;
use getopts::{Matches, Options};
use solana::crdt::{get_ip_addr, ReplicatedData};
use solana::crdt::ReplicatedData;
use solana::drone::DroneRequest;
use solana::mint::Mint;
use solana::nat::udp_public_bind;
use solana::signature::{PublicKey, Signature};
use solana::thin_client::ThinClient;
use std::env;
Expand All @@ -19,7 +20,7 @@ use std::fmt;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream, UdpSocket};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::process::exit;
use std::thread::sleep;
use std::time::Duration;
Expand Down Expand Up @@ -57,7 +58,6 @@ impl error::Error for WalletError {
struct WalletConfig {
leader: ReplicatedData,
id: Mint,
client_addr: SocketAddr,
drone_addr: SocketAddr,
command: WalletCommand,
}
Expand All @@ -68,7 +68,6 @@ impl Default for WalletConfig {
WalletConfig {
leader: ReplicatedData::new_leader(&default_addr.clone()),
id: Mint::new(0),
client_addr: default_addr.clone(),
drone_addr: default_addr.clone(),
command: WalletCommand::Balance,
}
Expand Down Expand Up @@ -122,8 +121,6 @@ fn parse_args(args: Vec<String>) -> Result<WalletConfig, Box<error::Error>> {
let mut opts = Options::new();
opts.optopt("l", "", "leader", "leader.json");
opts.optopt("m", "", "mint", "mint.json");
opts.optopt("c", "", "client port", "port");
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optflag("h", "help", "print help");

let matches = match opts.parse(&args[1..]) {
Expand All @@ -139,16 +136,6 @@ fn parse_args(args: Vec<String>) -> Result<WalletConfig, Box<error::Error>> {
return Ok(WalletConfig::default());
}

let mut client_addr: SocketAddr = "0.0.0.0:8100".parse().unwrap();
if matches.opt_present("c") {
let port = matches.opt_str("c").unwrap().parse().unwrap();
client_addr.set_port(port);
}

if matches.opt_present("d") {
client_addr.set_ip(get_ip_addr().unwrap());
}

let leader = if matches.opt_present("l") {
read_leader(matches.opt_str("l").unwrap())
} else {
Expand All @@ -170,7 +157,6 @@ fn parse_args(args: Vec<String>) -> Result<WalletConfig, Box<error::Error>> {
Ok(WalletConfig {
leader,
id,
client_addr,
drone_addr, // TODO: Add an option for this.
command,
})
Expand Down Expand Up @@ -252,20 +238,20 @@ fn read_mint(path: String) -> Result<Mint, Box<error::Error>> {
Ok(mint)
}

fn mk_client(client_addr: &SocketAddr, r: &ReplicatedData) -> io::Result<ThinClient> {
let mut addr = client_addr.clone();
let port = addr.port();
let transactions_socket = UdpSocket::bind(addr.clone())?;
addr.set_port(port + 1);
let requests_socket = UdpSocket::bind(addr.clone())?;
requests_socket.set_read_timeout(Some(Duration::new(1, 0)))?;
fn mk_client(r: &ReplicatedData) -> io::Result<ThinClient> {
let transactions_socket_pair = udp_public_bind("transactions");
let requests_socket_pair = udp_public_bind("requests");
requests_socket_pair
.receiver
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();

addr.set_port(port + 2);
Ok(ThinClient::new(
r.requests_addr,
requests_socket,
requests_socket_pair.sender,
requests_socket_pair.receiver,
r.transactions_addr,
transactions_socket,
transactions_socket_pair.sender,
))
}

Expand All @@ -283,6 +269,6 @@ fn request_airdrop(drone_addr: &SocketAddr, id: &Mint) {
fn main() -> Result<(), Box<error::Error>> {
env_logger::init();
let config = parse_args(env::args().collect())?;
let mut client = mk_client(&config.client_addr, &config.leader)?;
let mut client = mk_client(&config.leader)?;
process_command(&config, &mut client)
}
2 changes: 2 additions & 0 deletions src/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl Drone {

let mut client = ThinClient::new(
self.requests_addr,
requests_socket.try_clone().unwrap(),
requests_socket,
self.transactions_addr,
transactions_socket,
Expand Down Expand Up @@ -292,6 +293,7 @@ mod tests {

let mut client = ThinClient::new(
leader.data.requests_addr,
requests_socket.try_clone().unwrap(),
requests_socket,
leader.data.transactions_addr,
transactions_socket,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod hash;
pub mod ledger;
pub mod logger;
pub mod mint;
pub mod nat;
pub mod ncp;
pub mod packet;
pub mod payment_plan;
Expand Down
Loading

0 comments on commit 0b56d60

Please sign in to comment.