From b52092bdfd8f7e612ead846198315148c6341311 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 25 May 2018 16:52:17 -0600 Subject: [PATCH 1/7] Rename sigverify modules --- src/lib.rs | 4 ++-- src/{ecdsa.rs => sigverify.rs} | 6 +++--- src/{sig_verify_stage.rs => sigverify_stage.rs} | 4 ++-- src/tpu.rs | 8 ++++---- 4 files changed, 11 insertions(+), 11 deletions(-) rename src/{ecdsa.rs => sigverify.rs} (97%) rename src/{sig_verify_stage.rs => sigverify_stage.rs} (97%) diff --git a/src/lib.rs b/src/lib.rs index 48dedfbc7aa1f3..184da6fdee3ca5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,6 @@ pub mod bank; pub mod banking_stage; pub mod crdt; -pub mod ecdsa; pub mod entry; pub mod entry_writer; #[cfg(feature = "erasure")] @@ -22,8 +21,9 @@ pub mod request_stage; pub mod result; pub mod rpu; pub mod server; -pub mod sig_verify_stage; pub mod signature; +pub mod sigverify; +pub mod sigverify_stage; pub mod streamer; pub mod thin_client; pub mod timing; diff --git a/src/ecdsa.rs b/src/sigverify.rs similarity index 97% rename from src/ecdsa.rs rename to src/sigverify.rs index d8839ba4770265..1a859b8326c636 100644 --- a/src/ecdsa.rs +++ b/src/sigverify.rs @@ -143,8 +143,8 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { #[cfg(test)] mod tests { use bincode::serialize; - use ecdsa; use packet::{Packet, Packets, SharedPackets}; + use sigverify; use std::sync::RwLock; use transaction::Transaction; use transaction::{memfind, test_tx}; @@ -154,7 +154,7 @@ mod tests { let tx = test_tx(); let tx_bytes = serialize(&tx).unwrap(); let packet = serialize(&tx).unwrap(); - assert_matches!(memfind(&packet, &tx_bytes), Some(ecdsa::TX_OFFSET)); + assert_matches!(memfind(&packet, &tx_bytes), Some(sigverify::TX_OFFSET)); assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); } @@ -185,7 +185,7 @@ mod tests { let batches = vec![shared_packets.clone(), shared_packets.clone()]; // verify packets - let ans = ecdsa::ed25519_verify(&batches); + let ans = sigverify::ed25519_verify(&batches); // check result let ref_ans = if modify_data { 0u8 } else { 1u8 }; diff --git a/src/sig_verify_stage.rs b/src/sigverify_stage.rs similarity index 97% rename from src/sig_verify_stage.rs rename to src/sigverify_stage.rs index ccb201e3aec723..3750378df527ec 100644 --- a/src/sig_verify_stage.rs +++ b/src/sigverify_stage.rs @@ -1,9 +1,9 @@ //! The `sig_verify_stage` implements the signature verification stage of the TPU. -use ecdsa; use packet::SharedPackets; use rand::{thread_rng, Rng}; use result::Result; +use sigverify; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; @@ -28,7 +28,7 @@ impl SigVerifyStage { } fn verify_batch(batch: Vec) -> Vec<(SharedPackets, Vec)> { - let r = ecdsa::ed25519_verify(&batch); + let r = sigverify::ed25519_verify(&batch); batch.into_iter().zip(r).collect() } diff --git a/src/tpu.rs b/src/tpu.rs index a0c473fc270c6c..b813bd3275bc05 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -7,7 +7,7 @@ use crdt::{Crdt, ReplicatedData}; use hash::Hash; use packet; use record_stage::RecordStage; -use sig_verify_stage::SigVerifyStage; +use sigverify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; @@ -43,13 +43,13 @@ impl Tpu { packet_sender, ); - let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); + let sigverify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let blob_recycler = packet::BlobRecycler::default(); let banking_stage = BankingStage::new( bank.clone(), exit.clone(), - sig_verify_stage.verified_receiver, + sigverify_stage.verified_receiver, packet_recycler.clone(), ); @@ -87,7 +87,7 @@ impl Tpu { t_listen, t_broadcast, ]; - thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter()); + thread_hdls.extend(sigverify_stage.thread_hdls.into_iter()); Tpu { thread_hdls } } } From 3af421e8b3447a77eb21f338813b40373e6d7d28 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 25 May 2018 16:54:03 -0600 Subject: [PATCH 2/7] Replace client-demo with multinode-demo --- Cargo.toml | 4 - src/bin/client-demo.rs | 207 +++++++++++++++++++++--------- src/bin/multinode-demo.rs | 264 -------------------------------------- 3 files changed, 148 insertions(+), 327 deletions(-) delete mode 100644 src/bin/multinode-demo.rs diff --git a/Cargo.toml b/Cargo.toml index 35a7f75870f2c7..efc2e21554da22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,10 +16,6 @@ license = "Apache-2.0" name = "solana-client-demo" path = "src/bin/client-demo.rs" -[[bin]] -name = "solana-multinode-demo" -path = "src/bin/multinode-demo.rs" - [[bin]] name = "solana-testnode" path = "src/bin/testnode.rs" diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 9d7ef41f66f822..78c62f8eac1c6f 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -9,14 +9,20 @@ use futures::Future; use getopts::Options; use isatty::stdin_isatty; use rayon::prelude::*; +use solana::crdt::{Crdt, ReplicatedData}; use solana::mint::MintDemo; -use solana::signature::{GenKeys, KeyPairUtil}; +use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; +use solana::streamer::default_window; use solana::thin_client::ThinClient; use solana::transaction::Transaction; use std::env; +use std::fs::File; use std::io::{stdin, Read}; use std::net::{SocketAddr, UdpSocket}; use std::process::exit; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::JoinHandle; use std::thread::sleep; use std::time::Duration; use std::time::Instant; @@ -32,13 +38,19 @@ fn print_usage(program: &str, opts: Options) { fn main() { let mut threads = 4usize; - let mut server_addr: String = "127.0.0.1:8000".to_string(); - let mut requests_addr: String = "127.0.0.1:8010".to_string(); + let mut num_nodes = 10usize; + let mut leader = "leader.json".to_string(); let mut opts = Options::new(); - opts.optopt("s", "", "server address", "host:port"); + opts.optopt("l", "", "leader", "leader.json"); opts.optopt("c", "", "client address", "host:port"); opts.optopt("t", "", "number of threads", &format!("{}", threads)); + opts.optopt( + "n", + "", + "number of nodes to converge to", + &format!("{}", num_nodes), + ); opts.optflag("h", "help", "print help"); let args: Vec = env::args().collect(); let matches = match opts.parse(&args[1..]) { @@ -54,19 +66,32 @@ fn main() { print_usage(&program, opts); return; } - if matches.opt_present("s") { - server_addr = matches.opt_str("s").unwrap(); - } - if matches.opt_present("c") { - requests_addr = matches.opt_str("c").unwrap(); + if matches.opt_present("l") { + leader = matches.opt_str("l").unwrap(); } + let client_addr: Arc> = if matches.opt_present("c") { + let addr = matches.opt_str("c").unwrap().parse().unwrap(); + Arc::new(RwLock::new(addr)) + } else { + Arc::new(RwLock::new("127.0.0.1:8010".parse().unwrap())) + }; if matches.opt_present("t") { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } + if matches.opt_present("n") { + num_nodes = matches.opt_str("n").unwrap().parse().expect("integer"); + } - let mut transactions_addr: SocketAddr = requests_addr.parse().unwrap(); - let requests_port = transactions_addr.port(); - transactions_addr.set_port(requests_port + 1); + let leader: ReplicatedData = read_leader(leader); + let signal = Arc::new(AtomicBool::new(false)); + let mut c_threads = vec![]; + let validators = converge( + &client_addr, + &leader, + signal.clone(), + num_nodes + 2, + &mut c_threads, + ); if stdin_isatty() { eprintln!("nothing found on stdin, expected a json file"); @@ -85,23 +110,7 @@ fn main() { eprintln!("failed to parse json: {}", e); exit(1); }); - - println!("Binding to {}", requests_addr); - let requests_socket = UdpSocket::bind(&requests_addr).unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(5, 0))) - .unwrap(); - let transactions_socket = UdpSocket::bind(&transactions_addr).unwrap(); - let requests_addr: SocketAddr = server_addr.parse().unwrap(); - let requests_port = requests_addr.port(); - let mut transactions_addr = requests_addr.clone(); - transactions_addr.set_port(requests_port + 3); - let mut client = ThinClient::new( - requests_addr, - requests_socket, - transactions_addr, - transactions_socket, - ); + let mut client = mk_client(&client_addr, &leader); println!("Get last ID..."); let last_id = client.get_last_id().wait().unwrap(); @@ -120,7 +129,7 @@ fn main() { .into_par_iter() .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id)) .collect(); - let mut duration = now.elapsed(); + let duration = now.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); let bsps = txs as f64 / ns as f64; let nsps = ns as f64 / txs as f64; @@ -130,46 +139,126 @@ fn main() { nsps / 1_000_f64 ); - let initial_tx_count = client.transaction_count(); - println!("initial count {}", initial_tx_count); + let first_count = client.transaction_count(); + println!("initial count {}", first_count); println!("Transfering {} transactions in {} batches", txs, threads); - let now = Instant::now(); let sz = transactions.len() / threads; let chunks: Vec<_> = transactions.chunks(sz).collect(); chunks.into_par_iter().for_each(|txs| { println!("Transferring 1 unit {} times... to", txs.len()); - let requests_addr: SocketAddr = server_addr.parse().unwrap(); - let mut requests_cb_addr = requests_addr.clone(); - requests_cb_addr.set_port(0); - let requests_socket = UdpSocket::bind(requests_cb_addr).unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(5, 0))) - .unwrap(); - let mut transactions_addr: SocketAddr = requests_addr.clone(); - transactions_addr.set_port(0); - let transactions_socket = UdpSocket::bind(&transactions_addr).unwrap(); - let client = ThinClient::new( - requests_addr, - requests_socket, - transactions_addr, - transactions_socket, - ); + let client = mk_client(&client_addr, &leader); for tx in txs { client.transfer_signed(tx.clone()).unwrap(); } }); - println!("Waiting for transactions to complete...",); - let mut tx_count; - for _ in 0..10 { - tx_count = client.transaction_count(); - duration = now.elapsed(); - let txs = tx_count - initial_tx_count; - println!("Transactions processed {}", txs); - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let tps = (txs * 1_000_000_000) as f64 / ns as f64; - println!("{} tps", tps); + println!("Sampling tps every second...",); + validators.into_par_iter().for_each(|val| { + let mut client = mk_client(&client_addr, &val); + let mut now = Instant::now(); + let mut initial_tx_count = client.transaction_count(); + for i in 0..100 { + let tx_count = client.transaction_count(); + let duration = now.elapsed(); + now = Instant::now(); + let sample = tx_count - initial_tx_count; + initial_tx_count = tx_count; + println!( + "{}: Transactions processed {}", + val.transactions_addr, sample + ); + let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); + let tps = (sample * 1_000_000_000) as f64 / ns as f64; + println!("{}: {} tps", val.transactions_addr, tps); + let total = tx_count - first_count; + println!( + "{}: Total Transactions processed {}", + val.transactions_addr, total + ); + if total == transactions.len() as u64 { + break; + } + if i > 20 && sample == 0 { + break; + } + sleep(Duration::new(1, 0)); + } + }); + signal.store(true, Ordering::Relaxed); + for t in c_threads { + t.join().unwrap(); + } +} + +fn mk_client(locked_addr: &Arc>, r: &ReplicatedData) -> ThinClient { + let mut addr = locked_addr.write().unwrap(); + let port = addr.port(); + let transactions_socket = UdpSocket::bind(addr.clone()).unwrap(); + addr.set_port(port + 1); + let requests_socket = UdpSocket::bind(addr.clone()).unwrap(); + addr.set_port(port + 2); + ThinClient::new( + r.requests_addr, + requests_socket, + r.transactions_addr, + transactions_socket, + ) +} + +fn spy_node(client_addr: &Arc>) -> (ReplicatedData, UdpSocket) { + let mut addr = client_addr.write().unwrap(); + let port = addr.port(); + let gossip = UdpSocket::bind(addr.clone()).unwrap(); + addr.set_port(port + 1); + let daddr = "0.0.0.0:0".parse().unwrap(); + let pubkey = KeyPair::new().pubkey(); + let node = ReplicatedData::new(pubkey, gossip.local_addr().unwrap(), daddr, daddr, daddr); + (node, gossip) +} + +fn converge( + client_addr: &Arc>, + leader: &ReplicatedData, + exit: Arc, + num_nodes: usize, + threads: &mut Vec>, +) -> Vec { + //lets spy on the network + let daddr = "0.0.0.0:0".parse().unwrap(); + let (spy, spy_gossip) = spy_node(client_addr); + let mut spy_crdt = Crdt::new(spy); + spy_crdt.insert(&leader); + spy_crdt.set_leader(leader.id); + + let spy_ref = Arc::new(RwLock::new(spy_crdt)); + let spy_window = default_window(); + let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); + let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); + //wait for the network to converge + for _ in 0..30 { + let min = spy_ref.read().unwrap().convergence(); + if num_nodes as u64 == min { + println!("converged!"); + break; + } sleep(Duration::new(1, 0)); } + threads.push(t_spy_listen); + threads.push(t_spy_gossip); + let v: Vec = spy_ref + .read() + .unwrap() + .table + .values() + .into_iter() + .filter(|x| x.requests_addr != daddr) + .map(|x| x.clone()) + .collect(); + v.clone() +} + +fn read_leader(path: String) -> ReplicatedData { + let file = File::open(path).expect("file"); + serde_json::from_reader(file).expect("parse") } diff --git a/src/bin/multinode-demo.rs b/src/bin/multinode-demo.rs deleted file mode 100644 index 78c62f8eac1c6f..00000000000000 --- a/src/bin/multinode-demo.rs +++ /dev/null @@ -1,264 +0,0 @@ -extern crate futures; -extern crate getopts; -extern crate isatty; -extern crate rayon; -extern crate serde_json; -extern crate solana; - -use futures::Future; -use getopts::Options; -use isatty::stdin_isatty; -use rayon::prelude::*; -use solana::crdt::{Crdt, ReplicatedData}; -use solana::mint::MintDemo; -use solana::signature::{GenKeys, KeyPair, KeyPairUtil}; -use solana::streamer::default_window; -use solana::thin_client::ThinClient; -use solana::transaction::Transaction; -use std::env; -use std::fs::File; -use std::io::{stdin, Read}; -use std::net::{SocketAddr, UdpSocket}; -use std::process::exit; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; -use std::thread::JoinHandle; -use std::thread::sleep; -use std::time::Duration; -use std::time::Instant; - -fn print_usage(program: &str, opts: Options) { - let mut brief = format!("Usage: cat | {} [options]\n\n", program); - brief += " Solana client demo creates a number of transactions and\n"; - brief += " sends them to a target node."; - brief += " Takes json formatted mint file to stdin."; - - print!("{}", opts.usage(&brief)); -} - -fn main() { - let mut threads = 4usize; - let mut num_nodes = 10usize; - let mut leader = "leader.json".to_string(); - - let mut opts = Options::new(); - opts.optopt("l", "", "leader", "leader.json"); - opts.optopt("c", "", "client address", "host:port"); - opts.optopt("t", "", "number of threads", &format!("{}", threads)); - opts.optopt( - "n", - "", - "number of nodes to converge to", - &format!("{}", num_nodes), - ); - opts.optflag("h", "help", "print help"); - let args: Vec = env::args().collect(); - let matches = match opts.parse(&args[1..]) { - Ok(m) => m, - Err(e) => { - eprintln!("{}", e); - exit(1); - } - }; - - if matches.opt_present("h") { - let program = args[0].clone(); - print_usage(&program, opts); - return; - } - if matches.opt_present("l") { - leader = matches.opt_str("l").unwrap(); - } - let client_addr: Arc> = if matches.opt_present("c") { - let addr = matches.opt_str("c").unwrap().parse().unwrap(); - Arc::new(RwLock::new(addr)) - } else { - Arc::new(RwLock::new("127.0.0.1:8010".parse().unwrap())) - }; - if matches.opt_present("t") { - threads = matches.opt_str("t").unwrap().parse().expect("integer"); - } - if matches.opt_present("n") { - num_nodes = matches.opt_str("n").unwrap().parse().expect("integer"); - } - - let leader: ReplicatedData = read_leader(leader); - let signal = Arc::new(AtomicBool::new(false)); - let mut c_threads = vec![]; - let validators = converge( - &client_addr, - &leader, - signal.clone(), - num_nodes + 2, - &mut c_threads, - ); - - if stdin_isatty() { - eprintln!("nothing found on stdin, expected a json file"); - exit(1); - } - - let mut buffer = String::new(); - let num_bytes = stdin().read_to_string(&mut buffer).unwrap(); - if num_bytes == 0 { - eprintln!("empty file on stdin, expected a json file"); - exit(1); - } - - println!("Parsing stdin..."); - let demo: MintDemo = serde_json::from_str(&buffer).unwrap_or_else(|e| { - eprintln!("failed to parse json: {}", e); - exit(1); - }); - let mut client = mk_client(&client_addr, &leader); - - println!("Get last ID..."); - let last_id = client.get_last_id().wait().unwrap(); - println!("Got last ID {:?}", last_id); - - let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes()); - - println!("Creating keypairs..."); - let txs = demo.num_accounts / 2; - let keypairs = rnd.gen_n_keypairs(demo.num_accounts); - let keypair_pairs: Vec<_> = keypairs.chunks(2).collect(); - - println!("Signing transactions..."); - let now = Instant::now(); - let transactions: Vec<_> = keypair_pairs - .into_par_iter() - .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id)) - .collect(); - let duration = now.elapsed(); - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let bsps = txs as f64 / ns as f64; - let nsps = ns as f64 / txs as f64; - println!( - "Done. {} thousand signatures per second, {}us per signature", - bsps * 1_000_000_f64, - nsps / 1_000_f64 - ); - - let first_count = client.transaction_count(); - println!("initial count {}", first_count); - - println!("Transfering {} transactions in {} batches", txs, threads); - let sz = transactions.len() / threads; - let chunks: Vec<_> = transactions.chunks(sz).collect(); - chunks.into_par_iter().for_each(|txs| { - println!("Transferring 1 unit {} times... to", txs.len()); - let client = mk_client(&client_addr, &leader); - for tx in txs { - client.transfer_signed(tx.clone()).unwrap(); - } - }); - - println!("Sampling tps every second...",); - validators.into_par_iter().for_each(|val| { - let mut client = mk_client(&client_addr, &val); - let mut now = Instant::now(); - let mut initial_tx_count = client.transaction_count(); - for i in 0..100 { - let tx_count = client.transaction_count(); - let duration = now.elapsed(); - now = Instant::now(); - let sample = tx_count - initial_tx_count; - initial_tx_count = tx_count; - println!( - "{}: Transactions processed {}", - val.transactions_addr, sample - ); - let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); - let tps = (sample * 1_000_000_000) as f64 / ns as f64; - println!("{}: {} tps", val.transactions_addr, tps); - let total = tx_count - first_count; - println!( - "{}: Total Transactions processed {}", - val.transactions_addr, total - ); - if total == transactions.len() as u64 { - break; - } - if i > 20 && sample == 0 { - break; - } - sleep(Duration::new(1, 0)); - } - }); - signal.store(true, Ordering::Relaxed); - for t in c_threads { - t.join().unwrap(); - } -} - -fn mk_client(locked_addr: &Arc>, r: &ReplicatedData) -> ThinClient { - let mut addr = locked_addr.write().unwrap(); - let port = addr.port(); - let transactions_socket = UdpSocket::bind(addr.clone()).unwrap(); - addr.set_port(port + 1); - let requests_socket = UdpSocket::bind(addr.clone()).unwrap(); - addr.set_port(port + 2); - ThinClient::new( - r.requests_addr, - requests_socket, - r.transactions_addr, - transactions_socket, - ) -} - -fn spy_node(client_addr: &Arc>) -> (ReplicatedData, UdpSocket) { - let mut addr = client_addr.write().unwrap(); - let port = addr.port(); - let gossip = UdpSocket::bind(addr.clone()).unwrap(); - addr.set_port(port + 1); - let daddr = "0.0.0.0:0".parse().unwrap(); - let pubkey = KeyPair::new().pubkey(); - let node = ReplicatedData::new(pubkey, gossip.local_addr().unwrap(), daddr, daddr, daddr); - (node, gossip) -} - -fn converge( - client_addr: &Arc>, - leader: &ReplicatedData, - exit: Arc, - num_nodes: usize, - threads: &mut Vec>, -) -> Vec { - //lets spy on the network - let daddr = "0.0.0.0:0".parse().unwrap(); - let (spy, spy_gossip) = spy_node(client_addr); - let mut spy_crdt = Crdt::new(spy); - spy_crdt.insert(&leader); - spy_crdt.set_leader(leader.id); - - let spy_ref = Arc::new(RwLock::new(spy_crdt)); - let spy_window = default_window(); - let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); - let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); - //wait for the network to converge - for _ in 0..30 { - let min = spy_ref.read().unwrap().convergence(); - if num_nodes as u64 == min { - println!("converged!"); - break; - } - sleep(Duration::new(1, 0)); - } - threads.push(t_spy_listen); - threads.push(t_spy_gossip); - let v: Vec = spy_ref - .read() - .unwrap() - .table - .values() - .into_iter() - .filter(|x| x.requests_addr != daddr) - .map(|x| x.clone()) - .collect(); - v.clone() -} - -fn read_leader(path: String) -> ReplicatedData { - let file = File::open(path).expect("file"); - serde_json::from_reader(file).expect("parse") -} From 1da7826396b61ef6fd7a757fbc75eedd30e002ac Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 25 May 2018 16:56:32 -0600 Subject: [PATCH 3/7] testnode -> fullnode It's the real deal. --- Cargo.toml | 4 ++-- README.md | 12 ++++++------ src/bin/{testnode.rs => fullnode.rs} | 0 3 files changed, 8 insertions(+), 8 deletions(-) rename src/bin/{testnode.rs => fullnode.rs} (100%) diff --git a/Cargo.toml b/Cargo.toml index efc2e21554da22..0eb6f858189429 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,8 +17,8 @@ name = "solana-client-demo" path = "src/bin/client-demo.rs" [[bin]] -name = "solana-testnode" -path = "src/bin/testnode.rs" +name = "solana-fullnode" +path = "src/bin/fullnode.rs" [[bin]] name = "solana-genesis" diff --git a/README.md b/README.md index 7c876c766da1cf..d7d2b18c91cf31 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ $ git clone https://github.com/solana-labs/solana.git $ cd solana ``` -The testnode server is initialized with a ledger from stdin and +The fullnode server is initialized with a ledger from stdin and generates new ledger entries on stdout. To create the input ledger, we'll need to create *the mint* and use it to generate a *genesis ledger*. It's done in two steps because the mint-demo.json file contains private keys that will be @@ -50,7 +50,7 @@ used later in this demo. Now you can start the server: ```bash - $ cat genesis.log | cargo run --release --bin solana-testnode > transactions0.log + $ cat genesis.log | cargo run --release --bin solana-fullnode > transactions0.log ``` Wait a few seconds for the server to initialize. It will print "Ready." when it's safe @@ -76,7 +76,7 @@ Now restart the server from where we left off. Pass it both the genesis ledger, the transaction ledger. ```bash - $ cat genesis.log transactions0.log | cargo run --release --bin solana-testnode > transactions1.log + $ cat genesis.log transactions0.log | cargo run --release --bin solana-fullnode > transactions1.log ``` Lastly, run the client demo again, and verify that all funds were spent in the @@ -128,11 +128,11 @@ Debugging --- There are some useful debug messages in the code, you can enable them on a per-module and per-level -basis with the normal RUST\_LOG environment variable. Run the testnode with this syntax: +basis with the normal RUST\_LOG environment variable. Run the fullnode with this syntax: ```bash -$ RUST_LOG=solana::streamer=debug,solana::accountant_skel=info cat genesis.log | ./target/release/solana-testnode > transactions0.log +$ RUST_LOG=solana::streamer=debug,solana::server=info cat genesis.log | ./target/release/solana-fullnode > transactions0.log ``` -to see the debug and info sections for streamer and accountant\_skel respectively. Generally +to see the debug and info sections for streamer and server respectively. Generally we are using debug for infrequent debug messages, trace for potentially frequent messages and info for performance-related logging. diff --git a/src/bin/testnode.rs b/src/bin/fullnode.rs similarity index 100% rename from src/bin/testnode.rs rename to src/bin/fullnode.rs From 04d5960866e8c03f62cc7a895bfc3d0def0ccaef Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 25 May 2018 16:57:47 -0600 Subject: [PATCH 4/7] Fix comments --- src/request_processor.rs | 2 +- src/sigverify_stage.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/request_processor.rs b/src/request_processor.rs index 165b3eebce3b70..6b7fd03f986bdf 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -1,4 +1,4 @@ -//! The `request_stage` processes thin client Request messages. +//! The `request_processor` processes thin client Request messages. use bank::Bank; use bincode::{deserialize, serialize}; diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs index 3750378df527ec..41c8238e0ea515 100644 --- a/src/sigverify_stage.rs +++ b/src/sigverify_stage.rs @@ -1,4 +1,4 @@ -//! The `sig_verify_stage` implements the signature verification stage of the TPU. +//! The `sigverify_stage` implements the signature verification stage of the TPU. use packet::SharedPackets; use rand::{thread_rng, Rng}; From aea78962a0fabb397eb368f0ed30c847a562c5be Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 25 May 2018 17:04:28 -0600 Subject: [PATCH 5/7] Move channel-oriented code into request_stage --- src/request_processor.rs | 97 ---------------------------------------- src/request_stage.rs | 97 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 96 insertions(+), 98 deletions(-) diff --git a/src/request_processor.rs b/src/request_processor.rs index 6b7fd03f986bdf..2ce0ecf48789b3 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -1,19 +1,9 @@ //! The `request_processor` processes thin client Request messages. use bank::Bank; -use bincode::{deserialize, serialize}; -use packet; -use packet::SharedPackets; -use rayon::prelude::*; use request::{Request, Response}; -use result::Result; -use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::Arc; -use std::sync::mpsc::Receiver; -use std::time::Instant; -use streamer; -use timing; pub struct RequestProcessor { bank: Arc, @@ -61,91 +51,4 @@ impl RequestProcessor { .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) .collect() } - - pub fn deserialize_requests(p: &packet::Packets) -> Vec> { - p.packets - .par_iter() - .map(|x| { - deserialize(&x.data[0..x.meta.size]) - .map(|req| (req, x.meta.addr())) - .ok() - }) - .collect() - } - - /// Split Request list into verified transactions and the rest - fn serialize_response( - resp: Response, - rsp_addr: SocketAddr, - blob_recycler: &packet::BlobRecycler, - ) -> Result { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - Ok(blob) - } - - fn serialize_responses( - rsps: Vec<(Response, SocketAddr)>, - blob_recycler: &packet::BlobRecycler, - ) -> Result> { - let mut blobs = VecDeque::new(); - for (resp, rsp_addr) in rsps { - blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); - } - Ok(blobs) - } - - pub fn process_request_packets( - &self, - packet_receiver: &Receiver, - blob_sender: &streamer::BlobSender, - packet_recycler: &packet::PacketRecycler, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; - - info!( - "@{:?} request_stage: processing: {}", - timing::timestamp(), - batch_len - ); - - let mut reqs_len = 0; - let proc_start = Instant::now(); - for msgs in batch { - let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap()) - .into_iter() - .filter_map(|x| x) - .collect(); - reqs_len += reqs.len(); - - let rsps = self.process_requests(reqs); - - let blobs = Self::serialize_responses(rsps, blob_recycler)?; - if !blobs.is_empty() { - info!("process: sending blobs: {}", blobs.len()); - //don't wake up the other side if there is nothing - blob_sender.send(blobs)?; - } - packet_recycler.recycle(msgs); - } - let total_time_s = timing::duration_as_s(&proc_start.elapsed()); - let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); - info!( - "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", - timing::timestamp(), - batch_len, - total_time_ms, - reqs_len, - (reqs_len as f32) / (total_time_s) - ); - Ok(()) - } } diff --git a/src/request_stage.rs b/src/request_stage.rs index cd98a7d439bdd6..8b4e0db09e52a3 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -1,13 +1,21 @@ //! The `request_stage` processes thin client Request messages. +use bincode::{deserialize, serialize}; use packet; use packet::SharedPackets; +use rayon::prelude::*; +use request::{Request, Response}; use request_processor::RequestProcessor; +use result::Result; +use std::collections::VecDeque; +use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; use std::thread::{spawn, JoinHandle}; +use std::time::Instant; use streamer; +use timing; pub struct RequestStage { pub thread_hdl: JoinHandle<()>, @@ -16,6 +24,92 @@ pub struct RequestStage { } impl RequestStage { + pub fn deserialize_requests(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } + + /// Split Request list into verified transactions and the rest + fn serialize_response( + resp: Response, + rsp_addr: SocketAddr, + blob_recycler: &packet::BlobRecycler, + ) -> Result { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + Ok(blob) + } + + fn serialize_responses( + rsps: Vec<(Response, SocketAddr)>, + blob_recycler: &packet::BlobRecycler, + ) -> Result> { + let mut blobs = VecDeque::new(); + for (resp, rsp_addr) in rsps { + blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); + } + Ok(blobs) + } + + pub fn process_request_packets( + request_processor: &RequestProcessor, + packet_receiver: &Receiver, + blob_sender: &streamer::BlobSender, + packet_recycler: &packet::PacketRecycler, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; + + info!( + "@{:?} request_stage: processing: {}", + timing::timestamp(), + batch_len + ); + + let mut reqs_len = 0; + let proc_start = Instant::now(); + for msgs in batch { + let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap()) + .into_iter() + .filter_map(|x| x) + .collect(); + reqs_len += reqs.len(); + + let rsps = request_processor.process_requests(reqs); + + let blobs = Self::serialize_responses(rsps, blob_recycler)?; + if !blobs.is_empty() { + info!("process: sending blobs: {}", blobs.len()); + //don't wake up the other side if there is nothing + blob_sender.send(blobs)?; + } + packet_recycler.recycle(msgs); + } + let total_time_s = timing::duration_as_s(&proc_start.elapsed()); + let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + info!( + "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", + timing::timestamp(), + batch_len, + total_time_ms, + reqs_len, + (reqs_len as f32) / (total_time_s) + ); + Ok(()) + } pub fn new( request_processor: RequestProcessor, exit: Arc, @@ -27,7 +121,8 @@ impl RequestStage { let request_processor_ = request_processor.clone(); let (blob_sender, blob_receiver) = channel(); let thread_hdl = spawn(move || loop { - let e = request_processor_.process_request_packets( + let e = Self::process_request_packets( + &request_processor_, &packet_receiver, &blob_sender, &packet_recycler, From f19aa06a3eea3125c9acc0f26162e78e24bceb73 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 25 May 2018 17:06:10 -0600 Subject: [PATCH 6/7] Generalize serialize_responses --- src/request_stage.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/request_stage.rs b/src/request_stage.rs index 8b4e0db09e52a3..63344ede788590 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -4,9 +4,10 @@ use bincode::{deserialize, serialize}; use packet; use packet::SharedPackets; use rayon::prelude::*; -use request::{Request, Response}; +use request::Request; use request_processor::RequestProcessor; use result::Result; +use serde::Serialize; use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::Arc; @@ -36,8 +37,8 @@ impl RequestStage { } /// Split Request list into verified transactions and the rest - fn serialize_response( - resp: Response, + fn serialize_response( + resp: T, rsp_addr: SocketAddr, blob_recycler: &packet::BlobRecycler, ) -> Result { @@ -53,8 +54,8 @@ impl RequestStage { Ok(blob) } - fn serialize_responses( - rsps: Vec<(Response, SocketAddr)>, + fn serialize_responses( + rsps: Vec<(T, SocketAddr)>, blob_recycler: &packet::BlobRecycler, ) -> Result> { let mut blobs = VecDeque::new(); From 3186512c3b828f7e742888f6323064e8de0fb93e Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Fri, 25 May 2018 17:10:14 -0600 Subject: [PATCH 7/7] request_stage::serialize_packets -> packet::to_blobs Good stuff - no need to hide them. --- src/packet.rs | 28 ++++++++++++++++++++++++++++ src/request_stage.rs | 35 ++--------------------------------- 2 files changed, 30 insertions(+), 33 deletions(-) diff --git a/src/packet.rs b/src/packet.rs index 086cd8e0ccdacf..064f65d4e0132a 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -236,6 +236,34 @@ pub fn to_packets(r: &PacketRecycler, xs: Vec) -> Vec( + resp: T, + rsp_addr: SocketAddr, + blob_recycler: &BlobRecycler, +) -> Result { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + Ok(blob) +} + +pub fn to_blobs( + rsps: Vec<(T, SocketAddr)>, + blob_recycler: &BlobRecycler, +) -> Result> { + let mut blobs = VecDeque::new(); + for (resp, rsp_addr) in rsps { + blobs.push_back(to_blob(resp, rsp_addr, blob_recycler)?); + } + Ok(blobs) +} + const BLOB_INDEX_END: usize = size_of::(); const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::() + size_of::(); diff --git a/src/request_stage.rs b/src/request_stage.rs index 63344ede788590..33d845aab4824f 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -1,14 +1,12 @@ //! The `request_stage` processes thin client Request messages. -use bincode::{deserialize, serialize}; +use bincode::deserialize; use packet; use packet::SharedPackets; use rayon::prelude::*; use request::Request; use request_processor::RequestProcessor; use result::Result; -use serde::Serialize; -use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -36,35 +34,6 @@ impl RequestStage { .collect() } - /// Split Request list into verified transactions and the rest - fn serialize_response( - resp: T, - rsp_addr: SocketAddr, - blob_recycler: &packet::BlobRecycler, - ) -> Result { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - Ok(blob) - } - - fn serialize_responses( - rsps: Vec<(T, SocketAddr)>, - blob_recycler: &packet::BlobRecycler, - ) -> Result> { - let mut blobs = VecDeque::new(); - for (resp, rsp_addr) in rsps { - blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); - } - Ok(blobs) - } - pub fn process_request_packets( request_processor: &RequestProcessor, packet_receiver: &Receiver, @@ -91,7 +60,7 @@ impl RequestStage { let rsps = request_processor.process_requests(reqs); - let blobs = Self::serialize_responses(rsps, blob_recycler)?; + let blobs = packet::to_blobs(rsps, blob_recycler)?; if !blobs.is_empty() { info!("process: sending blobs: {}", blobs.len()); //don't wake up the other side if there is nothing