Skip to content

Commit

Permalink
Improve gossip use for drone and wallet
Browse files Browse the repository at this point in the history
  - Add utility function
  - Add thread sleep
  - Enable configurable timeout for gossip poll
  • Loading branch information
Tyera Eulberg committed Aug 23, 2018
1 parent a85a0be commit 7d195e4
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 78 deletions.
1 change: 1 addition & 0 deletions multinode-demo/drone.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ $rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_DIR"/
trap 'kill "$pid" && wait "$pid"' INT TERM
$solana_drone \
-l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \
--timeout 120 \
> >($drone_logger) 2>&1 &
pid=$!
oom_score_adj "$pid" 1000
Expand Down
2 changes: 1 addition & 1 deletion multinode-demo/wallet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ fi

# shellcheck disable=SC2086 # $solana_wallet should not be quoted
exec $solana_wallet \
-l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" "$@"
-l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" --timeout 10 "$@"
63 changes: 26 additions & 37 deletions src/bin/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,23 @@ extern crate tokio_codec;

use bincode::deserialize;
use clap::{App, Arg};
use solana::crdt::{Crdt, NodeInfo, TestNode};
use solana::crdt::NodeInfo;
use solana::drone::{Drone, DroneRequest, DRONE_PORT};
use solana::fullnode::Config;
use solana::logger;
use solana::metrics::set_panic_hook;
use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::read_keypair;
use solana::thin_client::poll_gossip_for_leader;
use std::error;
use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_codec::{BytesCodec, Decoder};

