diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index 3e4f01e88b8bda..da47e0e6325583 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -35,7 +35,7 @@ fn print_usage(program: &str, opts: Options) { fn main() { let mut threads = 4usize; let mut addr: String = "127.0.0.1:8000".to_string(); - let mut client_addr: String = "127.0.0.1:8010".to_string(); + let mut requests_addr: String = "127.0.0.1:8010".to_string(); let mut opts = Options::new(); opts.optopt("s", "", "server address", "host:port"); @@ -60,12 +60,16 @@ fn main() { addr = matches.opt_str("s").unwrap(); } if matches.opt_present("c") { - client_addr = matches.opt_str("c").unwrap(); + requests_addr = matches.opt_str("c").unwrap(); } if matches.opt_present("t") { threads = matches.opt_str("t").unwrap().parse().expect("integer"); } + let mut events_addr: SocketAddr = requests_addr.parse().unwrap(); + let requests_port = events_addr.port(); + events_addr.set_port(requests_port + 1); + if stdin_isatty() { eprintln!("nothing found on stdin, expected a json file"); exit(1); @@ -84,13 +88,16 @@ fn main() { exit(1); }); - println!("Binding to {}", client_addr); - let socket = UdpSocket::bind(&client_addr).unwrap(); - socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let mut accountant = ThinClient::new(addr.parse().unwrap(), socket); + println!("Binding to {}", requests_addr); + let requests_socket = UdpSocket::bind(&requests_addr).unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(5, 0))) + .unwrap(); + let events_socket = UdpSocket::bind(&events_addr).unwrap(); + let mut client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket); println!("Get last ID..."); - let last_id = accountant.get_last_id().wait().unwrap(); + let last_id = client.get_last_id().wait().unwrap(); println!("Got last ID {:?}", last_id); let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes()); @@ -122,7 +129,7 @@ fn main() { nsps / 1_000_f64 ); - let initial_tx_count = accountant.transaction_count(); + let initial_tx_count = client.transaction_count(); println!("initial count {}", initial_tx_count); println!("Transfering {} transactions in {} batches", txs, threads); @@ -131,19 +138,26 @@ fn main() { let chunks: Vec<_> = transactions.chunks(sz).collect(); chunks.into_par_iter().for_each(|trs| { println!("Transferring 1 unit {} times... to", trs.len()); - let mut client_addr: SocketAddr = client_addr.parse().unwrap(); - client_addr.set_port(0); - let socket = UdpSocket::bind(client_addr).unwrap(); - let accountant = ThinClient::new(addr.parse().unwrap(), socket); + let mut requests_addr: SocketAddr = requests_addr.parse().unwrap(); + requests_addr.set_port(0); + let requests_socket = UdpSocket::bind(requests_addr).unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(5, 0))) + .unwrap(); + let mut events_addr: SocketAddr = requests_addr.clone(); + let requests_port = events_addr.port(); + events_addr.set_port(requests_port + 1); + let events_socket = UdpSocket::bind(&events_addr).unwrap(); + let client = ThinClient::new(addr.parse().unwrap(), requests_socket, events_socket); for tr in trs { - accountant.transfer_signed(tr.clone()).unwrap(); + client.transfer_signed(tr.clone()).unwrap(); } }); println!("Waiting for transactions to complete...",); let mut tx_count; for _ in 0..10 { - tx_count = accountant.transaction_count(); + tx_count = client.transaction_count(); duration = now.elapsed(); let txs = tx_count - initial_tx_count; println!("Transactions processed {}", txs); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 0eb38e195ecbaa..74056b13b86ddb 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -7,12 +7,12 @@ extern crate solana; use getopts::Options; use isatty::stdin_isatty; use solana::accountant::Accountant; -use solana::accounting_stage::AccountingStage; use solana::crdt::ReplicatedData; use solana::entry::Entry; use solana::event::Event; +use solana::event_processor::EventProcessor; +use solana::rpu::Rpu; use solana::signature::{KeyPair, KeyPairUtil}; -use solana::tpu::Tpu; use std::env; use std::io::{stdin, stdout, Read}; use std::net::UdpSocket; @@ -115,13 +115,13 @@ fn main() { eprintln!("creating networking stack..."); - let accounting_stage = AccountingStage::new(accountant, &last_id, Some(1000)); + let event_processor = EventProcessor::new(accountant, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); - let tpu = Arc::new(Tpu::new(accounting_stage)); + let rpu = Rpu::new(event_processor); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); - let events_sock = UdpSocket::bind(&events_addr).unwrap(); + let _events_sock = UdpSocket::bind(&events_addr).unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, @@ -130,15 +130,8 @@ fn main() { serve_sock.local_addr().unwrap(), ); eprintln!("starting server..."); - let threads = Tpu::serve( - &tpu, - d, - serve_sock, - events_sock, - gossip_sock, - exit.clone(), - stdout(), - ).unwrap(); + let threads = rpu.serve(d, serve_sock, gossip_sock, exit.clone(), stdout()) + .unwrap(); eprintln!("Ready. Listening on {}", serve_addr); for t in threads { t.join().expect("join"); diff --git a/src/ecdsa.rs b/src/ecdsa.rs index ea10be9471fc36..14237e6cb7cc30 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -137,8 +137,8 @@ mod tests { use bincode::serialize; use ecdsa; use packet::{Packet, Packets, SharedPackets}; + use request_stage::Request; use std::sync::RwLock; - use thin_client_service::Request; use transaction::Transaction; use transaction::test_tx; diff --git a/src/entry_writer.rs b/src/entry_writer.rs new file mode 100644 index 00000000000000..24a13e1792db63 --- /dev/null +++ b/src/entry_writer.rs @@ -0,0 +1,88 @@ +//! The `entry_writer` module helps implement the TPU's write stage. + +use entry::Entry; +use event_processor::EventProcessor; +use ledger; +use packet; +use request_stage::RequestProcessor; +use result::Result; +use serde_json; +use std::collections::VecDeque; +use std::io::Write; +use std::io::sink; +use std::sync::mpsc::Receiver; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use streamer; + +pub struct EntryWriter<'a> { + event_processor: &'a EventProcessor, + request_processor: &'a RequestProcessor, +} + +impl<'a> EntryWriter<'a> { + /// Create a new Tpu that wraps the given Accountant. + pub fn new( + event_processor: &'a EventProcessor, + request_processor: &'a RequestProcessor, + ) -> Self { + EntryWriter { + event_processor, + request_processor, + } + } + + fn write_entry(&self, writer: &Mutex, entry: &Entry) { + trace!("write_entry entry"); + self.event_processor.accountant.register_entry_id(&entry.id); + writeln!( + writer.lock().expect("'writer' lock in fn fn write_entry"), + "{}", + serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry") + ).expect("writeln! in fn write_entry"); + self.request_processor.notify_entry_info_subscribers(&entry); + } + + fn write_entries( + &self, + writer: &Mutex, + entry_receiver: &Receiver, + ) -> Result> { + //TODO implement a serialize for channel that does this without allocations + let mut l = vec![]; + let entry = entry_receiver.recv_timeout(Duration::new(1, 0))?; + self.write_entry(writer, &entry); + l.push(entry); + while let Ok(entry) = entry_receiver.try_recv() { + self.write_entry(writer, &entry); + l.push(entry); + } + Ok(l) + } + + /// Process any Entry items that have been published by the Historian. + /// continuosly broadcast blobs of entries out + pub fn write_and_send_entries( + &self, + broadcast: &streamer::BlobSender, + blob_recycler: &packet::BlobRecycler, + writer: &Mutex, + entry_receiver: &Receiver, + ) -> Result<()> { + let mut q = VecDeque::new(); + let list = self.write_entries(writer, entry_receiver)?; + trace!("New blobs? {}", list.len()); + ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q); + if !q.is_empty() { + broadcast.send(q)?; + } + Ok(()) + } + + /// Process any Entry items that have been published by the Historian. + /// continuosly broadcast blobs of entries out + pub fn drain_entries(&self, entry_receiver: &Receiver) -> Result<()> { + self.write_entries(&Arc::new(Mutex::new(sink())), entry_receiver)?; + Ok(()) + } +} diff --git a/src/accounting_stage.rs b/src/event_processor.rs similarity index 79% rename from src/accounting_stage.rs rename to src/event_processor.rs index 105d5b667d83b1..8d3b9cdafefa74 100644 --- a/src/accounting_stage.rs +++ b/src/event_processor.rs @@ -1,4 +1,4 @@ -//! The `accounting_stage` module implements the accounting stage of the TPU. +//! The `event_processor` module implements the accounting stage of the TPU. use accountant::Accountant; use entry::Entry; @@ -7,26 +7,21 @@ use hash::Hash; use historian::Historian; use recorder::Signal; use result::Result; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::{channel, Sender}; use std::sync::{Arc, Mutex}; -pub struct AccountingStage { - pub output: Mutex>, - entry_sender: Mutex>, +pub struct EventProcessor { pub accountant: Arc, historian_input: Mutex>, historian: Mutex, } -impl AccountingStage { - /// Create a new Tpu that wraps the given Accountant. +impl EventProcessor { + /// Create a new stage of the TPU for event and transaction processing pub fn new(accountant: Accountant, start_hash: &Hash, ms_per_tick: Option) -> Self { let (historian_input, event_receiver) = channel(); let historian = Historian::new(event_receiver, start_hash, ms_per_tick); - let (entry_sender, output) = channel(); - AccountingStage { - output: Mutex::new(output), - entry_sender: Mutex::new(entry_sender), + EventProcessor { accountant: Arc::new(accountant), historian_input: Mutex::new(historian_input), historian: Mutex::new(historian), @@ -34,7 +29,7 @@ impl AccountingStage { } /// Process the transactions in parallel and then log the successful ones. - pub fn process_events(&self, events: Vec) -> Result<()> { + pub fn process_events(&self, events: Vec) -> Result { let historian = self.historian.lock().unwrap(); let results = self.accountant.process_verified_events(events); let events = results.into_iter().filter_map(|x| x.ok()).collect(); @@ -42,46 +37,45 @@ impl AccountingStage { sender.send(Signal::Events(events))?; // Wait for the historian to tag our Events with an ID and then register it. - let entry = historian.output.lock().unwrap().recv()?; + let entry = historian.entry_receiver.lock().unwrap().recv()?; self.accountant.register_entry_id(&entry.id); - self.entry_sender.lock().unwrap().send(entry)?; - Ok(()) + Ok(entry) } } #[cfg(test)] mod tests { use accountant::Accountant; - use accounting_stage::AccountingStage; - use entry::Entry; use event::Event; + use event_processor::EventProcessor; use mint::Mint; use signature::{KeyPair, KeyPairUtil}; use transaction::Transaction; #[test] + // TODO: Move this test accounting_stage. Calling process_events() directly + // defeats the purpose of this test. fn test_accounting_sequential_consistency() { // In this attack we'll demonstrate that a verifier can interpret the ledger // differently if either the server doesn't signal the ledger to add an // Entry OR if the verifier tries to parallelize across multiple Entries. let mint = Mint::new(2); let accountant = Accountant::new(&mint); - let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None); + let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); let events = vec![Event::Transaction(tr)]; - assert!(accounting_stage.process_events(events).is_ok()); + let entry0 = event_processor.process_events(events).unwrap(); // Process a second batch that spends one of those tokens. let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); let events = vec![Event::Transaction(tr)]; - assert!(accounting_stage.process_events(events).is_ok()); + let entry1 = event_processor.process_events(events).unwrap(); // Collect the ledger and feed it to a new accountant. - drop(accounting_stage.entry_sender); - let entries: Vec = accounting_stage.output.lock().unwrap().iter().collect(); + let entries = vec![entry0, entry1]; // Assert the user holds one token, not two. If the server only output one // entry, then the second transaction will be rejected, because it drives @@ -104,8 +98,8 @@ mod bench { extern crate test; use self::test::Bencher; use accountant::{Accountant, MAX_ENTRY_IDS}; - use accounting_stage::*; use bincode::serialize; + use event_processor::*; use hash::hash; use mint::Mint; use rayon::prelude::*; @@ -154,17 +148,17 @@ mod bench { .map(|tr| Event::Transaction(tr)) .collect(); - let accounting_stage = AccountingStage::new(accountant, &mint.last_id(), None); + let event_processor = EventProcessor::new(accountant, &mint.last_id(), None); let now = Instant::now(); - assert!(accounting_stage.process_events(events).is_ok()); + assert!(event_processor.process_events(events).is_ok()); let duration = now.elapsed(); let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; let tps = txs as f64 / sec; // Ensure that all transactions were successfully logged. - drop(accounting_stage.historian_input); - let entries: Vec = accounting_stage.output.lock().unwrap().iter().collect(); + drop(event_processor.historian_input); + let entries: Vec = event_processor.output.lock().unwrap().iter().collect(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].events.len(), txs as usize); diff --git a/src/historian.rs b/src/historian.rs index 351fca9c085636..b44fabece9b8ea 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -10,7 +10,7 @@ use std::thread::{spawn, JoinHandle}; use std::time::Instant; pub struct Historian { - pub output: Mutex>, + pub entry_receiver: Mutex>, pub thread_hdl: JoinHandle, } @@ -20,11 +20,11 @@ impl Historian { start_hash: &Hash, ms_per_tick: Option, ) -> Self { - let (entry_sender, output) = channel(); + let (entry_sender, entry_receiver) = channel(); let thread_hdl = Historian::create_recorder(*start_hash, ms_per_tick, event_receiver, entry_sender); Historian { - output: Mutex::new(output), + entry_receiver: Mutex::new(entry_receiver), thread_hdl, } } @@ -52,9 +52,9 @@ impl Historian { } pub fn receive(self: &Self) -> Result { - self.output + self.entry_receiver .lock() - .expect("'output' lock in pub fn receive") + .expect("'entry_receiver' lock in pub fn receive") .try_recv() } } @@ -78,9 +78,9 @@ mod tests { sleep(Duration::new(0, 1_000_000)); input.send(Signal::Tick).unwrap(); - let entry0 = hist.output.lock().unwrap().recv().unwrap(); - let entry1 = hist.output.lock().unwrap().recv().unwrap(); - let entry2 = hist.output.lock().unwrap().recv().unwrap(); + let entry0 = hist.entry_receiver.lock().unwrap().recv().unwrap(); + let entry1 = hist.entry_receiver.lock().unwrap().recv().unwrap(); + let entry2 = hist.entry_receiver.lock().unwrap().recv().unwrap(); assert_eq!(entry0.num_hashes, 0); assert_eq!(entry1.num_hashes, 0); @@ -100,7 +100,7 @@ mod tests { let (input, event_receiver) = channel(); let zero = Hash::default(); let hist = Historian::new(event_receiver, &zero, None); - drop(hist.output); + drop(hist.entry_receiver); input.send(Signal::Tick).unwrap(); assert_eq!( hist.thread_hdl.join().unwrap(), @@ -116,7 +116,7 @@ mod tests { sleep(Duration::from_millis(300)); input.send(Signal::Tick).unwrap(); drop(input); - let entries: Vec = hist.output.lock().unwrap().iter().collect(); + let entries: Vec = hist.entry_receiver.lock().unwrap().iter().collect(); assert!(entries.len() > 1); // Ensure the ID is not the seed. diff --git a/src/lib.rs b/src/lib.rs index 10716a9eb8eef4..522cb33fc85d57 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,13 @@ #![cfg_attr(feature = "unstable", feature(test))] pub mod accountant; -pub mod accounting_stage; pub mod crdt; pub mod ecdsa; pub mod entry; +pub mod entry_writer; #[cfg(feature = "erasure")] pub mod erasure; pub mod event; +pub mod event_processor; pub mod hash; pub mod historian; pub mod ledger; @@ -15,14 +16,16 @@ pub mod mint; pub mod packet; pub mod plan; pub mod recorder; +pub mod request_stage; pub mod result; +pub mod rpu; +pub mod sig_verify_stage; pub mod signature; pub mod streamer; pub mod thin_client; -pub mod thin_client_service; pub mod timing; -pub mod tpu; pub mod transaction; +pub mod tvu; extern crate bincode; extern crate byteorder; extern crate chrono; diff --git a/src/thin_client_service.rs b/src/request_stage.rs similarity index 83% rename from src/thin_client_service.rs rename to src/request_stage.rs index f86ff014264337..6b9a693cd9d4c6 100644 --- a/src/thin_client_service.rs +++ b/src/request_stage.rs @@ -1,11 +1,10 @@ -//! The `thin_client_service` sits alongside the TPU and queries it for information -//! on behalf of thing clients. +//! The `request_stage` processes thin client Request messages. use accountant::Accountant; -use accounting_stage::AccountingStage; use bincode::{deserialize, serialize}; use entry::Entry; use event::Event; +use event_processor::EventProcessor; use hash::Hash; use packet; use packet::SharedPackets; @@ -14,17 +13,15 @@ use result::Result; use signature::PublicKey; use std::collections::VecDeque; use std::net::{SocketAddr, UdpSocket}; -use transaction::Transaction; -//use std::io::{Cursor, Write}; -//use std::sync::atomic::{AtomicBool, Ordering}; -//use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::mpsc::Receiver; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; -//use std::thread::{spawn, JoinHandle}; +use std::thread::{spawn, JoinHandle}; use std::time::Duration; use std::time::Instant; use streamer; use timing; +use transaction::Transaction; #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[derive(Serialize, Deserialize, Debug, Clone)] @@ -62,20 +59,15 @@ pub enum Response { EntryInfo(EntryInfo), } -pub struct ThinClientService { - //pub output: Mutex>, - //response_sender: Mutex>, +pub struct RequestProcessor { accountant: Arc, entry_info_subscribers: Mutex>, } -impl ThinClientService { +impl RequestProcessor { /// Create a new Tpu that wraps the given Accountant. pub fn new(accountant: Arc) -> Self { - //let (response_sender, output) = channel(); - ThinClientService { - //output: Mutex::new(output), - //response_sender: Mutex::new(response_sender), + RequestProcessor { accountant, entry_info_subscribers: Mutex::new(vec![]), } @@ -213,9 +205,10 @@ impl ThinClientService { pub fn process_request_packets( &self, - accounting_stage: &AccountingStage, + event_processor: &EventProcessor, verified_receiver: &Receiver)>>, - responder_sender: &streamer::BlobSender, + entry_sender: &Sender, + blob_sender: &streamer::BlobSender, packet_recycler: &packet::PacketRecycler, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { @@ -248,7 +241,8 @@ impl ThinClientService { debug!("events: {} reqs: {}", events.len(), reqs.len()); debug!("process_events"); - accounting_stage.process_events(events)?; + let entry = event_processor.process_events(events)?; + entry_sender.send(entry)?; debug!("done process_events"); debug!("process_requests"); @@ -259,7 +253,7 @@ impl ThinClientService { if !blobs.is_empty() { info!("process: sending blobs: {}", blobs.len()); //don't wake up the other side if there is nothing - responder_sender.send(blobs)?; + blob_sender.send(blobs)?; } packet_recycler.recycle(msgs); } @@ -277,6 +271,50 @@ impl ThinClientService { } } +pub struct RequestStage { + pub thread_hdl: JoinHandle<()>, + pub entry_receiver: Receiver, + pub blob_receiver: streamer::BlobReceiver, + pub request_processor: Arc, +} + +impl RequestStage { + pub fn new( + request_processor: RequestProcessor, + event_processor: Arc, + exit: Arc, + verified_receiver: Receiver)>>, + packet_recycler: packet::PacketRecycler, + blob_recycler: packet::BlobRecycler, + ) -> Self { + let request_processor = Arc::new(request_processor); + let request_processor_ = request_processor.clone(); + let (entry_sender, entry_receiver) = channel(); + let (blob_sender, blob_receiver) = channel(); + let thread_hdl = spawn(move || loop { + let e = request_processor_.process_request_packets( + &event_processor, + &verified_receiver, + &entry_sender, + &blob_sender, + &packet_recycler, + &blob_recycler, + ); + if e.is_err() { + if exit.load(Ordering::Relaxed) { + break; + } + } + }); + RequestStage { + thread_hdl, + entry_receiver, + blob_receiver, + request_processor, + } + } +} + #[cfg(test)] pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { let mut out = vec![]; @@ -302,7 +340,7 @@ mod tests { use bincode::serialize; use ecdsa; use packet::{PacketRecycler, NUM_PACKETS}; - use thin_client_service::{to_request_packets, Request}; + use request_stage::{to_request_packets, Request}; use transaction::{memfind, test_tx}; #[test] diff --git a/src/rpu.rs b/src/rpu.rs new file mode 100644 index 00000000000000..fe7a2e5eb4db43 --- /dev/null +++ b/src/rpu.rs @@ -0,0 +1,137 @@ +//! The `rpu` module implements the Request Processing Unit, a +//! 5-stage transaction processing pipeline in software. + +use crdt::{Crdt, ReplicatedData}; +use entry::Entry; +use entry_writer::EntryWriter; +use event_processor::EventProcessor; +use packet; +use request_stage::{RequestProcessor, RequestStage}; +use result::Result; +use sig_verify_stage::SigVerifyStage; +use std::io::Write; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver}; +use std::sync::{Arc, Mutex, RwLock}; +use std::thread::{spawn, JoinHandle}; +use streamer; + +pub struct Rpu { + event_processor: Arc, +} + +impl Rpu { + /// Create a new Rpu that wraps the given Accountant. + pub fn new(event_processor: EventProcessor) -> Self { + Rpu { + event_processor: Arc::new(event_processor), + } + } + + fn write_service( + event_processor: Arc, + request_processor: Arc, + exit: Arc, + broadcast: streamer::BlobSender, + blob_recycler: packet::BlobRecycler, + writer: Mutex, + entry_receiver: Receiver, + ) -> JoinHandle<()> { + spawn(move || loop { + let entry_writer = EntryWriter::new(&event_processor, &request_processor); + let _ = entry_writer.write_and_send_entries( + &broadcast, + &blob_recycler, + &writer, + &entry_receiver, + ); + if exit.load(Ordering::Relaxed) { + info!("broadcat_service exiting"); + break; + } + }) + } + + /// Create a UDP microservice that forwards messages the given Rpu. + /// This service is the network leader + /// Set `exit` to shutdown its threads. + pub fn serve( + &self, + me: ReplicatedData, + requests_socket: UdpSocket, + gossip: UdpSocket, + exit: Arc, + writer: W, + ) -> Result>> { + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); + let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone()); + + // make sure we are on the same interface + let mut local = requests_socket.local_addr()?; + local.set_port(0); + + let packet_recycler = packet::PacketRecycler::default(); + let (packet_sender, packet_receiver) = channel(); + let t_receiver = streamer::receiver( + requests_socket, + exit.clone(), + packet_recycler.clone(), + packet_sender, + )?; + + let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); + + let blob_recycler = packet::BlobRecycler::default(); + let request_processor = RequestProcessor::new(self.event_processor.accountant.clone()); + let request_stage = RequestStage::new( + request_processor, + self.event_processor.clone(), + exit.clone(), + sig_verify_stage.verified_receiver, + packet_recycler.clone(), + blob_recycler.clone(), + ); + + let (broadcast_sender, broadcast_receiver) = channel(); + let t_write = Self::write_service( + self.event_processor.clone(), + request_stage.request_processor.clone(), + exit.clone(), + broadcast_sender, + blob_recycler.clone(), + Mutex::new(writer), + request_stage.entry_receiver, + ); + + let broadcast_socket = UdpSocket::bind(local)?; + let t_broadcast = streamer::broadcaster( + broadcast_socket, + exit.clone(), + crdt.clone(), + blob_recycler.clone(), + broadcast_receiver, + ); + + let respond_socket = UdpSocket::bind(local.clone())?; + let t_responder = streamer::responder( + respond_socket, + exit.clone(), + blob_recycler.clone(), + request_stage.blob_receiver, + ); + + let mut threads = vec![ + t_receiver, + t_responder, + request_stage.thread_hdl, + t_write, + t_gossip, + t_listen, + t_broadcast, + ]; + threads.extend(sig_verify_stage.thread_hdls.into_iter()); + Ok(threads) + } +} diff --git a/src/sig_verify_stage.rs b/src/sig_verify_stage.rs new file mode 100644 index 00000000000000..6528537c6a0512 --- /dev/null +++ b/src/sig_verify_stage.rs @@ -0,0 +1,96 @@ +//! The `sig_verify_stage` implements the signature verification stage of the TPU. + +use ecdsa; +use packet::SharedPackets; +use rand::{thread_rng, Rng}; +use result::Result; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use std::thread::{spawn, JoinHandle}; +use std::time::Instant; +use streamer; +use timing; + +pub struct SigVerifyStage { + pub verified_receiver: Receiver)>>, + pub thread_hdls: Vec>, +} + +impl SigVerifyStage { + pub fn new(exit: Arc, packets_receiver: Receiver) -> Self { + let (verified_sender, verified_receiver) = channel(); + let thread_hdls = Self::verifier_services(exit, packets_receiver, verified_sender); + SigVerifyStage { + thread_hdls, + verified_receiver, + } + } + + fn verify_batch(batch: Vec) -> Vec<(SharedPackets, Vec)> { + let r = ecdsa::ed25519_verify(&batch); + batch.into_iter().zip(r).collect() + } + + fn verifier( + recvr: &Arc>, + sendr: &Arc)>>>>, + ) -> Result<()> { + let (batch, len) = + streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; + + let now = Instant::now(); + let batch_len = batch.len(); + let rand_id = thread_rng().gen_range(0, 100); + info!( + "@{:?} verifier: verifying: {} id: {}", + timing::timestamp(), + batch.len(), + rand_id + ); + + let verified_batch = Self::verify_batch(batch); + sendr + .lock() + .expect("lock in fn verify_batch in tpu") + .send(verified_batch)?; + + let total_time_ms = timing::duration_as_ms(&now.elapsed()); + let total_time_s = timing::duration_as_s(&now.elapsed()); + info!( + "@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}", + timing::timestamp(), + batch_len, + total_time_ms, + rand_id, + len, + (len as f32 / total_time_s) + ); + Ok(()) + } + + fn verifier_service( + exit: Arc, + packets_receiver: Arc>, + verified_sender: Arc)>>>>, + ) -> JoinHandle<()> { + spawn(move || loop { + let e = Self::verifier(&packets_receiver.clone(), &verified_sender.clone()); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }) + } + + fn verifier_services( + exit: Arc, + packets_receiver: streamer::PacketReceiver, + verified_sender: Sender)>>, + ) -> Vec> { + let sender = Arc::new(Mutex::new(verified_sender)); + let receiver = Arc::new(Mutex::new(packets_receiver)); + (0..4) + .map(|_| Self::verifier_service(exit.clone(), receiver.clone(), sender.clone())) + .collect() + } +} diff --git a/src/thin_client.rs b/src/thin_client.rs index e1ee2b4dcd1dbf..318377ae35630c 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -6,29 +6,31 @@ use bincode::{deserialize, serialize}; use futures::future::{ok, FutureResult}; use hash::Hash; +use request_stage::{Request, Response, Subscription}; use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; use std::net::{SocketAddr, UdpSocket}; -use thin_client_service::{Request, Response, Subscription}; use transaction::Transaction; pub struct ThinClient { pub addr: SocketAddr, - pub socket: UdpSocket, + pub requests_socket: UdpSocket, + pub events_socket: UdpSocket, last_id: Option, num_events: u64, balances: HashMap>, } impl ThinClient { - /// Create a new ThinClient that will interface with Tpu - /// over `socket`. To receive responses, the caller must bind `socket` + /// Create a new ThinClient that will interface with Rpu + /// over `requests_socket` and `events_socket`. To receive responses, the caller must bind `socket` /// to a public address before invoking ThinClient methods. - pub fn new(addr: SocketAddr, socket: UdpSocket) -> Self { + pub fn new(addr: SocketAddr, requests_socket: UdpSocket, events_socket: UdpSocket) -> Self { let client = ThinClient { addr: addr, - socket, + requests_socket, + events_socket, last_id: None, num_events: 0, balances: HashMap::new(), @@ -42,13 +44,13 @@ impl ThinClient { let req = Request::Subscribe { subscriptions }; let data = serialize(&req).expect("serialize Subscribe in thin_client"); trace!("subscribing to {}", self.addr); - let _res = self.socket.send_to(&data, &self.addr); + let _res = self.requests_socket.send_to(&data, &self.addr); } pub fn recv_response(&self) -> io::Result { let mut buf = vec![0u8; 1024]; info!("start recv_from"); - self.socket.recv_from(&mut buf)?; + self.requests_socket.recv_from(&mut buf)?; info!("end recv_from"); let resp = deserialize(&buf).expect("deserialize balance in thin_client"); Ok(resp) @@ -73,7 +75,7 @@ impl ThinClient { pub fn transfer_signed(&self, tr: Transaction) -> io::Result { let req = Request::Transaction(tr); let data = serialize(&req).expect("serialize Transaction in pub fn transfer_signed"); - self.socket.send_to(&data, &self.addr) + self.requests_socket.send_to(&data, &self.addr) } /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. @@ -96,7 +98,7 @@ impl ThinClient { info!("get_balance"); let req = Request::GetBalance { key: *pubkey }; let data = serialize(&req).expect("serialize GetBalance in pub fn get_balance"); - self.socket + self.requests_socket .send_to(&data, &self.addr) .expect("buffer error in pub fn get_balance"); let mut done = false; @@ -133,7 +135,7 @@ impl ThinClient { } // Then take the rest. - self.socket + self.requests_socket .set_nonblocking(true) .expect("set_nonblocking in pub fn transaction_count"); loop { @@ -142,7 +144,7 @@ impl ThinClient { Ok(resp) => self.process_response(resp), } } - self.socket + self.requests_socket .set_nonblocking(false) .expect("set_nonblocking in pub fn transaction_count"); self.num_events @@ -153,12 +155,13 @@ impl ThinClient { mod tests { use super::*; use accountant::Accountant; - use accounting_stage::AccountingStage; use crdt::{Crdt, ReplicatedData}; + use event_processor::EventProcessor; use futures::Future; use logger; use mint::Mint; use plan::Plan; + use rpu::Rpu; use signature::{KeyPair, KeyPairUtil}; use std::io::sink; use std::sync::atomic::{AtomicBool, Ordering}; @@ -166,14 +169,14 @@ mod tests { use std::thread::sleep; use std::time::Duration; use std::time::Instant; - use tpu::{self, Tpu}; + use tvu::{self, Tvu}; #[test] fn test_thin_client() { logger::setup(); let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); - let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let _events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let addr = serve.local_addr().unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( @@ -187,30 +190,23 @@ mod tests { let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - let accountant = Arc::new(Tpu::new(accounting_stage)); - let threads = Tpu::serve( - &accountant, - d, - serve, - events_socket, - gossip, - exit.clone(), - sink(), - ).unwrap(); + let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let rpu = Arc::new(Rpu::new(event_processor)); + let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut accountant = ThinClient::new(addr, socket); - let last_id = accountant.get_last_id().wait().unwrap(); - let _sig = accountant + let mut client = ThinClient::new(addr, requests_socket, events_socket); + let last_id = client.get_last_id().wait().unwrap(); + let _sig = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); let mut balance; let now = Instant::now(); loop { - balance = accountant.get_balance(&bob_pubkey); + balance = client.get_balance(&bob_pubkey); if balance.is_ok() { break; } @@ -227,28 +223,29 @@ mod tests { #[test] fn test_bad_sig() { - let (leader_data, leader_gossip, _, leader_serve, leader_events) = tpu::test_node(); + let (leader_data, leader_gossip, _, leader_serve, _leader_events) = tvu::test_node(); let alice = Mint::new(10_000); let accountant = Accountant::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - let tpu = Arc::new(Tpu::new(accounting_stage)); + let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let rpu = Arc::new(Rpu::new(event_processor)); let serve_addr = leader_serve.local_addr().unwrap(); - let threads = Tpu::serve( - &tpu, + let threads = rpu.serve( leader_data, leader_serve, - leader_events, leader_gossip, exit.clone(), sink(), ).unwrap(); sleep(Duration::from_millis(300)); - let socket = UdpSocket::bind("127.0.0.1:0").unwrap(); - socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); - let mut client = ThinClient::new(serve_addr, socket); + let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(5, 0))) + .unwrap(); + let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let mut client = ThinClient::new(serve_addr, requests_socket, events_socket); let last_id = client.get_last_id().wait().unwrap(); trace!("doing stuff"); @@ -301,26 +298,20 @@ mod tests { let leader_acc = { let accountant = Accountant::new(&alice); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - Arc::new(Tpu::new(accounting_stage)) + let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + Arc::new(Rpu::new(event_processor)) }; let replicant_acc = { let accountant = Accountant::new(&alice); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - Arc::new(Tpu::new(accounting_stage)) + let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + Arc::new(Tvu::new(event_processor)) }; - let leader_threads = Tpu::serve( - &leader_acc, - leader.0.clone(), - leader.2, - leader.4, - leader.1, - exit.clone(), - sink(), - ).unwrap(); - let replicant_threads = Tpu::replicate( + let leader_threads = leader_acc + .serve(leader.0.clone(), leader.2, leader.1, exit.clone(), sink()) + .unwrap(); + let replicant_threads = Tvu::serve( &replicant_acc, replicant.0.clone(), replicant.1, @@ -369,29 +360,36 @@ mod tests { //verify leader can do transfer let leader_balance = { - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut accountant = ThinClient::new(leader.0.serve_addr, socket); + let mut client = ThinClient::new(leader.0.serve_addr, requests_socket, events_socket); info!("getting leader last_id"); - let last_id = accountant.get_last_id().wait().unwrap(); + let last_id = client.get_last_id().wait().unwrap(); info!("executing leader transer"); - let _sig = accountant + let _sig = client .transfer(500, &alice.keypair(), bob_pubkey, &last_id) .unwrap(); info!("getting leader balance"); - accountant.get_balance(&bob_pubkey).unwrap() + client.get_balance(&bob_pubkey).unwrap() }; assert_eq!(leader_balance, 500); //verify replicant has the same balance let mut replicant_balance = 0; for _ in 0..10 { - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut accountant = ThinClient::new(replicant.0.serve_addr, socket); + let mut client = + ThinClient::new(replicant.0.serve_addr, requests_socket, events_socket); info!("getting replicant balance"); - if let Ok(bal) = accountant.get_balance(&bob_pubkey) { + if let Ok(bal) = client.get_balance(&bob_pubkey) { replicant_balance = bal; } info!("replicant balance {}", replicant_balance); diff --git a/src/tpu.rs b/src/tvu.rs similarity index 50% rename from src/tpu.rs rename to src/tvu.rs index 95bfbc48ad35a5..59bb599a3a3ea2 100644 --- a/src/tpu.rs +++ b/src/tvu.rs @@ -1,185 +1,57 @@ -//! The `tpu` module implements the Transaction Processing Unit, a -//! 5-stage transaction processing pipeline in software. +//! The `tvu` module implements the Transaction Validation Unit, a +//! 5-stage transaction validation pipeline in software. -use accounting_stage::AccountingStage; use crdt::{Crdt, ReplicatedData}; -use ecdsa; use entry::Entry; +use entry_writer::EntryWriter; +use event_processor::EventProcessor; use ledger; use packet; -use packet::SharedPackets; -use rand::{thread_rng, Rng}; +use request_stage::{RequestProcessor, RequestStage}; use result::Result; -use serde_json; -use std::collections::VecDeque; -use std::io::Write; -use std::io::sink; +use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Sender}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::mpsc::{channel, Receiver}; +use std::sync::{Arc, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use std::time::Instant; use streamer; -use thin_client_service::ThinClientService; -use timing; -pub struct Tpu { - accounting_stage: AccountingStage, - thin_client_service: ThinClientService, +pub struct Tvu { + event_processor: Arc, } -type SharedTpu = Arc; - -impl Tpu { - /// Create a new Tpu that wraps the given Accountant. - pub fn new(accounting_stage: AccountingStage) -> Self { - let thin_client_service = ThinClientService::new(accounting_stage.accountant.clone()); - Tpu { - accounting_stage, - thin_client_service, - } - } - - fn write_entry(&self, writer: &Mutex, entry: &Entry) { - trace!("write_entry entry"); - self.accounting_stage - .accountant - .register_entry_id(&entry.id); - writeln!( - writer.lock().expect("'writer' lock in fn fn write_entry"), - "{}", - serde_json::to_string(&entry).expect("'entry' to_strong in fn write_entry") - ).expect("writeln! in fn write_entry"); - self.thin_client_service - .notify_entry_info_subscribers(&entry); - } - - fn write_entries(&self, writer: &Mutex) -> Result> { - //TODO implement a serialize for channel that does this without allocations - let mut l = vec![]; - let entry = self.accounting_stage - .output - .lock() - .expect("'ouput' lock in fn receive_all") - .recv_timeout(Duration::new(1, 0))?; - self.write_entry(writer, &entry); - l.push(entry); - while let Ok(entry) = self.accounting_stage - .output - .lock() - .expect("'output' lock in fn write_entries") - .try_recv() - { - self.write_entry(writer, &entry); - l.push(entry); +impl Tvu { + /// Create a new Tvu that wraps the given Accountant. + pub fn new(event_processor: EventProcessor) -> Self { + Tvu { + event_processor: Arc::new(event_processor), } - Ok(l) } - /// Process any Entry items that have been published by the Historian. - /// continuosly broadcast blobs of entries out - fn run_sync( - &self, - broadcast: &streamer::BlobSender, - blob_recycler: &packet::BlobRecycler, - writer: &Mutex, - ) -> Result<()> { - let mut q = VecDeque::new(); - let list = self.write_entries(writer)?; - trace!("New blobs? {}", list.len()); - ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q); - if !q.is_empty() { - broadcast.send(q)?; - } - Ok(()) - } - - pub fn sync_service( - obj: SharedTpu, + fn drain_service( + event_processor: Arc, + request_processor: Arc, exit: Arc, - broadcast: streamer::BlobSender, - blob_recycler: packet::BlobRecycler, - writer: Mutex, + entry_receiver: Receiver, ) -> JoinHandle<()> { - spawn(move || loop { - let _ = obj.run_sync(&broadcast, &blob_recycler, &writer); - if exit.load(Ordering::Relaxed) { - info!("sync_service exiting"); - break; - } - }) - } - - /// Process any Entry items that have been published by the Historian. - /// continuosly broadcast blobs of entries out - fn run_sync_no_broadcast(&self) -> Result<()> { - self.write_entries(&Arc::new(Mutex::new(sink())))?; - Ok(()) - } - - pub fn sync_no_broadcast_service(obj: SharedTpu, exit: Arc) -> JoinHandle<()> { - spawn(move || loop { - let _ = obj.run_sync_no_broadcast(); - if exit.load(Ordering::Relaxed) { - info!("sync_no_broadcast_service exiting"); - break; + spawn(move || { + let entry_writer = EntryWriter::new(&event_processor, &request_processor); + loop { + let _ = entry_writer.drain_entries(&entry_receiver); + if exit.load(Ordering::Relaxed) { + info!("drain_service exiting"); + break; + } } }) } - fn verify_batch( - batch: Vec, - sendr: &Arc)>>>>, - ) -> Result<()> { - let r = ecdsa::ed25519_verify(&batch); - let res = batch.into_iter().zip(r).collect(); - sendr - .lock() - .expect("lock in fn verify_batch in tpu") - .send(res)?; - // TODO: fix error handling here? - Ok(()) - } - - fn verifier( - recvr: &Arc>, - sendr: &Arc)>>>>, - ) -> Result<()> { - let (batch, len) = - streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; - - let now = Instant::now(); - let batch_len = batch.len(); - let rand_id = thread_rng().gen_range(0, 100); - info!( - "@{:?} verifier: verifying: {} id: {}", - timing::timestamp(), - batch.len(), - rand_id - ); - - Self::verify_batch(batch, sendr).expect("verify_batch in fn verifier"); - - let total_time_ms = timing::duration_as_ms(&now.elapsed()); - let total_time_s = timing::duration_as_s(&now.elapsed()); - info!( - "@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}", - timing::timestamp(), - batch_len, - total_time_ms, - rand_id, - len, - (len as f32 / total_time_s) - ); - Ok(()) - } - /// Process verified blobs, already in order /// Respond with a signed hash of the state fn replicate_state( - obj: &Tpu, + obj: &Tvu, verified_receiver: &streamer::BlobReceiver, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { @@ -187,7 +59,7 @@ impl Tpu { let blobs = verified_receiver.recv_timeout(timer)?; trace!("replicating blobs {}", blobs.len()); let entries = ledger::reconstruct_entries_from_blobs(&blobs); - obj.accounting_stage + obj.event_processor .accountant .process_verified_entries(entries)?; for blob in blobs { @@ -196,105 +68,6 @@ impl Tpu { Ok(()) } - /// Create a UDP microservice that forwards messages the given Tpu. - /// This service is the network leader - /// Set `exit` to shutdown its threads. - pub fn serve( - obj: &SharedTpu, - me: ReplicatedData, - serve: UdpSocket, - _events_socket: UdpSocket, - gossip: UdpSocket, - exit: Arc, - writer: W, - ) -> Result>> { - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); - let t_listen = Crdt::listen(crdt.clone(), gossip, exit.clone()); - - // make sure we are on the same interface - let mut local = serve.local_addr()?; - local.set_port(0); - let respond_socket = UdpSocket::bind(local.clone())?; - - let packet_recycler = packet::PacketRecycler::default(); - let blob_recycler = packet::BlobRecycler::default(); - let (packet_sender, packet_receiver) = channel(); - let t_receiver = - streamer::receiver(serve, exit.clone(), packet_recycler.clone(), packet_sender)?; - let (responder_sender, responder_receiver) = channel(); - let t_responder = streamer::responder( - respond_socket, - exit.clone(), - blob_recycler.clone(), - responder_receiver, - ); - let (verified_sender, verified_receiver) = channel(); - - let mut verify_threads = Vec::new(); - let shared_verified_sender = Arc::new(Mutex::new(verified_sender)); - let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver)); - for _ in 0..4 { - let exit_ = exit.clone(); - let recv = shared_packet_receiver.clone(); - let sender = shared_verified_sender.clone(); - let thread = spawn(move || loop { - let e = Self::verifier(&recv, &sender); - if e.is_err() && exit_.load(Ordering::Relaxed) { - break; - } - }); - verify_threads.push(thread); - } - - let (broadcast_sender, broadcast_receiver) = channel(); - - let broadcast_socket = UdpSocket::bind(local)?; - let t_broadcast = streamer::broadcaster( - broadcast_socket, - exit.clone(), - crdt.clone(), - blob_recycler.clone(), - broadcast_receiver, - ); - - let t_sync = Self::sync_service( - obj.clone(), - exit.clone(), - broadcast_sender, - blob_recycler.clone(), - Mutex::new(writer), - ); - - let tpu = obj.clone(); - let t_server = spawn(move || loop { - let e = tpu.thin_client_service.process_request_packets( - &tpu.accounting_stage, - &verified_receiver, - &responder_sender, - &packet_recycler, - &blob_recycler, - ); - if e.is_err() { - if exit.load(Ordering::Relaxed) { - break; - } - } - }); - - let mut threads = vec![ - t_receiver, - t_responder, - t_server, - t_sync, - t_gossip, - t_listen, - t_broadcast, - ]; - threads.extend(verify_threads.into_iter()); - Ok(threads) - } - /// This service receives messages from a leader in the network and processes the transactions /// on the accountant state. /// # Arguments @@ -313,11 +86,11 @@ impl Tpu { /// d. make sure that the blobs PoH sequences connect (TODO) /// 4. process the transaction state machine /// 5. respond with the hash of the state back to the leader - pub fn replicate( - obj: &SharedTpu, + pub fn serve( + obj: &Arc, me: ReplicatedData, gossip: UdpSocket, - serve: UdpSocket, + requests_socket: UdpSocket, replicate: UdpSocket, leader: ReplicatedData, exit: Arc, @@ -369,10 +142,10 @@ impl Tpu { retransmit_sender, ); - let tpu = obj.clone(); + let tvu = obj.clone(); let s_exit = exit.clone(); let t_replicator = spawn(move || loop { - let e = Self::replicate_state(&tpu, &window_receiver, &blob_recycler); + let e = Self::replicate_state(&tvu, &window_receiver, &blob_recycler); if e.is_err() && s_exit.load(Ordering::Relaxed) { break; } @@ -380,57 +153,45 @@ impl Tpu { //serve pipeline // make sure we are on the same interface - let mut local = serve.local_addr()?; + let mut local = requests_socket.local_addr()?; local.set_port(0); let respond_socket = UdpSocket::bind(local.clone())?; let packet_recycler = packet::PacketRecycler::default(); let blob_recycler = packet::BlobRecycler::default(); let (packet_sender, packet_receiver) = channel(); - let t_packet_receiver = - streamer::receiver(serve, exit.clone(), packet_recycler.clone(), packet_sender)?; - let (responder_sender, responder_receiver) = channel(); + let t_packet_receiver = streamer::receiver( + requests_socket, + exit.clone(), + packet_recycler.clone(), + packet_sender, + )?; + + let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); + + let request_processor = RequestProcessor::new(obj.event_processor.accountant.clone()); + let request_stage = RequestStage::new( + request_processor, + obj.event_processor.clone(), + exit.clone(), + sig_verify_stage.verified_receiver, + packet_recycler.clone(), + blob_recycler.clone(), + ); + + let t_write = Self::drain_service( + obj.event_processor.clone(), + request_stage.request_processor.clone(), + exit.clone(), + request_stage.entry_receiver, + ); + let t_responder = streamer::responder( respond_socket, exit.clone(), blob_recycler.clone(), - responder_receiver, + request_stage.blob_receiver, ); - let (verified_sender, verified_receiver) = channel(); - - let mut verify_threads = Vec::new(); - let shared_verified_sender = Arc::new(Mutex::new(verified_sender)); - let shared_packet_receiver = Arc::new(Mutex::new(packet_receiver)); - for _ in 0..4 { - let exit_ = exit.clone(); - let recv = shared_packet_receiver.clone(); - let sender = shared_verified_sender.clone(); - let thread = spawn(move || loop { - let e = Self::verifier(&recv, &sender); - if e.is_err() && exit_.load(Ordering::Relaxed) { - break; - } - }); - verify_threads.push(thread); - } - let t_sync = Self::sync_no_broadcast_service(obj.clone(), exit.clone()); - - let tpu = obj.clone(); - let s_exit = exit.clone(); - let t_server = spawn(move || loop { - let e = tpu.thin_client_service.process_request_packets( - &tpu.accounting_stage, - &verified_receiver, - &responder_sender, - &packet_recycler, - &blob_recycler, - ); - if e.is_err() { - if s_exit.load(Ordering::Relaxed) { - break; - } - } - }); let mut threads = vec![ //replicate threads @@ -443,10 +204,10 @@ impl Tpu { //serve threads t_packet_receiver, t_responder, - t_server, - t_sync, + request_stage.thread_hdl, + t_write, ]; - threads.extend(verify_threads.into_iter()); + threads.extend(sig_verify_stage.thread_hdls.into_iter()); Ok(threads) } } @@ -458,26 +219,26 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke let events_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); - let serve = UdpSocket::bind("127.0.0.1:0").unwrap(); + let requests_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let d = ReplicatedData::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), - serve.local_addr().unwrap(), + requests_socket.local_addr().unwrap(), ); - (d, gossip, replicate, serve, events_socket) + (d, gossip, replicate, requests_socket, events_socket) } #[cfg(test)] mod tests { use accountant::Accountant; - use accounting_stage::AccountingStage; use bincode::serialize; use chrono::prelude::*; use crdt::Crdt; use entry; use event::Event; + use event_processor::EventProcessor; use hash::{hash, Hash}; use logger; use mint::Mint; @@ -489,8 +250,8 @@ mod tests { use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer; - use tpu::{test_node, Tpu}; use transaction::Transaction; + use tvu::{test_node, Tvu}; /// Test that mesasge sent from leader to target1 and repliated to target2 #[test] @@ -544,11 +305,11 @@ mod tests { let starting_balance = 10_000; let alice = Mint::new(starting_balance); let accountant = Accountant::new(&alice); - let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); - let tpu = Arc::new(Tpu::new(accounting_stage)); + let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); + let tvu = Arc::new(Tvu::new(event_processor)); let replicate_addr = target1_data.replicate_addr; - let threads = Tpu::replicate( - &tpu, + let threads = Tvu::serve( + &tvu, target1_data, target1_gossip, target1_serve, @@ -570,7 +331,7 @@ mod tests { w.set_index(i).unwrap(); w.set_id(leader_id).unwrap(); - let accountant = &tpu.accounting_stage.accountant; + let accountant = &tvu.event_processor.accountant; let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); @@ -612,7 +373,7 @@ mod tests { msgs.push(msg); } - let accountant = &tpu.accounting_stage.accountant; + let accountant = &tvu.event_processor.accountant; let alice_balance = accountant.get_balance(&alice.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance);