Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use gossip in drone and wallet to identify leader ports #1002

Merged
merged 3 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions multinode-demo/drone.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ $rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_DIR"/
trap 'kill "$pid" && wait "$pid"' INT TERM
$solana_drone \
-l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \
--timeout 120 \
> >($drone_logger) 2>&1 &
pid=$!
oom_score_adj "$pid" 1000
Expand Down
2 changes: 1 addition & 1 deletion multinode-demo/wallet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ fi

# shellcheck disable=SC2086 # $solana_wallet should not be quoted
exec $solana_wallet \
-l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" "$@"
-l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" --timeout 10 "$@"
34 changes: 25 additions & 9 deletions src/bin/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use solana::fullnode::Config;
use solana::logger;
use solana::metrics::set_panic_hook;
use solana::signature::read_keypair;
use solana::thin_client::poll_gossip_for_leader;
use std::error;
use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex};
Expand All @@ -22,7 +24,7 @@ use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_codec::{BytesCodec, Decoder};

fn main() {
fn main() -> Result<(), Box<error::Error>> {
logger::setup();
set_panic_hook("drone");
let matches = App::new("drone")
Expand All @@ -45,20 +47,25 @@ fn main() {
.help("/path/to/mint.json"),
)
.arg(
Arg::with_name("time")
.short("t")
.long("time")
Arg::with_name("slice")
.long("slice")
.value_name("SECONDS")
.takes_value(true)
.help("time slice over which to limit requests to drone"),
.help("Time slice over which to limit requests to drone"),
)
.arg(
Arg::with_name("cap")
.short("c")
.long("cap")
.value_name("NUMBER")
.takes_value(true)
.help("request limit for time slice"),
.help("Request limit for time slice"),
)
.arg(
Arg::with_name("timeout")
.long("timeout")
.value_name("SECONDS")
.takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"),
)
.get_matches();

Expand All @@ -74,8 +81,8 @@ fn main() {
read_keypair(matches.value_of("keypair").expect("keypair")).expect("client keypair");

let time_slice: Option<u64>;
if let Some(t) = matches.value_of("time") {
time_slice = Some(t.to_string().parse().expect("integer"));
if let Some(secs) = matches.value_of("slice") {
time_slice = Some(secs.to_string().parse().expect("integer"));
} else {
time_slice = None;
}
Expand All @@ -85,6 +92,14 @@ fn main() {
} else {
request_cap = None;
}
let timeout: Option<u64>;
if let Some(secs) = matches.value_of("timeout") {
timeout = Some(secs.to_string().parse().expect("integer"));
} else {
timeout = None;
}

let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?;

let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap();

Expand Down Expand Up @@ -141,6 +156,7 @@ fn main() {
tokio::spawn(processor)
});
tokio::run(done);
Ok(())
}
fn read_leader(path: &str) -> Config {
let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path));
Expand Down
22 changes: 17 additions & 5 deletions src/bin/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use solana::drone::DRONE_PORT;
use solana::fullnode::Config;
use solana::logger;
use solana::signature::{read_keypair, Keypair, KeypairUtil, Pubkey, Signature};
use solana::thin_client::ThinClient;
use solana::thin_client::{poll_gossip_for_leader, ThinClient};
use solana::wallet::request_airdrop;
use std::error;
use std::fmt;
Expand Down Expand Up @@ -92,12 +92,18 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.takes_value(true)
.help("/path/to/id.json"),
)
.arg(
Arg::with_name("timeout")
.long("timeout")
.value_name("SECONDS")
.takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"),
)
.subcommand(
SubCommand::with_name("airdrop")
.about("Request a batch of tokens")
.arg(
Arg::with_name("tokens")
// .index(1)
.long("tokens")
.value_name("NUMBER")
.takes_value(true)
Expand All @@ -110,16 +116,14 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.about("Send a payment")
.arg(
Arg::with_name("tokens")
// .index(2)
.long("tokens")
.value_name("NUMBER")
.takes_value(true)
.required(true)
.help("the number of tokens to send"),
.help("The number of tokens to send"),
)
.arg(
Arg::with_name("to")
// .index(1)
.long("to")
.value_name("PUBKEY")
.takes_value(true)
Expand Down Expand Up @@ -148,6 +152,12 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = NodeInfo::new_leader(&server_addr);
};
let timeout: Option<u64>;
if let Some(secs) = matches.value_of("timeout") {
timeout = Some(secs.to_string().parse().expect("integer"));
} else {
timeout = None;
}

let mut path = dirs::home_dir().expect("home directory");
let id_path = if matches.is_present("keypair") {
Expand All @@ -163,6 +173,8 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
)))
})?;

let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?;

let mut drone_addr = leader.contact_info.tpu;
drone_addr.set_port(DRONE_PORT);

Expand Down
8 changes: 8 additions & 0 deletions src/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ pub enum Error {

pub type Result<T> = std::result::Result<T, Error>;

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "solana error")
}
}

impl std::error::Error for Error {}

impl std::convert::From<std::sync::mpsc::RecvError> for Error {
fn from(e: std::sync::mpsc::RecvError) -> Error {
Error::RecvError(e)
Expand Down
37 changes: 37 additions & 0 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@

use bank::Account;
use bincode::{deserialize, serialize};
use crdt::{Crdt, CrdtError, NodeInfo, TestNode};
use hash::Hash;
use ncp::Ncp;
use request::{Request, Response};
use result::{Error, Result};
use signature::{Keypair, Pubkey, Signature};
use std::collections::HashMap;
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;
Expand Down Expand Up @@ -320,6 +325,38 @@ impl Drop for ThinClient {
}
}

pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> {
let exit = Arc::new(AtomicBool::new(false));
let testnode = TestNode::new_localhost();
let extra_data = testnode.data.clone();
let crdt = Arc::new(RwLock::new(Crdt::new(extra_data).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(
&crdt.clone(),
window,
None,
testnode.sockets.gossip,
testnode.sockets.gossip_send,
exit.clone(),
).unwrap();
let leader_entry_point = NodeInfo::new_entry_point(leader_ncp);
crdt.write().unwrap().insert(&leader_entry_point);

sleep(Duration::from_millis(100));

let now = Instant::now();
// Block until leader's correct contact info is received
while crdt.read().unwrap().leader_data().is_none() {
if timeout.is_some() && now.elapsed() > Duration::new(timeout.unwrap(), 0) {
return Err(Error::CrdtError(CrdtError::NoLeader));
}
}

ncp.close()?;
let leader = crdt.read().unwrap().leader_data().unwrap().clone();
Ok(leader)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down