fn main() {
fn main() -> Result<(), Box<error::Error>> {
logger::setup();
set_panic_hook("drone");
let matches = App::new("drone")
Expand All @@ -48,20 +47,25 @@ fn main() {
.help("/path/to/mint.json"),
)
.arg(
Arg::with_name("time")
.short("t")
.long("time")
Arg::with_name("slice")
.long("slice")
.value_name("SECONDS")
.takes_value(true)
.help("time slice over which to limit requests to drone"),
.help("Time slice over which to limit requests to drone"),
)
.arg(
Arg::with_name("cap")
.short("c")
.long("cap")
.value_name("NUMBER")
.takes_value(true)
.help("request limit for time slice"),
.help("Request limit for time slice"),
)
.arg(
Arg::with_name("timeout")
.long("timeout")
.value_name("SECONDS")
.takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"),
)
.get_matches();

Expand All @@ -77,8 +81,8 @@ fn main() {
read_keypair(matches.value_of("keypair").expect("keypair")).expect("client keypair");

let time_slice: Option<u64>;
if let Some(t) = matches.value_of("time") {
time_slice = Some(t.to_string().parse().expect("integer"));
if let Some(secs) = matches.value_of("slice") {
time_slice = Some(secs.to_string().parse().expect("integer"));
} else {
time_slice = None;
}
Expand All @@ -88,30 +92,14 @@ fn main() {
} else {
request_cap = None;
}
let timeout: Option<u64>;
if let Some(secs) = matches.value_of("timeout") {
timeout = Some(secs.to_string().parse().expect("integer"));
} else {
timeout = None;
}

// Set up gossip functionality
let exit = Arc::new(AtomicBool::new(false));
let testnode = TestNode::new_localhost();
let extra_data = testnode.data.clone();
let crdt = Arc::new(RwLock::new(Crdt::new(extra_data).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(
&crdt.clone(),
window,
None,
testnode.sockets.gossip,
testnode.sockets.gossip_send,
exit.clone(),
).unwrap();
let leader_entry_point = NodeInfo::new_entry_point(leader.contact_info.ncp);
crdt.write().unwrap().insert(&leader_entry_point);

// Block until leader's correct contact info is received
while crdt.read().unwrap().leader_data().is_none() {}

exit.store(true, Ordering::Relaxed);
ncp.join().unwrap();
let leader = crdt.read().unwrap().leader_data().unwrap().clone();
let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?;

let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap();

Expand Down Expand Up @@ -168,6 +156,7 @@ fn main() {
tokio::spawn(processor)
});
tokio::run(done);
Ok(())
}
fn read_leader(path: &str) -> Config {
let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path));
Expand Down
58 changes: 18 additions & 40 deletions src/bin/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,19 @@ extern crate solana;

use clap::{App, Arg, SubCommand};
use solana::client::mk_client;
use solana::crdt::{Crdt, NodeInfo, TestNode};
use solana::crdt::NodeInfo;
use solana::drone::DRONE_PORT;
use solana::fullnode::Config;
use solana::logger;
use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::{read_keypair, Keypair, KeypairUtil, Pubkey, Signature};
use solana::thin_client::ThinClient;
use solana::thin_client::{poll_gossip_for_leader, ThinClient};
use solana::wallet::request_airdrop;
use std::error;
use std::fmt;
use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::{Duration, Instant};
use std::time::Duration;

enum WalletCommand {
Address,
Expand All @@ -39,7 +35,6 @@ enum WalletCommand {
enum WalletError {
CommandNotRecognized(String),
BadParameter(String),
NoNode(String),
}

impl fmt::Display for WalletError {
Expand Down Expand Up @@ -97,12 +92,18 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.takes_value(true)
.help("/path/to/id.json"),
)
.arg(
Arg::with_name("timeout")
.long("timeout")
.value_name("SECONDS")
.takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"),
)
.subcommand(
SubCommand::with_name("airdrop")
.about("Request a batch of tokens")
.arg(
Arg::with_name("tokens")
// .index(1)
.long("tokens")
.value_name("NUMBER")
.takes_value(true)
Expand All @@ -115,16 +116,14 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.about("Send a payment")
.arg(
Arg::with_name("tokens")
// .index(2)
.long("tokens")
.value_name("NUMBER")
.takes_value(true)
.required(true)
.help("the number of tokens to send"),
.help("The number of tokens to send"),
)
.arg(
Arg::with_name("to")
// .index(1)
.long("to")
.value_name("PUBKEY")
.takes_value(true)
Expand Down Expand Up @@ -153,6 +152,12 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = NodeInfo::new_leader(&server_addr);
};
let timeout: Option<u64>;
if let Some(secs) = matches.value_of("timeout") {
timeout = Some(secs.to_string().parse().expect("integer"));
} else {
timeout = None;
}

let mut path = dirs::home_dir().expect("home directory");
let id_path = if matches.is_present("keypair") {
Expand All @@ -168,34 +173,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
)))
})?;

// Set up gossip functionality
let exit = Arc::new(AtomicBool::new(false));
let testnode = TestNode::new_localhost();
let extra_data = testnode.data.clone();
let crdt = Arc::new(RwLock::new(Crdt::new(extra_data).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(
&crdt.clone(),
window,
None,
testnode.sockets.gossip,
testnode.sockets.gossip_send,
exit.clone(),
).unwrap();
let leader_entry_point = NodeInfo::new_entry_point(leader.contact_info.ncp);
crdt.write().unwrap().insert(&leader_entry_point);

let now = Instant::now();
// Block until leader's correct contact info is received
while crdt.read().unwrap().leader_data().is_none() {
if now.elapsed() > Duration::new(10, 0) {
Err(WalletError::NoNode("No leader detected".to_string()))?;
}
}

exit.store(true, Ordering::Relaxed);
ncp.join().unwrap();
let leader = crdt.read().unwrap().leader_data().unwrap().clone();
let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?;

let mut drone_addr = leader.contact_info.tpu;
drone_addr.set_port(DRONE_PORT);
Expand Down
8 changes: 8 additions & 0 deletions src/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ pub enum Error {

pub type Result<T> = std::result::Result<T, Error>;

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "solana error")
}
}

impl std::error::Error for Error {}

impl std::convert::From<std::sync::mpsc::RecvError> for Error {
fn from(e: std::sync::mpsc::RecvError) -> Error {
Error::RecvError(e)
Expand Down
37 changes: 37 additions & 0 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
use bank::Account;
use bincode::{deserialize, serialize};
use crdt::{Crdt, CrdtError, NodeInfo, TestNode};
use hash::Hash;
use ncp::Ncp;
use request::{Request, Response};
use result::{Error, Result};
use signature::{Keypair, Pubkey, Signature};
use std::collections::HashMap;
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;
Expand Down Expand Up @@ -320,6 +325,38 @@ impl Drop for ThinClient {
}
}

pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> {
let exit = Arc::new(AtomicBool::new(false));
let testnode = TestNode::new_localhost();
let extra_data = testnode.data.clone();
let crdt = Arc::new(RwLock::new(Crdt::new(extra_data).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(
&crdt.clone(),
window,
None,
testnode.sockets.gossip,
testnode.sockets.gossip_send,
exit.clone(),
).unwrap();
let leader_entry_point = NodeInfo::new_entry_point(leader_ncp);
crdt.write().unwrap().insert(&leader_entry_point);

sleep(Duration::from_millis(100));

let now = Instant::now();
// Block until leader's correct contact info is received
while crdt.read().unwrap().leader_data().is_none() {
if timeout.is_some() && now.elapsed() > Duration::new(timeout.unwrap(), 0) {
return Err(Error::CrdtError(CrdtError::NoLeader));
}
}

ncp.close()?;
let leader = crdt.read().unwrap().leader_data().unwrap().clone();
Ok(leader)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 7d195e4

Please sign in to comment.