diff --git a/src/banking_stage.rs b/src/banking_stage.rs new file mode 100644 index 00000000000000..cf72430c0a6040 --- /dev/null +++ b/src/banking_stage.rs @@ -0,0 +1,253 @@ +//! The `banking_stage` processes Event messages. + +use bank::Bank; +use bincode::deserialize; +use event::Event; +use packet; +use packet::SharedPackets; +use rayon::prelude::*; +use recorder::Signal; +use result::Result; +use std::net::SocketAddr; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use std::time::Instant; +use timing; + +pub struct BankingStage { + pub thread_hdl: JoinHandle<()>, + pub signal_receiver: Receiver, +} + +impl BankingStage { + pub fn new( + bank: Arc, + exit: Arc, + verified_receiver: Receiver)>>, + packet_recycler: packet::PacketRecycler, + ) -> Self { + let (signal_sender, signal_receiver) = channel(); + let thread_hdl = spawn(move || loop { + let e = Self::process_packets( + bank.clone(), + &verified_receiver, + &signal_sender, + &packet_recycler, + ); + if e.is_err() { + if exit.load(Ordering::Relaxed) { + break; + } + } + }); + BankingStage { + thread_hdl, + signal_receiver, + } + } + + fn deserialize_events(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } + + fn process_packets( + bank: Arc, + verified_receiver: &Receiver)>>, + signal_sender: &Sender, + packet_recycler: &packet::PacketRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let recv_start = Instant::now(); + let mms = verified_receiver.recv_timeout(timer)?; + let mut reqs_len = 0; + let mms_len = mms.len(); + info!( + "@{:?} process start stalled for: {:?}ms batches: {}", + timing::timestamp(), + timing::duration_as_ms(&recv_start.elapsed()), + mms.len(), + ); + let proc_start = Instant::now(); + for (msgs, vers) in mms { + let events = Self::deserialize_events(&msgs.read().unwrap()); + reqs_len += events.len(); + let events = events + .into_iter() + .zip(vers) + .filter_map(|(event, ver)| match event { + None => None, + Some((event, _addr)) => if event.verify() && ver != 0 { + Some(event) + } else { + None + }, + }) + .collect(); + + debug!("process_events"); + let results = bank.process_verified_events(events); + let events = results.into_iter().filter_map(|x| x.ok()).collect(); + signal_sender.send(Signal::Events(events))?; + debug!("done process_events"); + + packet_recycler.recycle(msgs); + } + let total_time_s = timing::duration_as_s(&proc_start.elapsed()); + let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + info!( + "@{:?} done processing event batches: {} time: {:?}ms reqs: {} reqs/s: {}", + timing::timestamp(), + mms_len, + total_time_ms, + reqs_len, + (reqs_len as f32) / (total_time_s) + ); + Ok(()) + } +} + +// TODO: When banking is pulled out of RequestStage, add this test back in. + +//use bank::Bank; +//use entry::Entry; +//use event::Event; +//use hash::Hash; +//use record_stage::RecordStage; +//use recorder::Signal; +//use result::Result; +//use std::sync::mpsc::{channel, Sender}; +//use std::sync::{Arc, Mutex}; +//use std::time::Duration; +// +//#[cfg(test)] +//mod tests { +// use bank::Bank; +// use event::Event; +// use event_processor::EventProcessor; +// use mint::Mint; +// use signature::{KeyPair, KeyPairUtil}; +// use transaction::Transaction; +// +// #[test] +// // TODO: Move this test banking_stage. Calling process_events() directly +// // defeats the purpose of this test. +// fn test_banking_sequential_consistency() { +// // In this attack we'll demonstrate that a verifier can interpret the ledger +// // differently if either the server doesn't signal the ledger to add an +// // Entry OR if the verifier tries to parallelize across multiple Entries. +// let mint = Mint::new(2); +// let bank = Bank::new(&mint); +// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); +// +// // Process a batch that includes a transaction that receives two tokens. +// let alice = KeyPair::new(); +// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); +// let events = vec![Event::Transaction(tr)]; +// let entry0 = event_processor.process_events(events).unwrap(); +// +// // Process a second batch that spends one of those tokens. +// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); +// let events = vec![Event::Transaction(tr)]; +// let entry1 = event_processor.process_events(events).unwrap(); +// +// // Collect the ledger and feed it to a new bank. +// let entries = vec![entry0, entry1]; +// +// // Assert the user holds one token, not two. If the server only output one +// // entry, then the second transaction will be rejected, because it drives +// // the account balance below zero before the credit is added. +// let bank = Bank::new(&mint); +// for entry in entries { +// assert!( +// bank +// .process_verified_events(entry.events) +// .into_iter() +// .all(|x| x.is_ok()) +// ); +// } +// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1)); +// } +//} +// +//#[cfg(all(feature = "unstable", test))] +//mod bench { +// extern crate test; +// use self::test::Bencher; +// use bank::{Bank, MAX_ENTRY_IDS}; +// use bincode::serialize; +// use event_processor::*; +// use hash::hash; +// use mint::Mint; +// use rayon::prelude::*; +// use signature::{KeyPair, KeyPairUtil}; +// use std::collections::HashSet; +// use std::time::Instant; +// use transaction::Transaction; +// +// #[bench] +// fn process_events_bench(_bencher: &mut Bencher) { +// let mint = Mint::new(100_000_000); +// let bank = Bank::new(&mint); +// // Create transactions between unrelated parties. +// let txs = 100_000; +// let last_ids: Mutex> = Mutex::new(HashSet::new()); +// let transactions: Vec<_> = (0..txs) +// .into_par_iter() +// .map(|i| { +// // Seed the 'to' account and a cell for its signature. +// let dummy_id = i % (MAX_ENTRY_IDS as i32); +// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash +// { +// let mut last_ids = last_ids.lock().unwrap(); +// if !last_ids.contains(&last_id) { +// last_ids.insert(last_id); +// bank.register_entry_id(&last_id); +// } +// } +// +// // Seed the 'from' account. +// let rando0 = KeyPair::new(); +// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); +// bank.process_verified_transaction(&tr).unwrap(); +// +// let rando1 = KeyPair::new(); +// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); +// bank.process_verified_transaction(&tr).unwrap(); +// +// // Finally, return a transaction that's unique +// Transaction::new(&rando0, rando1.pubkey(), 1, last_id) +// }) +// .collect(); +// +// let events: Vec<_> = transactions +// .into_iter() +// .map(|tr| Event::Transaction(tr)) +// .collect(); +// +// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); +// +// let now = Instant::now(); +// assert!(event_processor.process_events(events).is_ok()); +// let duration = now.elapsed(); +// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; +// let tps = txs as f64 / sec; +// +// // Ensure that all transactions were successfully logged. +// drop(event_processor.historian_input); +// let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); +// assert_eq!(entries.len(), 1); +// assert_eq!(entries[0].events.len(), txs as usize); +// +// println!("{} tps", tps); +// } +//} diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 51e4e4527b1205..ca4a6f7a3412d7 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -10,7 +10,7 @@ use solana::bank::Bank; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; -use solana::rpu::Rpu; +use solana::server::Server; use solana::signature::{KeyPair, KeyPairUtil}; use std::env; use std::io::{stdin, stdout, Read}; @@ -116,11 +116,14 @@ fn main() { eprintln!("creating networking stack..."); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000))); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); + serve_sock + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); - let _events_sock = UdpSocket::bind(&events_addr).unwrap(); + let events_sock = UdpSocket::bind(&events_addr).unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, @@ -128,11 +131,28 @@ fn main() { replicate_sock.local_addr().unwrap(), serve_sock.local_addr().unwrap(), ); + + let mut local = serve_sock.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + eprintln!("starting server..."); - let threads = rpu.serve(d, serve_sock, gossip_sock, exit.clone(), stdout()) - .unwrap(); + let server = Server::new( + bank, + last_id, + Some(Duration::from_millis(1000)), + d, + serve_sock, + events_sock, + broadcast_socket, + respond_socket, + gossip_sock, + exit.clone(), + stdout(), + ); eprintln!("Ready. Listening on {}", serve_addr); - for t in threads { + for t in server.thread_hdls { t.join().expect("join"); } } diff --git a/src/ecdsa.rs b/src/ecdsa.rs index b2477b67129a08..12ecb306ae1c06 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -136,14 +136,23 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { mod tests { use bincode::serialize; use ecdsa; + use event::Event; use packet::{Packet, Packets, SharedPackets}; - use request::Request; use std::sync::RwLock; use transaction::Transaction; - use transaction::test_tx; + use transaction::{memfind, test_tx}; + + #[test] + fn test_layout() { + let tr = test_tx(); + let tx = serialize(&tr).unwrap(); + let packet = serialize(&Event::Transaction(tr)).unwrap(); + assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); + assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); + } fn make_packet_from_transaction(tr: Transaction) -> Packet { - let tx = serialize(&Request::Transaction(tr)).unwrap(); + let tx = serialize(&Event::Transaction(tr)).unwrap(); let mut packet = Packet::default(); packet.meta.size = tx.len(); packet.data[..packet.meta.size].copy_from_slice(&tx); diff --git a/src/lib.rs b/src/lib.rs index 1c14c9320f91aa..7439d16a81d20f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![cfg_attr(feature = "unstable", feature(test))] pub mod bank; +pub mod banking_stage; pub mod crdt; pub mod ecdsa; pub mod entry; @@ -20,11 +21,13 @@ pub mod request_processor; pub mod request_stage; pub mod result; pub mod rpu; +pub mod server; pub mod sig_verify_stage; pub mod signature; pub mod streamer; pub mod thin_client; pub mod timing; +pub mod tpu; pub mod transaction; pub mod tvu; pub mod write_stage; diff --git a/src/request.rs b/src/request.rs index d1c692ae423bf4..1be8ee01dc79f7 100644 --- a/src/request.rs +++ b/src/request.rs @@ -5,12 +5,10 @@ use hash::Hash; use packet; use packet::SharedPackets; use signature::PublicKey; -use transaction::Transaction; #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[derive(Serialize, Deserialize, Debug, Clone)] pub enum Request { - Transaction(Transaction), GetBalance { key: PublicKey }, GetLastId, GetTransactionCount, @@ -19,10 +17,7 @@ pub enum Request { impl Request { /// Verify the request is valid. pub fn verify(&self) -> bool { - match *self { - Request::Transaction(ref tr) => tr.verify_plan(), - _ => true, - } + true } } @@ -54,24 +49,12 @@ pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec #[cfg(test)] mod tests { - use bincode::serialize; - use ecdsa; use packet::{PacketRecycler, NUM_PACKETS}; use request::{to_request_packets, Request}; - use transaction::{memfind, test_tx}; - - #[test] - fn test_layout() { - let tr = test_tx(); - let tx = serialize(&tr).unwrap(); - let packet = serialize(&Request::Transaction(tr)).unwrap(); - assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); - assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); - } #[test] fn test_to_packets() { - let tr = Request::Transaction(test_tx()); + let tr = Request::GetTransactionCount; let re = PacketRecycler::default(); let rv = to_request_packets(&re, vec![tr.clone(); 1]); assert_eq!(rv.len(), 1); diff --git a/src/request_processor.rs b/src/request_processor.rs index f070bbfe3d91d2..63f6c6ddb97863 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -6,14 +6,12 @@ use event::Event; use packet; use packet::SharedPackets; use rayon::prelude::*; -use recorder::Signal; use request::{Request, Response}; use result::Result; use std::collections::VecDeque; use std::net::SocketAddr; use std::sync::Arc; -use std::sync::mpsc::{Receiver, Sender}; -use std::time::Duration; +use std::sync::mpsc::Receiver; use std::time::Instant; use streamer; use timing; @@ -53,7 +51,6 @@ impl RequestProcessor { info!("Response::TransactionCount {:?}", rsp); Some(rsp) } - Request::Transaction(_) => unreachable!(), } } @@ -91,24 +88,6 @@ impl RequestProcessor { } /// Split Request list into verified transactions and the rest - fn partition_requests( - req_vers: Vec<(Request, SocketAddr, u8)>, - ) -> (Vec, Vec<(Request, SocketAddr)>) { - let mut events = vec![]; - let mut reqs = vec![]; - for (msg, rsp_addr, verify) in req_vers { - match msg { - Request::Transaction(tr) => { - if verify != 0 { - events.push(Event::Transaction(tr)); - } - } - _ => reqs.push((msg, rsp_addr)), - } - } - (events, reqs) - } - fn serialize_response( resp: Response, rsp_addr: SocketAddr, @@ -139,49 +118,29 @@ impl RequestProcessor { pub fn process_request_packets( &self, - verified_receiver: &Receiver)>>, - signal_sender: &Sender, + packet_receiver: &Receiver, blob_sender: &streamer::BlobSender, packet_recycler: &packet::PacketRecycler, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { - let timer = Duration::new(1, 0); - let recv_start = Instant::now(); - let mms = verified_receiver.recv_timeout(timer)?; - let mut reqs_len = 0; - let mms_len = mms.len(); + let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; + info!( - "@{:?} process start stalled for: {:?}ms batches: {}", + "@{:?} request_stage: processing: {}", timing::timestamp(), - timing::duration_as_ms(&recv_start.elapsed()), - mms.len(), + batch_len ); + + let mut reqs_len = 0; let proc_start = Instant::now(); - for (msgs, vers) in mms { - let reqs = Self::deserialize_requests(&msgs.read().unwrap()); - reqs_len += reqs.len(); - let req_vers = reqs.into_iter() - .zip(vers) - .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) - .filter(|x| { - let v = x.0.verify(); - v - }) + for msgs in batch { + let reqs: Vec<_> = Self::deserialize_requests(&msgs.read().unwrap()) + .into_iter() + .filter_map(|x| x) .collect(); + reqs_len += reqs.len(); - debug!("partitioning"); - let (events, reqs) = Self::partition_requests(req_vers); - debug!("events: {} reqs: {}", events.len(), reqs.len()); - - debug!("process_events"); - let results = self.bank.process_verified_events(events); - let events = results.into_iter().filter_map(|x| x.ok()).collect(); - signal_sender.send(Signal::Events(events))?; - debug!("done process_events"); - - debug!("process_requests"); let rsps = self.process_requests(reqs); - debug!("done process_requests"); let blobs = Self::serialize_responses(rsps, blob_recycler)?; if !blobs.is_empty() { @@ -196,7 +155,7 @@ impl RequestProcessor { info!( "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", timing::timestamp(), - mms_len, + batch_len, total_time_ms, reqs_len, (reqs_len as f32) / (total_time_s) diff --git a/src/request_stage.rs b/src/request_stage.rs index 8efc4f8237b70d..cd98a7d439bdd6 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -2,7 +2,6 @@ use packet; use packet::SharedPackets; -use recorder::Signal; use request_processor::RequestProcessor; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -12,7 +11,6 @@ use streamer; pub struct RequestStage { pub thread_hdl: JoinHandle<()>, - pub signal_receiver: Receiver, pub blob_receiver: streamer::BlobReceiver, pub request_processor: Arc, } @@ -21,18 +19,16 @@ impl RequestStage { pub fn new( request_processor: RequestProcessor, exit: Arc, - verified_receiver: Receiver)>>, + packet_receiver: Receiver, packet_recycler: packet::PacketRecycler, blob_recycler: packet::BlobRecycler, ) -> Self { let request_processor = Arc::new(request_processor); let request_processor_ = request_processor.clone(); - let (signal_sender, signal_receiver) = channel(); let (blob_sender, blob_receiver) = channel(); let thread_hdl = spawn(move || loop { let e = request_processor_.process_request_packets( - &verified_receiver, - &signal_sender, + &packet_receiver, &blob_sender, &packet_recycler, &blob_recycler, @@ -45,145 +41,8 @@ impl RequestStage { }); RequestStage { thread_hdl, - signal_receiver, blob_receiver, request_processor, } } } - -// TODO: When banking is pulled out of RequestStage, add this test back in. - -//use bank::Bank; -//use entry::Entry; -//use event::Event; -//use hash::Hash; -//use record_stage::RecordStage; -//use recorder::Signal; -//use result::Result; -//use std::sync::mpsc::{channel, Sender}; -//use std::sync::{Arc, Mutex}; -//use std::time::Duration; -// -//#[cfg(test)] -//mod tests { -// use bank::Bank; -// use event::Event; -// use event_processor::EventProcessor; -// use mint::Mint; -// use signature::{KeyPair, KeyPairUtil}; -// use transaction::Transaction; -// -// #[test] -// // TODO: Move this test banking_stage. Calling process_events() directly -// // defeats the purpose of this test. -// fn test_banking_sequential_consistency() { -// // In this attack we'll demonstrate that a verifier can interpret the ledger -// // differently if either the server doesn't signal the ledger to add an -// // Entry OR if the verifier tries to parallelize across multiple Entries. -// let mint = Mint::new(2); -// let bank = Bank::new(&mint); -// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); -// -// // Process a batch that includes a transaction that receives two tokens. -// let alice = KeyPair::new(); -// let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); -// let events = vec![Event::Transaction(tr)]; -// let entry0 = event_processor.process_events(events).unwrap(); -// -// // Process a second batch that spends one of those tokens. -// let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); -// let events = vec![Event::Transaction(tr)]; -// let entry1 = event_processor.process_events(events).unwrap(); -// -// // Collect the ledger and feed it to a new bank. -// let entries = vec![entry0, entry1]; -// -// // Assert the user holds one token, not two. If the server only output one -// // entry, then the second transaction will be rejected, because it drives -// // the account balance below zero before the credit is added. -// let bank = Bank::new(&mint); -// for entry in entries { -// assert!( -// bank -// .process_verified_events(entry.events) -// .into_iter() -// .all(|x| x.is_ok()) -// ); -// } -// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1)); -// } -//} -// -//#[cfg(all(feature = "unstable", test))] -//mod bench { -// extern crate test; -// use self::test::Bencher; -// use bank::{Bank, MAX_ENTRY_IDS}; -// use bincode::serialize; -// use event_processor::*; -// use hash::hash; -// use mint::Mint; -// use rayon::prelude::*; -// use signature::{KeyPair, KeyPairUtil}; -// use std::collections::HashSet; -// use std::time::Instant; -// use transaction::Transaction; -// -// #[bench] -// fn process_events_bench(_bencher: &mut Bencher) { -// let mint = Mint::new(100_000_000); -// let bank = Bank::new(&mint); -// // Create transactions between unrelated parties. -// let txs = 100_000; -// let last_ids: Mutex> = Mutex::new(HashSet::new()); -// let transactions: Vec<_> = (0..txs) -// .into_par_iter() -// .map(|i| { -// // Seed the 'to' account and a cell for its signature. -// let dummy_id = i % (MAX_ENTRY_IDS as i32); -// let last_id = hash(&serialize(&dummy_id).unwrap()); // Semi-unique hash -// { -// let mut last_ids = last_ids.lock().unwrap(); -// if !last_ids.contains(&last_id) { -// last_ids.insert(last_id); -// bank.register_entry_id(&last_id); -// } -// } -// -// // Seed the 'from' account. -// let rando0 = KeyPair::new(); -// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id); -// bank.process_verified_transaction(&tr).unwrap(); -// -// let rando1 = KeyPair::new(); -// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id); -// bank.process_verified_transaction(&tr).unwrap(); -// -// // Finally, return a transaction that's unique -// Transaction::new(&rando0, rando1.pubkey(), 1, last_id) -// }) -// .collect(); -// -// let events: Vec<_> = transactions -// .into_iter() -// .map(|tr| Event::Transaction(tr)) -// .collect(); -// -// let event_processor = EventProcessor::new(bank, &mint.last_id(), None); -// -// let now = Instant::now(); -// assert!(event_processor.process_events(events).is_ok()); -// let duration = now.elapsed(); -// let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; -// let tps = txs as f64 / sec; -// -// // Ensure that all transactions were successfully logged. -// drop(event_processor.historian_input); -// let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); -// assert_eq!(entries.len(), 1); -// assert_eq!(entries[0].events.len(), txs as usize); -// -// println!("{} tps", tps); -// } -//} diff --git a/src/rpu.rs b/src/rpu.rs index d48dc1510f083d..eef1515d21a3ad 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -2,60 +2,27 @@ //! 5-stage transaction processing pipeline in software. use bank::Bank; -use crdt::{Crdt, ReplicatedData}; -use hash::Hash; use packet; -use record_stage::RecordStage; use request_processor::RequestProcessor; use request_stage::RequestStage; -use result::Result; -use sig_verify_stage::SigVerifyStage; -use std::io::Write; use std::net::UdpSocket; +use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex, RwLock}; use std::thread::JoinHandle; -use std::time::Duration; use streamer; -use write_stage::WriteStage; pub struct Rpu { - bank: Arc, - start_hash: Hash, - tick_duration: Option, + pub thread_hdls: Vec>, } impl Rpu { - /// Create a new Rpu that wraps the given Bank. - pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option) -> Self { - Rpu { - bank: Arc::new(bank), - start_hash, - tick_duration, - } - } - - /// Create a UDP microservice that forwards messages the given Rpu. - /// This service is the network leader - /// Set `exit` to shutdown its threads. - pub fn serve( - &self, - me: ReplicatedData, + pub fn new( + bank: Arc, requests_socket: UdpSocket, - gossip: UdpSocket, + respond_socket: UdpSocket, exit: Arc, - writer: W, - ) -> Result>> { - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let window = streamer::default_window(); - let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); - - // make sure we are on the same interface - let mut local = requests_socket.local_addr()?; - local.set_port(0); - + ) -> Self { let packet_recycler = packet::PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( @@ -63,45 +30,18 @@ impl Rpu { exit.clone(), packet_recycler.clone(), packet_sender, - )?; - - let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); + ); let blob_recycler = packet::BlobRecycler::default(); - let request_processor = RequestProcessor::new(self.bank.clone()); + let request_processor = RequestProcessor::new(bank.clone()); let request_stage = RequestStage::new( request_processor, exit.clone(), - sig_verify_stage.verified_receiver, + packet_receiver, packet_recycler.clone(), blob_recycler.clone(), ); - let record_stage = RecordStage::new( - request_stage.signal_receiver, - &self.start_hash, - self.tick_duration, - ); - - let write_stage = WriteStage::new( - self.bank.clone(), - exit.clone(), - blob_recycler.clone(), - Mutex::new(writer), - record_stage.entry_receiver, - ); - - let broadcast_socket = UdpSocket::bind(local)?; - let t_broadcast = streamer::broadcaster( - broadcast_socket, - exit.clone(), - crdt.clone(), - window, - blob_recycler.clone(), - write_stage.blob_receiver, - ); - - let respond_socket = UdpSocket::bind(local.clone())?; let t_responder = streamer::responder( respond_socket, exit.clone(), @@ -109,16 +49,7 @@ impl Rpu { request_stage.blob_receiver, ); - let mut threads = vec![ - t_receiver, - t_responder, - request_stage.thread_hdl, - write_stage.thread_hdl, - t_gossip, - t_listen, - t_broadcast, - ]; - threads.extend(sig_verify_stage.thread_hdls.into_iter()); - Ok(threads) + let thread_hdls = vec![t_receiver, t_responder, request_stage.thread_hdl]; + Rpu { thread_hdls } } } diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 00000000000000..b250e86d84bf0a --- /dev/null +++ b/src/server.rs @@ -0,0 +1,53 @@ +//! The `server` module hosts all the server microservices. + +use bank::Bank; +use crdt::ReplicatedData; +use hash::Hash; +use rpu::Rpu; +use std::io::Write; +use std::net::UdpSocket; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::thread::JoinHandle; +use std::time::Duration; +use tpu::Tpu; + +pub struct Server { + pub thread_hdls: Vec>, +} + +impl Server { + pub fn new( + bank: Bank, + start_hash: Hash, + tick_duration: Option, + me: ReplicatedData, + requests_socket: UdpSocket, + events_socket: UdpSocket, + broadcast_socket: UdpSocket, + respond_socket: UdpSocket, + gossip: UdpSocket, + exit: Arc, + writer: W, + ) -> Self { + let bank = Arc::new(bank); + let mut thread_hdls = vec![]; + let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); + thread_hdls.extend(rpu.thread_hdls); + + let tpu = Tpu::new( + bank.clone(), + start_hash, + tick_duration, + me, + events_socket, + broadcast_socket, + gossip, + exit.clone(), + writer, + ); + thread_hdls.extend(tpu.thread_hdls); + + Server { thread_hdls } + } +} diff --git a/src/sig_verify_stage.rs b/src/sig_verify_stage.rs index 6528537c6a0512..ccb201e3aec723 100644 --- a/src/sig_verify_stage.rs +++ b/src/sig_verify_stage.rs @@ -18,9 +18,9 @@ pub struct SigVerifyStage { } impl SigVerifyStage { - pub fn new(exit: Arc, packets_receiver: Receiver) -> Self { + pub fn new(exit: Arc, packet_receiver: Receiver) -> Self { let (verified_sender, verified_receiver) = channel(); - let thread_hdls = Self::verifier_services(exit, packets_receiver, verified_sender); + let thread_hdls = Self::verifier_services(exit, packet_receiver, verified_sender); SigVerifyStage { thread_hdls, verified_receiver, @@ -71,11 +71,11 @@ impl SigVerifyStage { fn verifier_service( exit: Arc, - packets_receiver: Arc>, + packet_receiver: Arc>, verified_sender: Arc)>>>>, ) -> JoinHandle<()> { spawn(move || loop { - let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone()); + let e = Self::verifier(&packet_receiver.clone(), &verified_sender.clone()); if e.is_err() && exit.load(Ordering::Relaxed) { break; } @@ -84,11 +84,11 @@ impl SigVerifyStage { fn verifier_services( exit: Arc, - packets_receiver: streamer::PacketReceiver, + packet_receiver: streamer::PacketReceiver, verified_sender: Sender)>>, ) -> Vec> { let sender = Arc::new(Mutex::new(verified_sender)); - let receiver = Arc::new(Mutex::new(packets_receiver)); + let receiver = Arc::new(Mutex::new(packet_receiver)); (0..4) .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) .collect() diff --git a/src/streamer.rs b/src/streamer.rs index f3332fbc0830f5..4047cf3fb6a59f 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -51,14 +51,12 @@ pub fn receiver( sock: UdpSocket, exit: Arc, recycler: PacketRecycler, - channel: PacketSender, -) -> Result> { - let timer = Duration::new(1, 0); - sock.set_read_timeout(Some(timer))?; - Ok(spawn(move || { - let _ = recv_loop(&sock, &exit, &recycler, &channel); + packet_sender: PacketSender, +) -> JoinHandle<()> { + spawn(move || { + let _ = recv_loop(&sock, &exit, &recycler, &packet_sender); () - })) + }) } fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> { @@ -515,6 +513,8 @@ mod bench { } fn run_streamer_bench() -> Result<()> { let read = UdpSocket::bind("127.0.0.1:0")?; + read.set_read_timeout(Some(Duration::new(1, 0)))?; + let addr = read.local_addr()?; let exit = Arc::new(AtomicBool::new(false)); let pack_recycler = PacketRecycler::default(); @@ -591,13 +591,15 @@ mod test { #[test] pub fn streamer_send_test() { let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); + read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let pack_recycler = PacketRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader).unwrap(); + let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader); let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); let mut msgs = VecDeque::new(); diff --git a/src/thin_client.rs b/src/thin_client.rs index 01363c0e5a458c..1979d42ff2adad 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -4,6 +4,7 @@ //! unstable and may change in future releases. use bincode::{deserialize, serialize}; +use event::Event; use futures::future::{ok, FutureResult}; use hash::Hash; use request::{Request, Response}; @@ -67,9 +68,9 @@ impl ThinClient { /// Send a signed Transaction to the server for processing. This method /// does not wait for a response. pub fn transfer_signed(&self, tr: Transaction) -> io::Result { - let req = Request::Transaction(tr); - let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed"); - self.requests_socket.send_to(&data, &self.addr) + let event = Event::Transaction(tr); + let data = serialize(&event).expect("serialize Transaction in pub fn transfer_signed"); + self.events_socket.send_to(&data, &self.addr) } /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. @@ -160,7 +161,7 @@ mod tests { use logger; use mint::Mint; use plan::Plan; - use rpu::Rpu; + use server::Server; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; @@ -177,23 +178,43 @@ mod tests { fn test_thin_client() { logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); - let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - let _events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let addr = serve.local_addr().unwrap(); + let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let addr = requests_socket.local_addr().unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), "0.0.0.0:0".parse().unwrap(), - serve.local_addr().unwrap(), + requests_socket.local_addr().unwrap(), ); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); - let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); + + let mut local = requests_socket.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + + let server = Server::new( + bank, + alice.last_id(), + Some(Duration::from_millis(30)), + d, + requests_socket, + events_socket, + broadcast_socket, + respond_socket, + gossip, + exit.clone(), + sink(), + ); sleep(Duration::from_millis(900)); let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -217,7 +238,7 @@ mod tests { } assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); - for t in threads { + for t in server.thread_hdls { t.join().unwrap(); } } @@ -230,15 +251,27 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let rpu = Rpu::new(bank, alice.last_id(), Some(Duration::from_millis(30))); let serve_addr = leader_serve.local_addr().unwrap(); - let threads = rpu.serve( + + let mut local = leader_serve.local_addr().unwrap(); + local.set_port(0); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + + let server = Server::new( + bank, + alice.last_id(), + Some(Duration::from_millis(30)), leader_data, leader_serve, + events_socket, + broadcast_socket, + respond_socket, leader_gossip, exit.clone(), sink(), - ).unwrap(); + ); sleep(Duration::from_millis(300)); let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -266,7 +299,7 @@ mod tests { trace!("exiting"); exit.store(true, Ordering::Relaxed); trace!("joining threads"); - for t in threads { + for t in server.thread_hdls { t.join().unwrap(); } } @@ -274,6 +307,8 @@ mod tests { fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) { let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); + serve.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); @@ -369,15 +404,28 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let leader_bank = { - let bank = Bank::new(&alice); - Rpu::new(bank, alice.last_id(), None) - }; - - let mut threads = leader_bank - .serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink()) - .unwrap(); + let leader_bank = Bank::new(&alice); + + let mut local = leader.2.local_addr().unwrap(); + local.set_port(0); + let broadcast_socket = UdpSocket::bind(local).unwrap(); + let respond_socket = UdpSocket::bind(local.clone()).unwrap(); + + let server = Server::new( + leader_bank, + alice.last_id(), + None, + leader.0.clone(), + leader.2, + leader.4, + broadcast_socket, + respond_socket, + leader.1, + exit.clone(), + sink(), + ); + let mut threads = server.thread_hdls; for _ in 0..N { replicant(&leader.0, exit.clone(), &alice, &mut threads); } diff --git a/src/tpu.rs b/src/tpu.rs new file mode 100644 index 00000000000000..f128b551ebde38 --- /dev/null +++ b/src/tpu.rs @@ -0,0 +1,92 @@ +//! The `tpu` module implements the Transaction Processing Unit, a +//! 5-stage transaction processing pipeline in software. + +use bank::Bank; +use banking_stage::BankingStage; +use crdt::{Crdt, ReplicatedData}; +use hash::Hash; +use packet; +use record_stage::RecordStage; +use sig_verify_stage::SigVerifyStage; +use std::io::Write; +use std::net::UdpSocket; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; +use std::sync::{Arc, Mutex, RwLock}; +use std::thread::JoinHandle; +use std::time::Duration; +use streamer; +use write_stage::WriteStage; + +pub struct Tpu { + pub thread_hdls: Vec>, +} + +impl Tpu { + pub fn new( + bank: Arc, + start_hash: Hash, + tick_duration: Option, + me: ReplicatedData, + events_socket: UdpSocket, + broadcast_socket: UdpSocket, + gossip: UdpSocket, + exit: Arc, + writer: W, + ) -> Self { + let packet_recycler = packet::PacketRecycler::default(); + let (packet_sender, packet_receiver) = channel(); + let t_receiver = streamer::receiver( + events_socket, + exit.clone(), + packet_recycler.clone(), + packet_sender, + ); + + let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); + + let blob_recycler = packet::BlobRecycler::default(); + let banking_stage = BankingStage::new( + bank.clone(), + exit.clone(), + sig_verify_stage.verified_receiver, + packet_recycler.clone(), + ); + + let record_stage = + RecordStage::new(banking_stage.signal_receiver, &start_hash, tick_duration); + + let write_stage = WriteStage::new( + bank.clone(), + exit.clone(), + blob_recycler.clone(), + Mutex::new(writer), + record_stage.entry_receiver, + ); + + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); + let window = streamer::default_window(); + let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + + let t_broadcast = streamer::broadcaster( + broadcast_socket, + exit.clone(), + crdt.clone(), + window, + blob_recycler.clone(), + write_stage.blob_receiver, + ); + + let mut thread_hdls = vec![ + t_receiver, + banking_stage.thread_hdl, + write_stage.thread_hdl, + t_gossip, + t_listen, + t_broadcast, + ]; + thread_hdls.extend(sig_verify_stage.thread_hdls.into_iter()); + Tpu { thread_hdls } + } +} diff --git a/src/tvu.rs b/src/tvu.rs index d26171d7094588..3a87a6dab4ef44 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -2,13 +2,12 @@ //! 5-stage transaction validation pipeline in software. use bank::Bank; +use banking_stage::BankingStage; use crdt::{Crdt, ReplicatedData}; use hash::Hash; use ledger; use packet; use record_stage::RecordStage; -use request_processor::RequestProcessor; -use request_stage::RequestStage; use result::Result; use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; @@ -146,31 +145,27 @@ impl Tvu { // make sure we are on the same interface let mut local = requests_socket.local_addr()?; local.set_port(0); - let respond_socket = UdpSocket::bind(local.clone())?; let packet_recycler = packet::PacketRecycler::default(); - let blob_recycler = packet::BlobRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_packet_receiver = streamer::receiver( requests_socket, exit.clone(), packet_recycler.clone(), packet_sender, - )?; + ); let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); - let request_processor = RequestProcessor::new(obj.bank.clone()); - let request_stage = RequestStage::new( - request_processor, + let banking_stage = BankingStage::new( + obj.bank.clone(), exit.clone(), sig_verify_stage.verified_receiver, packet_recycler.clone(), - blob_recycler.clone(), ); let record_stage = RecordStage::new( - request_stage.signal_receiver, + banking_stage.signal_receiver, &obj.start_hash, obj.tick_duration, ); @@ -178,13 +173,6 @@ impl Tvu { let write_stage = WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver); - let t_responder = streamer::responder( - respond_socket, - exit.clone(), - blob_recycler.clone(), - request_stage.blob_receiver, - ); - let mut threads = vec![ //replicate threads t_blob_receiver, @@ -195,8 +183,7 @@ impl Tvu { t_listen, //serve threads t_packet_receiver, - t_responder, - request_stage.thread_hdl, + banking_stage.thread_hdl, write_stage.thread_hdl, ]; threads.extend(sig_verify_stage.thread_hdls.into_iter()); @@ -212,6 +199,9 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey,