From 374bff6550b8add9f1a45040c467f8b3c34d8535 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 17:31:13 -0600 Subject: [PATCH 01/18] Extract event processing from request_stage --- src/banking_stage.rs | 253 +++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 2 files changed, 254 insertions(+) create mode 100644 src/banking_stage.rs diff --git a/src/banking_stage.rs b/src/banking_stage.rs new file mode 100644 index 00000000000000..cf72430c0a6040 --- /dev/null +++ b/src/banking_stage.rs @@ -0,0 +1,253 @@ +//! The `banking_stage` processes Event messages. + +use bank::Bank; +use bincode::deserialize; +use event::Event; +use packet; +use packet::SharedPackets; +use rayon::prelude::*; +use recorder::Signal; +use result::Result; +use std::net::SocketAddr; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use std::time::Instant; +use timing; + +pub struct BankingStage { + pub thread_hdl: JoinHandle<()>, + pub signal_receiver: Receiver, +} + +impl BankingStage { + pub fn new( + bank: Arc, + exit: Arc, + verified_receiver: Receiver)>>, + packet_recycler: packet::PacketRecycler, + ) -> Self { + let (signal_sender, signal_receiver) = channel(); + let thread_hdl = spawn(move || loop { + let e = Self::process_packets( + bank.clone(), + &verified_receiver, + &signal_sender, + &packet_recycler, + ); + if e.is_err() { + if exit.load(Ordering::Relaxed) { + break; + } + } + }); + BankingStage { + thread_hdl, + signal_receiver, + } + } + + fn deserialize_events(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } + + fn process_packets( + bank: Arc, + verified_receiver: &Receiver)>>, + signal_sender: &Sender, + packet_recycler: &packet::PacketRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let recv_start = Instant::now(); + let mms = verified_receiver.recv_timeout(timer)?; + let mut reqs_len = 0; + let mms_len = mms.len(); + info!( + "@{:?} process start stalled for: {:?}ms batches: {}", + timing::timestamp(), + timing::duration_as_ms(&recv_start.elapsed()), + mms.len(), + ); + let proc_start = Instant::now(); + for (msgs, vers) in mms { + let events = Self::deserialize_events(&msgs.read().unwrap()); + reqs_len += events.len(); + let events = events + .into_iter() + .zip(vers) + .filter_map(|(event, ver)| match event { + None => None, + Some((event, _addr)) => if event.verify() && ver != 0 { + Some(event) + } else { + None + }, + }) + .collect(); + + debug!("process_events"); + let results = bank.process_verified_events(events); + let events = results.into_iter().filter_map(|x| x.ok()).collect(); + signal_sender.send(Signal::Events(events))?; + debug!("done process_events"); + + packet_recycler.recycle(msgs); + } + let total_time_s = timing::duration_as_s(&proc_start.elapsed()); + let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + info!( + "@{:?} done processing event batches: {} time: {:?}ms reqs: {} reqs/s: {}", + timing::timestamp(), + mms_len, + total_time_ms, + reqs_len, + (reqs_len as f32) / (total_time_s) + ); + Ok(()) + } +} + +// TODO: When banking is pulled out of RequestStage, add this test back in. + +//use bank::Bank; +//use entry::Entry; +//use event::Event; +//use hash::Hash; +//use record_stage::RecordStage; +//use recorder::Signal; +//use result::Result; +//use std::sync::mpsc::{channel, Sender}; +//use std::sync::{Arc, Mutex}; +//use std::time::Duration; +// +//#[cfg(test)] +//mod tests { +// use bank::Bank; +// use event::Event; +// use event_processor::EventProcessor; +// use mint::Mint; +// use signature::{KeyPair, KeyPairUtil}; +// use transaction::Transaction; +// +// #[test] +// // TODO: Move this test banking_stage. Calling process_events() directly +// // defeats the purpose of this test. +// fn test_banking_sequential_consistency() { +// // In this attack we'll demonstrate that a verifier can interpret the ledger +// // differently if either the server doesn't signal the ledger to add an +// // Entry OR if the verifier tries to parallelize across multiple Entries. +// let mint = Mint::new(2); +// let bank = Bank::new(&mint); +// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); +// +// // Process a batch that includes a transaction that receives two tokens. +// let alice = KeyPair::new(); +// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); +// let events = vec![Event::Transaction(tr)]; +// let entry0 = event_processor.process_events(events).unwrap(); +// +// // Process a second batch that spends one of those tokens. +// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); +// let events = vec![Event::Transaction(tr)]; +// let entry1 = event_processor.process_events(events).unwrap(); +// +// // Collect the ledger and feed it to a new bank. +// let entries = vec![entry0, entry1]; +// +// // Assert the user holds one token, not two. If the server only output one +// // entry, then the second transaction will be rejected, because it drives +// // the account balance below zero before the credit is added. +// let bank = Bank::new(&mint); +// for entry in entries { +// assert!( +// bank +// .process_verified_events(entry.events) +// .into_iter() +// .all(|x| x.is_ok()) +// ); +// } +// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1)); +// } +//} +// +//#[cfg(all(feature = "unstable", test))] +//mod bench { +// extern crate test; +// use self::test::Bencher; +// use bank::{Bank, MAX_ENTRY_IDS}; +// use bincode::serialize; +// use event_processor::*; +// use hash::hash; +// use mint::Mint; +// use rayon::prelude::*; +// use signature::{KeyPair, KeyPairUtil}; +// use std::collections::HashSet; +// use std::time::Instant; +// use transaction::Transaction; +// +// #[bench] +// fn process_events_bench(_bencher: &mut Bencher) { +// let mint = Mint::new(100_000_000); +// let bank = Bank::new(&mint); +// // Create transactions between unrelated parties. +// let txs = 100_000; +// let last_ids: Mutex> = Mutex::new(HashSet::new()); +// let transactions: Vec<_> = (0..txs) +// .into_par_iter() +// .map(|i| { +// // Seed the 'to' account and a cell for its signature. +// let dummy_id = i % (MAX_ENTRY_IDS as i32); +// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash +// { +// let mut last_ids = last_ids.lock().unwrap(); +// if !last_ids.contains(&last_id) { +// last_ids.insert(last_id); +// bank.register_entry_id(&last_id); +// } +// } +// +// // Seed the 'from' account. +// let rando0 = KeyPair::new(); +// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); +// bank.process_verified_transaction(&tr).unwrap(); +// +// let rando1 = KeyPair::new(); +// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); +// bank.process_verified_transaction(&tr).unwrap(); +// +// // Finally, return a transaction that's unique +// Transaction::new(&rando0, rando1.pubkey(), 1, last_id) +// }) +// .collect(); +// +// let events: Vec<_> = transactions +// .into_iter() +// .map(|tr| Event::Transaction(tr)) +// .collect(); +// +// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); +// +// let now = Instant::now(); +// assert!(event_processor.process_events(events).is_ok()); +// let duration = now.elapsed(); +// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; +// let tps = txs as f64 / sec; +// +// // Ensure that all transactions were successfully logged. +// drop(event_processor.historian_input); +// let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); +// assert_eq!(entries.len(), 1); +// assert_eq!(entries[0].events.len(), txs as usize); +// +// println!("{} tps", tps); +// } +//} diff --git a/src/lib.rs b/src/lib.rs index 1c14c9320f91aa..dfe8697fddd4c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![cfg_attr(feature = "unstable", feature(test))] pub mod bank; +pub mod banking_stage; pub mod crdt; pub mod ecdsa; pub mod entry; From ef6bd7e3b8c6c6c1a8577c961e5677f13bc5236c Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 17:36:19 -0600 Subject: [PATCH 02/18] Add TPU --- src/lib.rs | 1 + src/tpu.rs | 112 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 src/tpu.rs diff --git a/src/lib.rs b/src/lib.rs index dfe8697fddd4c7..993b04cb8c22bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ pub mod signature; pub mod streamer; pub mod thin_client; pub mod timing; +pub mod tpu; pub mod transaction; pub mod tvu; pub mod write_stage; diff --git a/src/tpu.rs b/src/tpu.rs new file mode 100644 index 00000000000000..accda0d52db25d --- /dev/null +++ b/src/tpu.rs @@ -0,0 +1,112 @@ +//! The `tpu` module implements the Transaction Processing Unit, a +//! 5-stage transaction processing pipeline in software. + +use bank::Bank; +use banking_stage::BankingStage; +use crdt::{Crdt, ReplicatedData}; +use hash::Hash; +use packet; +use record_stage::RecordStage; +use result::Result; +use sig_verify_stage::SigVerifyStage; +use std::io::Write; +use std::net::UdpSocket; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; +use std::sync::{Arc, Mutex, RwLock}; +use std::thread::JoinHandle; +use std::time::Duration; +use streamer; +use write_stage::WriteStage; + +pub struct Tpu { + bank: Arc, + start_hash: Hash, + tick_duration: Option, +} + +impl Tpu { + /// Create a new Tpu that wraps the given Bank. + pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option) -> Self { + Tpu { + bank: Arc::new(bank), + start_hash, + tick_duration, + } + } + + /// Create a UDP microservice that forwards messages the given Tpu. + /// This service is the network leader + /// Set `exit` to shutdown its threads. + pub fn serve( + &self, + me: ReplicatedData, + requests_socket: UdpSocket, + gossip: UdpSocket, + exit: Arc, + writer: W, + ) -> Result>> { + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); + let window = streamer::default_window(); + let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + + // make sure we are on the same interface + let mut local = requests_socket.local_addr()?; + local.set_port(0); + + let packet_recycler = packet::PacketRecycler::default(); + let (packet_sender, packet_receiver) = channel(); + let t_receiver = streamer::receiver( + requests_socket, + exit.clone(), + packet_recycler.clone(), + packet_sender, + )?; + + let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); + + let blob_recycler = packet::BlobRecycler::default(); + let banking_stage = BankingStage::new( + self.bank.clone(), + exit.clone(), + sig_verify_stage.verified_receiver, + packet_recycler.clone(), + ); + + let record_stage = RecordStage::new( + banking_stage.signal_receiver, + &self.start_hash, + self.tick_duration, + ); + + let write_stage = WriteStage::new( + self.bank.clone(), + exit.clone(), + blob_recycler.clone(), + Mutex::new(writer), + record_stage.entry_receiver, + ); + + let broadcast_socket = UdpSocket::bind(local)?; + let t_broadcast = streamer::broadcaster( + broadcast_socket, + exit.clone(), + crdt.clone(), + window, + blob_recycler.clone(), + write_stage.blob_receiver, + ); + + let mut threads = vec![ + t_receiver, + banking_stage.thread_hdl, + write_stage.thread_hdl, + t_gossip, + t_listen, + t_broadcast, + ]; + threads.extend(sig_verify_stage.thread_hdls.into_iter()); + Ok(threads) + } +} From 2a268aa528101310d25555868631d2fdc67f027f Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 09:17:48 -0600 Subject: [PATCH 03/18] Reorder to reflect dependencies --- src/rpu.rs | 14 +++++++------- src/tpu.rs | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/rpu.rs b/src/rpu.rs index d48dc1510f083d..df7adca90c1d50 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -47,14 +47,11 @@ impl Rpu { exit: Arc, writer: W, ) -> Result>> { - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let window = streamer::default_window(); - let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); - // make sure we are on the same interface let mut local = requests_socket.local_addr()?; local.set_port(0); + let broadcast_socket = UdpSocket::bind(local)?; + let respond_socket = UdpSocket::bind(local.clone())?; let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); @@ -91,7 +88,11 @@ impl Rpu { record_stage.entry_receiver, ); - let broadcast_socket = UdpSocket::bind(local)?; + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); + let window = streamer::default_window(); + let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + let t_broadcast = streamer::broadcaster( broadcast_socket, exit.clone(), @@ -101,7 +102,6 @@ impl Rpu { write_stage.blob_receiver, ); - let respond_socket = UdpSocket::bind(local.clone())?; let t_responder = streamer::responder( respond_socket, exit.clone(), diff --git a/src/tpu.rs b/src/tpu.rs index accda0d52db25d..455a32ad103b63 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -46,14 +46,10 @@ impl Tpu { exit: Arc, writer: W, ) -> Result>> { - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let window = streamer::default_window(); - let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); - // make sure we are on the same interface let mut local = requests_socket.local_addr()?; local.set_port(0); + let broadcast_socket = UdpSocket::bind(local)?; let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); @@ -88,7 +84,11 @@ impl Tpu { record_stage.entry_receiver, ); - let broadcast_socket = UdpSocket::bind(local)?; + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); + let window = streamer::default_window(); + let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + let t_broadcast = streamer::broadcaster( broadcast_socket, exit.clone(), From ee3fb985ea98bb9433f923fd6f67e5c516943ce2 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 09:42:28 -0600 Subject: [PATCH 04/18] Hoist set_timeout --- src/bin/testnode.rs | 4 ++++ src/streamer.rs | 4 ++-- src/thin_client.rs | 3 +++ src/tvu.rs | 3 +++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 51e4e4527b1205..715506263ca9b9 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -118,6 +118,10 @@ fn main() { let exit = Arc::new(AtomicBool::new(false)); let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000))); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); + serve_sock + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); let _events_sock = UdpSocket::bind(&events_addr).unwrap(); diff --git a/src/streamer.rs b/src/streamer.rs index f3332fbc0830f5..7e08d16ce8495e 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -53,8 +53,6 @@ pub fn receiver( recycler: PacketRecycler, channel: PacketSender, ) -> Result> { - let timer = Duration::new(1, 0); - sock.set_read_timeout(Some(timer))?; Ok(spawn(move || { let _ = recv_loop(&sock, &exit, &recycler, &channel); () @@ -515,6 +513,8 @@ mod bench { } fn run_streamer_bench() -> Result<()> { let read = UdpSocket::bind("127.0.0.1:0")?; + read.set_read_timeout(Some(Duration::new(1, 0)))?; + let addr = read.local_addr()?; let exit = Arc::new(AtomicBool::new(false)); let pack_recycler = PacketRecycler::default(); diff --git a/src/thin_client.rs b/src/thin_client.rs index 01363c0e5a458c..53bf7032d08469 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -178,6 +178,7 @@ mod tests { logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); + serve.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); let _events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let addr = serve.local_addr().unwrap(); let pubkey = KeyPair::new().pubkey(); @@ -274,6 +275,8 @@ mod tests { fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); + serve.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); diff --git a/src/tvu.rs b/src/tvu.rs index d26171d7094588..8634a99de55631 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -212,6 +212,9 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, From 7e44005a0fc19c8fb9ffdf56e27d777aa98cb20d Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 09:53:51 -0600 Subject: [PATCH 05/18] Don't do error-prone things in functions that spawn threads --- src/rpu.rs | 2 +- src/streamer.rs | 14 ++++++++------ src/tpu.rs | 2 +- src/tvu.rs | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/rpu.rs b/src/rpu.rs index df7adca90c1d50..329789a99156d9 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -60,7 +60,7 @@ impl Rpu { exit.clone(), packet_recycler.clone(), packet_sender, - )?; + ); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); diff --git a/src/streamer.rs b/src/streamer.rs index 7e08d16ce8495e..4047cf3fb6a59f 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -51,12 +51,12 @@ pub fn receiver( sock: UdpSocket, exit: Arc, recycler: PacketRecycler, - channel: PacketSender, -) -> Result> { - Ok(spawn(move || { - let _ = recv_loop(&sock, &exit, &recycler, &channel); + packet_sender: PacketSender, +) -> JoinHandle<()> { + spawn(move || { + let _ = recv_loop(&sock, &exit, &recycler, &packet_sender); () - })) + }) } fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> { @@ -591,13 +591,15 @@ mod test { #[test] pub fn streamer_send_test() { let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); + read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let pack_recycler = PacketRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader).unwrap(); + let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); let mut msgs = VecDeque::new(); diff --git a/src/tpu.rs b/src/tpu.rs index 455a32ad103b63..51e11d1289d604 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -58,7 +58,7 @@ impl Tpu { exit.clone(), packet_recycler.clone(), packet_sender, - )?; + ); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); diff --git a/src/tvu.rs b/src/tvu.rs index 8634a99de55631..fece1980cf8aa0 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -156,7 +156,7 @@ impl Tvu { exit.clone(), packet_recycler.clone(), packet_sender, - )?; + ); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); From 5f5be83a1798d6226b26dbd2b943295cf69a8900 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 10:05:20 -0600 Subject: [PATCH 06/18] Hoist socket creation/configuration TODO: Add a library for socket configuration. --- src/bin/testnode.rs | 17 +++++++++++++++-- src/rpu.rs | 8 ++------ src/thin_client.rs | 39 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 715506263ca9b9..c1f32c68ca4226 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -132,9 +132,22 @@ fn main() { replicate_sock.local_addr().unwrap(), serve_sock.local_addr().unwrap(), ); + + let mut local = serve_sock.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + eprintln!("starting server..."); - let threads = rpu.serve(d, serve_sock, gossip_sock, exit.clone(), stdout()) - .unwrap(); + let threads = rpu.serve( + d, + serve_sock, + broadcast_socket, + respond_socket, + gossip_sock, + exit.clone(), + stdout(), + ).unwrap(); eprintln!("Ready. Listening on {}", serve_addr); for t in threads { t.join().expect("join"); diff --git a/src/rpu.rs b/src/rpu.rs index 329789a99156d9..867c0bd766abd4 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -43,16 +43,12 @@ impl Rpu { &self, me: ReplicatedData, requests_socket: UdpSocket, + broadcast_socket: UdpSocket, + respond_socket: UdpSocket, gossip: UdpSocket, exit: Arc, writer: W, ) -> Result>> { - // make sure we are on the same interface - let mut local = requests_socket.local_addr()?; - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local)?; - let respond_socket = UdpSocket::bind(local.clone())?; - let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( diff --git a/src/thin_client.rs b/src/thin_client.rs index 53bf7032d08469..426625f071e36e 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -194,7 +194,21 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); - let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); + + let mut local = serve.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + + let threads = rpu.serve( + d, + serve, + broadcast_socket, + respond_socket, + gossip, + exit.clone(), + sink(), + ).unwrap(); sleep(Duration::from_millis(900)); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -233,9 +247,17 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); let serve_addr = leader_serve.local_addr().unwrap(); + + let mut local = leader_serve.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + let threads = rpu.serve( leader_data, leader_serve, + broadcast_socket, + respond_socket, leader_gossip, exit.clone(), sink(), @@ -377,8 +399,21 @@ mod tests { Rpu::new(bank, alice.last_id(), None) }; + let mut local = leader.2.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + let mut threads = leader_bank - .serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink()) + .serve( + leader.0.clone(), + leader.2, + broadcast_socket, + respond_socket, + leader.1, + exit.clone(), + sink(), + ) .unwrap(); for _ in 0..N { From 0aaa500f7c1202e71fb6b25e6c145d4d5a84a6f9 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 10:09:54 -0600 Subject: [PATCH 07/18] Rpu/Tpu serve() functions now only spin up threads --- src/bin/testnode.rs | 2 +- src/rpu.rs | 5 ++--- src/thin_client.rs | 24 +++++++++++------------- src/tpu.rs | 10 +++------- 4 files changed, 17 insertions(+), 24 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index c1f32c68ca4226..3d8e4f212615f9 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -147,7 +147,7 @@ fn main() { gossip_sock, exit.clone(), stdout(), - ).unwrap(); + ); eprintln!("Ready. Listening on {}", serve_addr); for t in threads { t.join().expect("join"); diff --git a/src/rpu.rs b/src/rpu.rs index 867c0bd766abd4..c6ee2d6f55d50c 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -8,7 +8,6 @@ use packet; use record_stage::RecordStage; use request_processor::RequestProcessor; use request_stage::RequestStage; -use result::Result; use sig_verify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; @@ -48,7 +47,7 @@ impl Rpu { gossip: UdpSocket, exit: Arc, writer: W, - ) -> Result>> { + ) -> Vec> { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( @@ -115,6 +114,6 @@ impl Rpu { t_broadcast, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Ok(threads) + threads } } diff --git a/src/thin_client.rs b/src/thin_client.rs index 426625f071e36e..36a4464381391b 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -208,7 +208,7 @@ mod tests { gossip, exit.clone(), sink(), - ).unwrap(); + ); sleep(Duration::from_millis(900)); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -261,7 +261,7 @@ mod tests { leader_gossip, exit.clone(), sink(), - ).unwrap(); + ); sleep(Duration::from_millis(300)); let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -404,17 +404,15 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let mut threads = leader_bank - .serve( - leader.0.clone(), - leader.2, - broadcast_socket, - respond_socket, - leader.1, - exit.clone(), - sink(), - ) - .unwrap(); + let mut threads = leader_bank.serve( + leader.0.clone(), + leader.2, + broadcast_socket, + respond_socket, + leader.1, + exit.clone(), + sink(), + ); for _ in 0..N { replicant(&leader.0, exit.clone(), &alice, &mut threads); diff --git a/src/tpu.rs b/src/tpu.rs index 51e11d1289d604..6b47207353d9b3 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -42,15 +42,11 @@ impl Tpu { &self, me: ReplicatedData, requests_socket: UdpSocket, + broadcast_socket: UdpSocket, gossip: UdpSocket, exit: Arc, writer: W, - ) -> Result>> { - // make sure we are on the same interface - let mut local = requests_socket.local_addr()?; - local.set_port(0); - let broadcast_socket = UdpSocket::bind(local)?; - + ) -> Vec> { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( @@ -107,6 +103,6 @@ impl Tpu { t_broadcast, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Ok(threads) + threads } } From 99dc4ea4a9fdfff3de52d06fe98dd2dbb5351374 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 10:27:18 -0600 Subject: [PATCH 08/18] Spin up threads from Rpu/Tpu constructors --- src/bin/testnode.rs | 8 +++++--- src/rpu.rs | 28 ++++++++++++++++++++++++++++ src/thin_client.rs | 27 ++++++++++++++++----------- src/tpu.rs | 20 +++++++++++++++++++- 4 files changed, 68 insertions(+), 15 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 3d8e4f212615f9..17571783402b41 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -116,7 +116,6 @@ fn main() { eprintln!("creating networking stack..."); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000))); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); serve_sock .set_read_timeout(Some(Duration::new(1, 0))) @@ -139,7 +138,10 @@ fn main() { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); eprintln!("starting server..."); - let threads = rpu.serve( + let rpu = Rpu::new1( + bank, + last_id, + Some(Duration::from_millis(1000)), d, serve_sock, broadcast_socket, @@ -149,7 +151,7 @@ fn main() { stdout(), ); eprintln!("Ready. Listening on {}", serve_addr); - for t in threads { + for t in rpu.thread_hdls { t.join().expect("join"); } } diff --git a/src/rpu.rs b/src/rpu.rs index c6ee2d6f55d50c..fa85f6b97d6e5a 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -23,6 +23,7 @@ pub struct Rpu { bank: Arc, start_hash: Hash, tick_duration: Option, + pub thread_hdls: Vec>, } impl Rpu { @@ -32,9 +33,36 @@ impl Rpu { bank: Arc::new(bank), start_hash, tick_duration, + thread_hdls: vec![], } } + pub fn new1( + bank: Bank, + start_hash: Hash, + tick_duration: Option, + me: ReplicatedData, + requests_socket: UdpSocket, + broadcast_socket: UdpSocket, + respond_socket: UdpSocket, + gossip: UdpSocket, + exit: Arc, + writer: W, + ) -> Self { + let mut rpu = Self::new(bank, start_hash, tick_duration); + let thread_hdls = rpu.serve( + me, + requests_socket, + broadcast_socket, + respond_socket, + gossip, + exit, + writer, + ); + rpu.thread_hdls.extend(thread_hdls); + rpu + } + /// Create a UDP microservice that forwards messages the given Rpu. /// This service is the network leader /// Set `exit` to shutdown its threads. diff --git a/src/thin_client.rs b/src/thin_client.rs index 36a4464381391b..3e3b8dadf579bb 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -193,14 +193,16 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); let mut local = serve.local_addr().unwrap(); local.set_port(0); let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let threads = rpu.serve( + let rpu = Rpu::new1( + bank, + alice.last_id(), + Some(Duration::from_millis(30)), d, serve, broadcast_socket, @@ -232,7 +234,7 @@ mod tests { } assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); - for t in threads { + for t in rpu.thread_hdls { t.join().unwrap(); } } @@ -245,7 +247,6 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); let serve_addr = leader_serve.local_addr().unwrap(); let mut local = leader_serve.local_addr().unwrap(); @@ -253,7 +254,10 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let threads = rpu.serve( + let rpu = Rpu::new1( + bank, + alice.last_id(), + Some(Duration::from_millis(30)), leader_data, leader_serve, broadcast_socket, @@ -289,7 +293,7 @@ mod tests { trace!("exiting"); exit.store(true, Ordering::Relaxed); trace!("joining threads"); - for t in threads { + for t in rpu.thread_hdls { t.join().unwrap(); } } @@ -394,17 +398,17 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let leader_bank = { - let bank = Bank::new(&alice); - Rpu::new(bank, alice.last_id(), None) - }; + let leader_bank = Bank::new(&alice); let mut local = leader.2.local_addr().unwrap(); local.set_port(0); let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let mut threads = leader_bank.serve( + let rpu = Rpu::new1( + leader_bank, + alice.last_id(), + None, leader.0.clone(), leader.2, broadcast_socket, @@ -414,6 +418,7 @@ mod tests { sink(), ); + let mut threads = rpu.thread_hdls; for _ in 0..N { replicant(&leader.0, exit.clone(), &alice, &mut threads); } diff --git a/src/tpu.rs b/src/tpu.rs index 6b47207353d9b3..39f6da0559ffa7 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -7,7 +7,6 @@ use crdt::{Crdt, ReplicatedData}; use hash::Hash; use packet; use record_stage::RecordStage; -use result::Result; use sig_verify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; @@ -23,6 +22,7 @@ pub struct Tpu { bank: Arc, start_hash: Hash, tick_duration: Option, + pub thread_hdls: Vec>, } impl Tpu { @@ -32,9 +32,27 @@ impl Tpu { bank: Arc::new(bank), start_hash, tick_duration, + thread_hdls: vec![], } } + pub fn new1( + bank: Bank, + start_hash: Hash, + tick_duration: Option, + me: ReplicatedData, + requests_socket: UdpSocket, + broadcast_socket: UdpSocket, + gossip: UdpSocket, + exit: Arc, + writer: W, + ) -> Self { + let mut tpu = Tpu::new(bank, start_hash, tick_duration); + let thread_hdls = tpu.serve(me, requests_socket, broadcast_socket, gossip, exit, writer); + tpu.thread_hdls.extend(thread_hdls); + tpu + } + /// Create a UDP microservice that forwards messages the given Tpu. /// This service is the network leader /// Set `exit` to shutdown its threads. From 9026c70952778ca2b8ffdc7f66ce96a9a106cfe1 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 10:33:16 -0600 Subject: [PATCH 09/18] Inline Rpu::new --- src/rpu.rs | 16 ++++++---------- src/tpu.rs | 16 ++++++---------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/src/rpu.rs b/src/rpu.rs index fa85f6b97d6e5a..0f7d31674d9b1a 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -28,15 +28,6 @@ pub struct Rpu { impl Rpu { /// Create a new Rpu that wraps the given Bank. - pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option) -> Self { - Rpu { - bank: Arc::new(bank), - start_hash, - tick_duration, - thread_hdls: vec![], - } - } - pub fn new1( bank: Bank, start_hash: Hash, @@ -49,7 +40,12 @@ impl Rpu { exit: Arc, writer: W, ) -> Self { - let mut rpu = Self::new(bank, start_hash, tick_duration); + let mut rpu = Rpu { + bank: Arc::new(bank), + start_hash, + tick_duration, + thread_hdls: vec![], + }; let thread_hdls = rpu.serve( me, requests_socket, diff --git a/src/tpu.rs b/src/tpu.rs index 39f6da0559ffa7..4908aeb7e2b3a5 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -27,15 +27,6 @@ pub struct Tpu { impl Tpu { /// Create a new Tpu that wraps the given Bank. - pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option) -> Self { - Tpu { - bank: Arc::new(bank), - start_hash, - tick_duration, - thread_hdls: vec![], - } - } - pub fn new1( bank: Bank, start_hash: Hash, @@ -47,7 +38,12 @@ impl Tpu { exit: Arc, writer: W, ) -> Self { - let mut tpu = Tpu::new(bank, start_hash, tick_duration); + let mut tpu = Tpu { + bank: Arc::new(bank), + start_hash, + tick_duration, + thread_hdls: vec![], + }; let thread_hdls = tpu.serve(me, requests_socket, broadcast_socket, gossip, exit, writer); tpu.thread_hdls.extend(thread_hdls); tpu From 5456de63e96b4476f0af56c3d4e73d88f729cfb7 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 10:38:17 -0600 Subject: [PATCH 10/18] Less state --- src/rpu.rs | 15 ++++++--------- src/tpu.rs | 24 ++++++++++++++---------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/rpu.rs b/src/rpu.rs index 0f7d31674d9b1a..17187bec74eeca 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -21,8 +21,6 @@ use write_stage::WriteStage; pub struct Rpu { bank: Arc, - start_hash: Hash, - tick_duration: Option, pub thread_hdls: Vec>, } @@ -42,11 +40,11 @@ impl Rpu { ) -> Self { let mut rpu = Rpu { bank: Arc::new(bank), - start_hash, - tick_duration, thread_hdls: vec![], }; let thread_hdls = rpu.serve( + start_hash, + tick_duration, me, requests_socket, broadcast_socket, @@ -64,6 +62,8 @@ impl Rpu { /// Set `exit` to shutdown its threads. pub fn serve( &self, + start_hash: Hash, + tick_duration: Option, me: ReplicatedData, requests_socket: UdpSocket, broadcast_socket: UdpSocket, @@ -93,11 +93,8 @@ impl Rpu { blob_recycler.clone(), ); - let record_stage = RecordStage::new( - request_stage.signal_receiver, - &self.start_hash, - self.tick_duration, - ); + let record_stage = + RecordStage::new(request_stage.signal_receiver, &start_hash, tick_duration); let write_stage = WriteStage::new( self.bank.clone(), diff --git a/src/tpu.rs b/src/tpu.rs index 4908aeb7e2b3a5..94b8e0f2569d7f 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -20,8 +20,6 @@ use write_stage::WriteStage; pub struct Tpu { bank: Arc, - start_hash: Hash, - tick_duration: Option, pub thread_hdls: Vec>, } @@ -40,11 +38,18 @@ impl Tpu { ) -> Self { let mut tpu = Tpu { bank: Arc::new(bank), - start_hash, - tick_duration, thread_hdls: vec![], }; - let thread_hdls = tpu.serve(me, requests_socket, broadcast_socket, gossip, exit, writer); + let thread_hdls = tpu.serve( + start_hash, + tick_duration, + me, + requests_socket, + broadcast_socket, + gossip, + exit, + writer, + ); tpu.thread_hdls.extend(thread_hdls); tpu } @@ -54,6 +59,8 @@ impl Tpu { /// Set `exit` to shutdown its threads. pub fn serve( &self, + start_hash: Hash, + tick_duration: Option, me: ReplicatedData, requests_socket: UdpSocket, broadcast_socket: UdpSocket, @@ -80,11 +87,8 @@ impl Tpu { packet_recycler.clone(), ); - let record_stage = RecordStage::new( - banking_stage.signal_receiver, - &self.start_hash, - self.tick_duration, - ); + let record_stage = + RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration); let write_stage = WriteStage::new( self.bank.clone(), From 1bcf3891b43b11322423787d2e8d499a4b22fe8c Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 10:44:47 -0600 Subject: [PATCH 11/18] New TPU/RPU constructors --- src/rpu.rs | 47 +++++++---------------------------------------- src/tpu.rs | 44 ++++++-------------------------------------- 2 files changed, 13 insertions(+), 78 deletions(-) diff --git a/src/rpu.rs b/src/rpu.rs index 17187bec74eeca..6c1889863df107 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -20,12 +20,10 @@ use streamer; use write_stage::WriteStage; pub struct Rpu { - bank: Arc, pub thread_hdls: Vec>, } impl Rpu { - /// Create a new Rpu that wraps the given Bank. pub fn new1( bank: Bank, start_hash: Hash, @@ -38,40 +36,8 @@ impl Rpu { exit: Arc, writer: W, ) -> Self { - let mut rpu = Rpu { - bank: Arc::new(bank), - thread_hdls: vec![], - }; - let thread_hdls = rpu.serve( - start_hash, - tick_duration, - me, - requests_socket, - broadcast_socket, - respond_socket, - gossip, - exit, - writer, - ); - rpu.thread_hdls.extend(thread_hdls); - rpu - } + let bank = Arc::new(bank); - /// Create a UDP microservice that forwards messages the given Rpu. - /// This service is the network leader - /// Set `exit` to shutdown its threads. - pub fn serve( - &self, - start_hash: Hash, - tick_duration: Option, - me: ReplicatedData, - requests_socket: UdpSocket, - broadcast_socket: UdpSocket, - respond_socket: UdpSocket, - gossip: UdpSocket, - exit: Arc, - writer: W, - ) -> Vec> { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( @@ -84,7 +50,7 @@ impl Rpu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let blob_recycler = packet::BlobRecycler::default(); - let request_processor = RequestProcessor::new(self.bank.clone()); + let request_processor = RequestProcessor::new(bank.clone()); let request_stage = RequestStage::new( request_processor, exit.clone(), @@ -97,7 +63,7 @@ impl Rpu { RecordStage::new(request_stage.signal_receiver, &start_hash, tick_duration); let write_stage = WriteStage::new( - self.bank.clone(), + bank.clone(), exit.clone(), blob_recycler.clone(), Mutex::new(writer), @@ -125,7 +91,7 @@ impl Rpu { request_stage.blob_receiver, ); - let mut threads = vec![ + let mut thread_hdls = vec![ t_receiver, t_responder, request_stage.thread_hdl, @@ -134,7 +100,8 @@ impl Rpu { t_listen, t_broadcast, ]; - threads.extend(sig_verify_stage.thread_hdls.into_iter()); - threads + thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter()); + + Rpu { thread_hdls } } } diff --git a/src/tpu.rs b/src/tpu.rs index 94b8e0f2569d7f..9d8862424fbd9f 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -19,12 +19,10 @@ use streamer; use write_stage::WriteStage; pub struct Tpu { - bank: Arc, pub thread_hdls: Vec>, } impl Tpu { - /// Create a new Tpu that wraps the given Bank. pub fn new1( bank: Bank, start_hash: Hash, @@ -36,38 +34,8 @@ impl Tpu { exit: Arc, writer: W, ) -> Self { - let mut tpu = Tpu { - bank: Arc::new(bank), - thread_hdls: vec![], - }; - let thread_hdls = tpu.serve( - start_hash, - tick_duration, - me, - requests_socket, - broadcast_socket, - gossip, - exit, - writer, - ); - tpu.thread_hdls.extend(thread_hdls); - tpu - } + let bank = Arc::new(bank); - /// Create a UDP microservice that forwards messages the given Tpu. - /// This service is the network leader - /// Set `exit` to shutdown its threads. - pub fn serve( - &self, - start_hash: Hash, - tick_duration: Option, - me: ReplicatedData, - requests_socket: UdpSocket, - broadcast_socket: UdpSocket, - gossip: UdpSocket, - exit: Arc, - writer: W, - ) -> Vec> { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( @@ -81,7 +49,7 @@ impl Tpu { let blob_recycler = packet::BlobRecycler::default(); let banking_stage = BankingStage::new( - self.bank.clone(), + bank.clone(), exit.clone(), sig_verify_stage.verified_receiver, packet_recycler.clone(), @@ -91,7 +59,7 @@ impl Tpu { RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration); let write_stage = WriteStage::new( - self.bank.clone(), + bank.clone(), exit.clone(), blob_recycler.clone(), Mutex::new(writer), @@ -112,7 +80,7 @@ impl Tpu { write_stage.blob_receiver, ); - let mut threads = vec![ + let mut thread_hdls = vec![ t_receiver, banking_stage.thread_hdl, write_stage.thread_hdl, @@ -120,7 +88,7 @@ impl Tpu { t_listen, t_broadcast, ]; - threads.extend(sig_verify_stage.thread_hdls.into_iter()); - threads + thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter()); + Tpu { thread_hdls } } } From e9ee020b5f5b70b528999ba5ae83333b3e7b25d9 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 10:45:33 -0600 Subject: [PATCH 12/18] Rename constructors --- src/bin/testnode.rs | 2 +- src/rpu.rs | 2 +- src/thin_client.rs | 6 +++--- src/tpu.rs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 17571783402b41..e01a36c07a9020 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -138,7 +138,7 @@ fn main() { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); eprintln!("starting server..."); - let rpu = Rpu::new1( + let rpu = Rpu::new( bank, last_id, Some(Duration::from_millis(1000)), diff --git a/src/rpu.rs b/src/rpu.rs index 6c1889863df107..851452b85ae893 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -24,7 +24,7 @@ pub struct Rpu { } impl Rpu { - pub fn new1( + pub fn new( bank: Bank, start_hash: Hash, tick_duration: Option, diff --git a/src/thin_client.rs b/src/thin_client.rs index 3e3b8dadf579bb..575db0c55a0c92 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -199,7 +199,7 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let rpu = Rpu::new1( + let rpu = Rpu::new( bank, alice.last_id(), Some(Duration::from_millis(30)), @@ -254,7 +254,7 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let rpu = Rpu::new1( + let rpu = Rpu::new( bank, alice.last_id(), Some(Duration::from_millis(30)), @@ -405,7 +405,7 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let rpu = Rpu::new1( + let rpu = Rpu::new( leader_bank, alice.last_id(), None, diff --git a/src/tpu.rs b/src/tpu.rs index 9d8862424fbd9f..e730f13d700f94 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -23,7 +23,7 @@ pub struct Tpu { } impl Tpu { - pub fn new1( + pub fn new( bank: Bank, start_hash: Hash, tick_duration: Option, From cfe8b3fc55c94c852777a5f000356be0098fa598 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 11:00:01 -0600 Subject: [PATCH 13/18] Wrap the RPU with new object Server --- src/bin/testnode.rs | 6 +++--- src/lib.rs | 1 + src/server.rs | 47 +++++++++++++++++++++++++++++++++++++++++++++ src/thin_client.rs | 14 +++++++------- 4 files changed, 58 insertions(+), 10 deletions(-) create mode 100644 src/server.rs diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index e01a36c07a9020..41c6550dba5554 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -10,7 +10,7 @@ use solana::bank::Bank; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; -use solana::rpu::Rpu; +use solana::server::Server; use solana::signature::{KeyPair, KeyPairUtil}; use std::env; use std::io::{stdin, stdout, Read}; @@ -138,7 +138,7 @@ fn main() { let respond_socket = UdpSocket::bind(local.clone()).unwrap(); eprintln!("starting server..."); - let rpu = Rpu::new( + let server = Server::new( bank, last_id, Some(Duration::from_millis(1000)), @@ -151,7 +151,7 @@ fn main() { stdout(), ); eprintln!("Ready. Listening on {}", serve_addr); - for t in rpu.thread_hdls { + for t in server.thread_hdls { t.join().expect("join"); } } diff --git a/src/lib.rs b/src/lib.rs index 993b04cb8c22bf..7439d16a81d20f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ pub mod request_processor; pub mod request_stage; pub mod result; pub mod rpu; +pub mod server; pub mod sig_verify_stage; pub mod signature; pub mod streamer; diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 00000000000000..e8893d15bcc28e --- /dev/null +++ b/src/server.rs @@ -0,0 +1,47 @@ +//! The `server` module hosts all the server microservices. + +use bank::Bank; +use crdt::ReplicatedData; +use hash::Hash; +use rpu::Rpu; +use std::io::Write; +use std::net::UdpSocket; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::thread::JoinHandle; +use std::time::Duration; + +pub struct Server { + pub thread_hdls: Vec>, +} + +impl Server { + pub fn new( + bank: Bank, + start_hash: Hash, + tick_duration: Option, + me: ReplicatedData, + requests_socket: UdpSocket, + broadcast_socket: UdpSocket, + respond_socket: UdpSocket, + gossip: UdpSocket, + exit: Arc, + writer: W, + ) -> Self { + let rpu = Rpu::new( + bank, + start_hash, + tick_duration, + me, + requests_socket, + broadcast_socket, + respond_socket, + gossip, + exit, + writer, + ); + Server { + thread_hdls: rpu.thread_hdls, + } + } +} diff --git a/src/thin_client.rs b/src/thin_client.rs index 575db0c55a0c92..28aff97c6cae85 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -160,7 +160,7 @@ mod tests { use logger; use mint::Mint; use plan::Plan; - use rpu::Rpu; + use server::Server; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; @@ -199,7 +199,7 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let rpu = Rpu::new( + let server = Server::new( bank, alice.last_id(), Some(Duration::from_millis(30)), @@ -234,7 +234,7 @@ mod tests { } assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); - for t in rpu.thread_hdls { + for t in server.thread_hdls { t.join().unwrap(); } } @@ -254,7 +254,7 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let rpu = Rpu::new( + let server = Server::new( bank, alice.last_id(), Some(Duration::from_millis(30)), @@ -293,7 +293,7 @@ mod tests { trace!("exiting"); exit.store(true, Ordering::Relaxed); trace!("joining threads"); - for t in rpu.thread_hdls { + for t in server.thread_hdls { t.join().unwrap(); } } @@ -405,7 +405,7 @@ mod tests { let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); - let rpu = Rpu::new( + let server = Server::new( leader_bank, alice.last_id(), None, @@ -418,7 +418,7 @@ mod tests { sink(), ); - let mut threads = rpu.thread_hdls; + let mut threads = server.thread_hdls; for _ in 0..N { replicant(&leader.0, exit.clone(), &alice, &mut threads); } From 3f38c0a245681dee4e6c93aeac2eb0beb288b238 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 11:19:58 -0600 Subject: [PATCH 14/18] Feed events socket into the server --- src/bin/testnode.rs | 3 ++- src/server.rs | 1 + src/thin_client.rs | 20 +++++++++++++------- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 41c6550dba5554..ca4a6f7a3412d7 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -123,7 +123,7 @@ fn main() { let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); - let _events_sock = UdpSocket::bind(&events_addr).unwrap(); + let events_sock = UdpSocket::bind(&events_addr).unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, @@ -144,6 +144,7 @@ fn main() { Some(Duration::from_millis(1000)), d, serve_sock, + events_sock, broadcast_socket, respond_socket, gossip_sock, diff --git a/src/server.rs b/src/server.rs index e8893d15bcc28e..7cdad5fe2e750d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -22,6 +22,7 @@ impl Server { tick_duration: Option, me: ReplicatedData, requests_socket: UdpSocket, + _events_socket: UdpSocket, broadcast_socket: UdpSocket, respond_socket: UdpSocket, gossip: UdpSocket, diff --git a/src/thin_client.rs b/src/thin_client.rs index 28aff97c6cae85..30036c340481c6 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -177,16 +177,18 @@ mod tests { fn test_thin_client() { logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - serve.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - let _events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let addr = serve.local_addr().unwrap(); + let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let addr = requests_socket.local_addr().unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), "0.0.0.0:0".parse().unwrap(), - serve.local_addr().unwrap(), + requests_socket.local_addr().unwrap(), ); let alice = Mint::new(10_000); @@ -194,7 +196,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let mut local = serve.local_addr().unwrap(); + let mut local = requests_socket.local_addr().unwrap(); local.set_port(0); let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); @@ -204,7 +206,8 @@ mod tests { alice.last_id(), Some(Duration::from_millis(30)), d, - serve, + requests_socket, + events_socket, broadcast_socket, respond_socket, gossip, @@ -251,6 +254,7 @@ mod tests { let mut local = leader_serve.local_addr().unwrap(); local.set_port(0); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast_socket = UdpSocket::bind(local).unwrap(); let respond_socket = UdpSocket::bind(local.clone()).unwrap(); @@ -260,6 +264,7 @@ mod tests { Some(Duration::from_millis(30)), leader_data, leader_serve, + events_socket, broadcast_socket, respond_socket, leader_gossip, @@ -411,6 +416,7 @@ mod tests { None, leader.0.clone(), leader.2, + leader.4, broadcast_socket, respond_socket, leader.1, From 5855e18a4e968ad3cbcae0f1e461cdd008439024 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 11:21:40 -0600 Subject: [PATCH 15/18] Let server own the bank, not TPU/RPU --- src/rpu.rs | 4 +--- src/server.rs | 3 ++- src/tpu.rs | 4 +--- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/rpu.rs b/src/rpu.rs index 851452b85ae893..4d6f5b9935a1e0 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -25,7 +25,7 @@ pub struct Rpu { impl Rpu { pub fn new( - bank: Bank, + bank: Arc, start_hash: Hash, tick_duration: Option, me: ReplicatedData, @@ -36,8 +36,6 @@ impl Rpu { exit: Arc, writer: W, ) -> Self { - let bank = Arc::new(bank); - let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( diff --git a/src/server.rs b/src/server.rs index 7cdad5fe2e750d..e9a473ac037e8d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -29,8 +29,9 @@ impl Server { exit: Arc, writer: W, ) -> Self { + let bank = Arc::new(bank); let rpu = Rpu::new( - bank, + bank.clone(), start_hash, tick_duration, me, diff --git a/src/tpu.rs b/src/tpu.rs index e730f13d700f94..953d1db6b1a0a5 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -24,7 +24,7 @@ pub struct Tpu { impl Tpu { pub fn new( - bank: Bank, + bank: Arc, start_hash: Hash, tick_duration: Option, me: ReplicatedData, @@ -34,8 +34,6 @@ impl Tpu { exit: Arc, writer: W, ) -> Self { - let bank = Arc::new(bank); - let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( From b826f837f8d5a0439f7431c001f3edb38536adf8 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 11:25:55 -0600 Subject: [PATCH 16/18] First attempt to pull TPU into the server --- src/server.rs | 23 +++++++++++++++++++---- src/tpu.rs | 4 ++-- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/server.rs b/src/server.rs index e9a473ac037e8d..07b11e4262a814 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::thread::JoinHandle; use std::time::Duration; +//use tpu::Tpu; pub struct Server { pub thread_hdls: Vec>, @@ -30,6 +31,7 @@ impl Server { writer: W, ) -> Self { let bank = Arc::new(bank); + let mut thread_hdls = vec![]; let rpu = Rpu::new( bank.clone(), start_hash, @@ -39,11 +41,24 @@ impl Server { broadcast_socket, respond_socket, gossip, - exit, + exit.clone(), writer, ); - Server { - thread_hdls: rpu.thread_hdls, - } + thread_hdls.extend(rpu.thread_hdls); + + //let tpu = Tpu::new( + // bank.clone(), + // start_hash, + // tick_duration, + // me, + // events_socket, + // broadcast_socket, + // gossip, + // exit.clone(), + // writer, + //); + //thread_hdls.extend(tpu.thread_hdls); + + Server { thread_hdls } } } diff --git a/src/tpu.rs b/src/tpu.rs index 953d1db6b1a0a5..f128b551ebde38 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -28,7 +28,7 @@ impl Tpu { start_hash: Hash, tick_duration: Option, me: ReplicatedData, - requests_socket: UdpSocket, + events_socket: UdpSocket, broadcast_socket: UdpSocket, gossip: UdpSocket, exit: Arc, @@ -37,7 +37,7 @@ impl Tpu { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( - requests_socket, + events_socket, exit.clone(), packet_recycler.clone(), packet_sender, From 6d4defdf96b7314b5faa4dfc425b7e6865d4b452 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 11:33:43 -0600 Subject: [PATCH 17/18] Offload event processing to the TPU --- src/rpu.rs | 51 +++------------------------------------------- src/server.rs | 27 ++++++++---------------- src/thin_client.rs | 2 +- 3 files changed, 12 insertions(+), 68 deletions(-) diff --git a/src/rpu.rs b/src/rpu.rs index 4d6f5b9935a1e0..ba4c4efc6fb2a3 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -2,39 +2,27 @@ //! 5-stage transaction processing pipeline in software. use bank::Bank; -use crdt::{Crdt, ReplicatedData}; -use hash::Hash; use packet; -use record_stage::RecordStage; use request_processor::RequestProcessor; use request_stage::RequestStage; use sig_verify_stage::SigVerifyStage; -use std::io::Write; use std::net::UdpSocket; +use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex, RwLock}; use std::thread::JoinHandle; -use std::time::Duration; use streamer; -use write_stage::WriteStage; pub struct Rpu { pub thread_hdls: Vec>, } impl Rpu { - pub fn new( + pub fn new( bank: Arc, - start_hash: Hash, - tick_duration: Option, - me: ReplicatedData, requests_socket: UdpSocket, - broadcast_socket: UdpSocket, respond_socket: UdpSocket, - gossip: UdpSocket, exit: Arc, - writer: W, ) -> Self { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); @@ -57,31 +45,6 @@ impl Rpu { blob_recycler.clone(), ); - let record_stage = - RecordStage::new(request_stage.signal_receiver, &start_hash, tick_duration); - - let write_stage = WriteStage::new( - bank.clone(), - exit.clone(), - blob_recycler.clone(), - Mutex::new(writer), - record_stage.entry_receiver, - ); - - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let window = streamer::default_window(); - let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); - - let t_broadcast = streamer::broadcaster( - broadcast_socket, - exit.clone(), - crdt.clone(), - window, - blob_recycler.clone(), - write_stage.blob_receiver, - ); - let t_responder = streamer::responder( respond_socket, exit.clone(), @@ -89,15 +52,7 @@ impl Rpu { request_stage.blob_receiver, ); - let mut thread_hdls = vec![ - t_receiver, - t_responder, - request_stage.thread_hdl, - write_stage.thread_hdl, - t_gossip, - t_listen, - t_broadcast, - ]; + let mut thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl]; thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter()); Rpu { thread_hdls } diff --git a/src/server.rs b/src/server.rs index 07b11e4262a814..b250e86d84bf0a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::thread::JoinHandle; use std::time::Duration; -//use tpu::Tpu; +use tpu::Tpu; pub struct Server { pub thread_hdls: Vec>, @@ -23,7 +23,7 @@ impl Server { tick_duration: Option, me: ReplicatedData, requests_socket: UdpSocket, - _events_socket: UdpSocket, + events_socket: UdpSocket, broadcast_socket: UdpSocket, respond_socket: UdpSocket, gossip: UdpSocket, @@ -32,32 +32,21 @@ impl Server { ) -> Self { let bank = Arc::new(bank); let mut thread_hdls = vec![]; - let rpu = Rpu::new( + let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); + thread_hdls.extend(rpu.thread_hdls); + + let tpu = Tpu::new( bank.clone(), start_hash, tick_duration, me, - requests_socket, + events_socket, broadcast_socket, - respond_socket, gossip, exit.clone(), writer, ); - thread_hdls.extend(rpu.thread_hdls); - - //let tpu = Tpu::new( - // bank.clone(), - // start_hash, - // tick_duration, - // me, - // events_socket, - // broadcast_socket, - // gossip, - // exit.clone(), - // writer, - //); - //thread_hdls.extend(tpu.thread_hdls); + thread_hdls.extend(tpu.thread_hdls); Server { thread_hdls } } diff --git a/src/thin_client.rs b/src/thin_client.rs index 30036c340481c6..6214ef520ae1e6 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -69,7 +69,7 @@ impl ThinClient { pub fn transfer_signed(&self, tr: Transaction) -> io::Result { let req = Request::Transaction(tr); let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed"); - self.requests_socket.send_to(&data, &self.addr) + self.events_socket.send_to(&data, &self.addr) } /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. From f7083e09234872ca85b3507353ef30bf8afd9688 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Tue, 15 May 2018 12:15:29 -0600 Subject: [PATCH 18/18] Remove transaction processing from RPU and request processing from TVU --- src/ecdsa.rs | 15 +++- src/request.rs | 21 +----- src/request_processor.rs | 69 ++++--------------- src/request_stage.rs | 145 +-------------------------------------- src/rpu.rs | 9 +-- src/sig_verify_stage.rs | 12 ++-- src/thin_client.rs | 5 +- src/tvu.rs | 23 ++----- 8 files changed, 46 insertions(+), 253 deletions(-) diff --git a/src/ecdsa.rs b/src/ecdsa.rs index b2477b67129a08..12ecb306ae1c06 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -136,14 +136,23 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { mod tests { use bincode::serialize; use ecdsa; + use event::Event; use packet::{Packet, Packets, SharedPackets}; - use request::Request; use std::sync::RwLock; use transaction::Transaction; - use transaction::test_tx; + use transaction::{memfind, test_tx}; + + #[test] + fn test_layout() { + let tr = test_tx(); + let tx = serialize(&tr).unwrap(); + let packet = serialize(&Event::Transaction(tr)).unwrap(); + assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); + assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); + } fn make_packet_from_transaction(tr: Transaction) -> Packet { - let tx = serialize(&Request::Transaction(tr)).unwrap(); + let tx = serialize(&Event::Transaction(tr)).unwrap(); let mut packet = Packet::default(); packet.meta.size = tx.len(); packet.data[..packet.meta.size].copy_from_slice(&tx); diff --git a/src/request.rs b/src/request.rs index d1c692ae423bf4..1be8ee01dc79f7 100644 --- a/src/request.rs +++ b/src/request.rs @@ -5,12 +5,10 @@ use hash::Hash; use packet; use packet::SharedPackets; use signature::PublicKey; -use transaction::Transaction; #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[derive(Serialize, Deserialize, Debug, Clone)] pub enum Request { - Transaction(Transaction), GetBalance { key: PublicKey }, GetLastId, GetTransactionCount, @@ -19,10 +17,7 @@ pub enum Request { impl Request { /// Verify the request is valid. pub fn verify(&self) -> bool { - match *self { - Request::Transaction(ref tr) => tr.verify_plan(), - _ => true, - } + true } } @@ -54,24 +49,12 @@ pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec #[cfg(test)] mod tests { - use bincode::serialize; - use ecdsa; use packet::{PacketRecycler, NUM_PACKETS}; use request::{to_request_packets, Request}; - use transaction::{memfind, test_tx}; - - #[test] - fn test_layout() { - let tr = test_tx(); - let tx = serialize(&tr).unwrap(); - let packet = serialize(&Request::Transaction(tr)).unwrap(); - assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); - assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); - } #[test] fn test_to_packets() { - let tr = Request::Transaction(test_tx()); + let tr = Request::GetTransactionCount; let re = PacketRecycler::default(); let rv = to_request_packets(&re, vec![tr.clone(); 1]); assert_eq!(rv.len(), 1); diff --git a/src/request_processor.rs b/src/request_processor.rs index f070bbfe3d91d2..63f6c6ddb97863 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -6,14 +6,12 @@ use event::Event; use packet; use packet::SharedPackets; use rayon::prelude::*; -use recorder::Signal; use request::{Request, Response}; use result::Result; use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::Arc; -use std::sync::mpsc::{Receiver, Sender}; -use std::time::Duration; +use std::sync::mpsc::Receiver; use std::time::Instant; use streamer; use timing; @@ -53,7 +51,6 @@ impl RequestProcessor { info!("Response::TransactionCount {:?}", rsp); Some(rsp) } - Request::Transaction(_) => unreachable!(), } } @@ -91,24 +88,6 @@ impl RequestProcessor { } /// Split Request list into verified transactions and the rest - fn partition_requests( - req_vers: Vec<(Request, SocketAddr, u8)>, - ) -> (Vec, Vec<(Request, SocketAddr)>) { - let mut events = vec![]; - let mut reqs = vec![]; - for (msg, rsp_addr, verify) in req_vers { - match msg { - Request::Transaction(tr) => { - if verify != 0 { - events.push(Event::Transaction(tr)); - } - } - _ => reqs.push((msg, rsp_addr)), - } - } - (events, reqs) - } - fn serialize_response( resp: Response, rsp_addr: SocketAddr, @@ -139,49 +118,29 @@ impl RequestProcessor { pub fn process_request_packets( &self, - verified_receiver: &Receiver)>>, - signal_sender: &Sender, + packet_receiver: &Receiver, blob_sender: &streamer::BlobSender, packet_recycler: &packet::PacketRecycler, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { - let timer = Duration::new(1, 0); - let recv_start = Instant::now(); - let mms = verified_receiver.recv_timeout(timer)?; - let mut reqs_len = 0; - let mms_len = mms.len(); + let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; + info!( - "@{:?} process start stalled for: {:?}ms batches: {}", + "@{:?} request_stage: processing: {}", timing::timestamp(), - timing::duration_as_ms(&recv_start.elapsed()), - mms.len(), + batch_len ); + + let mut reqs_len = 0; let proc_start = Instant::now(); - for (msgs, vers) in mms { - let reqs = Self::deserialize_requests(&msgs.read().unwrap()); - reqs_len += reqs.len(); - let req_vers = reqs.into_iter() - .zip(vers) - .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) - .filter(|x| { - let v = x.0.verify(); - v - }) + for msgs in batch { + let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap()) + .into_iter() + .filter_map(|x| x) .collect(); + reqs_len += reqs.len(); - debug!("partitioning"); - let (events, reqs) = Self::partition_requests(req_vers); - debug!("events: {} reqs: {}", events.len(), reqs.len()); - - debug!("process_events"); - let results = self.bank.process_verified_events(events); - let events = results.into_iter().filter_map(|x| x.ok()).collect(); - signal_sender.send(Signal::Events(events))?; - debug!("done process_events"); - - debug!("process_requests"); let rsps = self.process_requests(reqs); - debug!("done process_requests"); let blobs = Self::serialize_responses(rsps, blob_recycler)?; if !blobs.is_empty() { @@ -196,7 +155,7 @@ impl RequestProcessor { info!( "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", timing::timestamp(), - mms_len, + batch_len, total_time_ms, reqs_len, (reqs_len as f32) / (total_time_s) diff --git a/src/request_stage.rs b/src/request_stage.rs index 8efc4f8237b70d..cd98a7d439bdd6 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -2,7 +2,6 @@ use packet; use packet::SharedPackets; -use recorder::Signal; use request_processor::RequestProcessor; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -12,7 +11,6 @@ use streamer; pub struct RequestStage { pub thread_hdl: JoinHandle<()>, - pub signal_receiver: Receiver, pub blob_receiver: streamer::BlobReceiver, pub request_processor: Arc, } @@ -21,18 +19,16 @@ impl RequestStage { pub fn new( request_processor: RequestProcessor, exit: Arc, - verified_receiver: Receiver)>>, + packet_receiver: Receiver, packet_recycler: packet::PacketRecycler, blob_recycler: packet::BlobRecycler, ) -> Self { let request_processor = Arc::new(request_processor); let request_processor_ = request_processor.clone(); - let (signal_sender, signal_receiver) = channel(); let (blob_sender, blob_receiver) = channel(); let thread_hdl = spawn(move || loop { let e = request_processor_.process_request_packets( - &verified_receiver, - &signal_sender, + &packet_receiver, &blob_sender, &packet_recycler, &blob_recycler, @@ -45,145 +41,8 @@ impl RequestStage { }); RequestStage { thread_hdl, - signal_receiver, blob_receiver, request_processor, } } } - -// TODO: When banking is pulled out of RequestStage, add this test back in. - -//use bank::Bank; -//use entry::Entry; -//use event::Event; -//use hash::Hash; -//use record_stage::RecordStage; -//use recorder::Signal; -//use result::Result; -//use std::sync::mpsc::{channel, Sender}; -//use std::sync::{Arc, Mutex}; -//use std::time::Duration; -// -//#[cfg(test)] -//mod tests { -// use bank::Bank; -// use event::Event; -// use event_processor::EventProcessor; -// use mint::Mint; -// use signature::{KeyPair, KeyPairUtil}; -// use transaction::Transaction; -// -// #[test] -// // TODO: Move this test banking_stage. Calling process_events() directly -// // defeats the purpose of this test. -// fn test_banking_sequential_consistency() { -// // In this attack we'll demonstrate that a verifier can interpret the ledger -// // differently if either the server doesn't signal the ledger to add an -// // Entry OR if the verifier tries to parallelize across multiple Entries. -// let mint = Mint::new(2); -// let bank = Bank::new(&mint); -// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); -// -// // Process a batch that includes a transaction that receives two tokens. -// let alice = KeyPair::new(); -// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); -// let events = vec![Event::Transaction(tr)]; -// let entry0 = event_processor.process_events(events).unwrap(); -// -// // Process a second batch that spends one of those tokens. -// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); -// let events = vec![Event::Transaction(tr)]; -// let entry1 = event_processor.process_events(events).unwrap(); -// -// // Collect the ledger and feed it to a new bank. -// let entries = vec![entry0, entry1]; -// -// // Assert the user holds one token, not two. If the server only output one -// // entry, then the second transaction will be rejected, because it drives -// // the account balance below zero before the credit is added. -// let bank = Bank::new(&mint); -// for entry in entries { -// assert!( -// bank -// .process_verified_events(entry.events) -// .into_iter() -// .all(|x| x.is_ok()) -// ); -// } -// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1)); -// } -//} -// -//#[cfg(all(feature = "unstable", test))] -//mod bench { -// extern crate test; -// use self::test::Bencher; -// use bank::{Bank, MAX_ENTRY_IDS}; -// use bincode::serialize; -// use event_processor::*; -// use hash::hash; -// use mint::Mint; -// use rayon::prelude::*; -// use signature::{KeyPair, KeyPairUtil}; -// use std::collections::HashSet; -// use std::time::Instant; -// use transaction::Transaction; -// -// #[bench] -// fn process_events_bench(_bencher: &mut Bencher) { -// let mint = Mint::new(100_000_000); -// let bank = Bank::new(&mint); -// // Create transactions between unrelated parties. -// let txs = 100_000; -// let last_ids: Mutex> = Mutex::new(HashSet::new()); -// let transactions: Vec<_> = (0..txs) -// .into_par_iter() -// .map(|i| { -// // Seed the 'to' account and a cell for its signature. -// let dummy_id = i % (MAX_ENTRY_IDS as i32); -// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash -// { -// let mut last_ids = last_ids.lock().unwrap(); -// if !last_ids.contains(&last_id) { -// last_ids.insert(last_id); -// bank.register_entry_id(&last_id); -// } -// } -// -// // Seed the 'from' account. -// let rando0 = KeyPair::new(); -// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); -// bank.process_verified_transaction(&tr).unwrap(); -// -// let rando1 = KeyPair::new(); -// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); -// bank.process_verified_transaction(&tr).unwrap(); -// -// // Finally, return a transaction that's unique -// Transaction::new(&rando0, rando1.pubkey(), 1, last_id) -// }) -// .collect(); -// -// let events: Vec<_> = transactions -// .into_iter() -// .map(|tr| Event::Transaction(tr)) -// .collect(); -// -// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); -// -// let now = Instant::now(); -// assert!(event_processor.process_events(events).is_ok()); -// let duration = now.elapsed(); -// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; -// let tps = txs as f64 / sec; -// -// // Ensure that all transactions were successfully logged. -// drop(event_processor.historian_input); -// let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); -// assert_eq!(entries.len(), 1); -// assert_eq!(entries[0].events.len(), txs as usize); -// -// println!("{} tps", tps); -// } -//} diff --git a/src/rpu.rs b/src/rpu.rs index ba4c4efc6fb2a3..eef1515d21a3ad 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -5,7 +5,6 @@ use bank::Bank; use packet; use request_processor::RequestProcessor; use request_stage::RequestStage; -use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::Arc; use std::sync::atomic::AtomicBool; @@ -33,14 +32,12 @@ impl Rpu { packet_sender, ); - let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - let blob_recycler = packet::BlobRecycler::default(); let request_processor = RequestProcessor::new(bank.clone()); let request_stage = RequestStage::new( request_processor, exit.clone(), - sig_verify_stage.verified_receiver, + packet_receiver, packet_recycler.clone(), blob_recycler.clone(), ); @@ -52,9 +49,7 @@ impl Rpu { request_stage.blob_receiver, ); - let mut thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl]; - thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter()); - + let thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl]; Rpu { thread_hdls } } } diff --git a/src/sig_verify_stage.rs b/src/sig_verify_stage.rs index 6528537c6a0512..ccb201e3aec723 100644 --- a/src/sig_verify_stage.rs +++ b/src/sig_verify_stage.rs @@ -18,9 +18,9 @@ pub struct SigVerifyStage { } impl SigVerifyStage { - pub fn new(exit: Arc, packets_receiver: Receiver) -> Self { + pub fn new(exit: Arc, packet_receiver: Receiver) -> Self { let (verified_sender, verified_receiver) = channel(); - let thread_hdls = Self::verifier_services(exit, packets_receiver, verified_sender); + let thread_hdls = Self::verifier_services(exit, packet_receiver, verified_sender); SigVerifyStage { thread_hdls, verified_receiver, @@ -71,11 +71,11 @@ impl SigVerifyStage { fn verifier_service( exit: Arc, - packets_receiver: Arc>, + packet_receiver: Arc>, verified_sender: Arc)>>>>, ) -> JoinHandle<()> { spawn(move || loop { - let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone()); + let e = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()); if e.is_err() && exit.load(Ordering::Relaxed) { break; } @@ -84,11 +84,11 @@ impl SigVerifyStage { fn verifier_services( exit: Arc, - packets_receiver: streamer::PacketReceiver, + packet_receiver: streamer::PacketReceiver, verified_sender: Sender)>>, ) -> Vec> { let sender = Arc::new(Mutex::new(verified_sender)); - let receiver = Arc::new(Mutex::new(packets_receiver)); + let receiver = Arc::new(Mutex::new(packet_receiver)); (0..4) .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) .collect() diff --git a/src/thin_client.rs b/src/thin_client.rs index 6214ef520ae1e6..1979d42ff2adad 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -4,6 +4,7 @@ //! unstable and may change in future releases. use bincode::{deserialize, serialize}; +use event::Event; use futures::future::{ok, FutureResult}; use hash::Hash; use request::{Request, Response}; @@ -67,8 +68,8 @@ impl ThinClient { /// Send a signed Transaction to the server for processing. This method /// does not wait for a response. pub fn transfer_signed(&self, tr: Transaction) -> io::Result { - let req = Request::Transaction(tr); - let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed"); + let event = Event::Transaction(tr); + let data = serialize(&event).expect("serialize Transaction in pub fn transfer_signed"); self.events_socket.send_to(&data, &self.addr) } diff --git a/src/tvu.rs b/src/tvu.rs index fece1980cf8aa0..3a87a6dab4ef44 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -2,13 +2,12 @@ //! 5-stage transaction validation pipeline in software. use bank::Bank; +use banking_stage::BankingStage; use crdt::{Crdt, ReplicatedData}; use hash::Hash; use ledger; use packet; use record_stage::RecordStage; -use request_processor::RequestProcessor; -use request_stage::RequestStage; use result::Result; use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; @@ -146,10 +145,8 @@ impl Tvu { // make sure we are on the same interface let mut local = requests_socket.local_addr()?; local.set_port(0); - let respond_socket = UdpSocket::bind(local.clone())?; let packet_recycler = packet::PacketRecycler::default(); - let blob_recycler = packet::BlobRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_packet_receiver = streamer::receiver( requests_socket, @@ -160,17 +157,15 @@ impl Tvu { let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - let request_processor = RequestProcessor::new(obj.bank.clone()); - let request_stage = RequestStage::new( - request_processor, + let banking_stage = BankingStage::new( + obj.bank.clone(), exit.clone(), sig_verify_stage.verified_receiver, packet_recycler.clone(), - blob_recycler.clone(), ); let record_stage = RecordStage::new( - request_stage.signal_receiver, + banking_stage.signal_receiver, &obj.start_hash, obj.tick_duration, ); @@ -178,13 +173,6 @@ impl Tvu { let write_stage = WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); - let t_responder = streamer::responder( - respond_socket, - exit.clone(), - blob_recycler.clone(), - request_stage.blob_receiver, - ); - let mut threads = vec![ //replicate threads t_blob_receiver, @@ -195,8 +183,7 @@ impl Tvu { t_listen, //serve threads t_packet_receiver, - t_responder, - request_stage.thread_hdl, + banking_stage.thread_hdl, write_stage.thread_hdl, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter());