From b2e32995396ee16ef58a2d5af34088866ec1b5d2 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 12 May 2018 16:55:33 -0600 Subject: [PATCH 1/4] Only pass accountant write_service --- src/entry_writer.rs | 13 +++++-------- src/rpu.rs | 7 ++++--- src/thin_client.rs | 6 +++--- src/tvu.rs | 7 ++++--- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/entry_writer.rs b/src/entry_writer.rs index 24a13e1792db63..9f8b8dac3ebd63 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -1,7 +1,7 @@ //! The `entry_writer` module helps implement the TPU's write stage. +use accountant::Accountant; use entry::Entry; -use event_processor::EventProcessor; use ledger; use packet; use request_stage::RequestProcessor; @@ -16,25 +16,22 @@ use std::time::Duration; use streamer; pub struct EntryWriter<'a> { - event_processor: &'a EventProcessor, + accountant: &'a Accountant, request_processor: &'a RequestProcessor, } impl<'a> EntryWriter<'a> { /// Create a new Tpu that wraps the given Accountant. - pub fn new( - event_processor: &'a EventProcessor, - request_processor: &'a RequestProcessor, - ) -> Self { + pub fn new(accountant: &'a Accountant, request_processor: &'a RequestProcessor) -> Self { EntryWriter { - event_processor, + accountant, request_processor, } } fn write_entry(&self, writer: &Mutex, entry: &Entry) { trace!("write_entry entry"); - self.event_processor.accountant.register_entry_id(&entry.id); + self.accountant.register_entry_id(&entry.id); writeln!( writer.lock().expect("'writer' lock in fn fn write_entry"), "{}", diff --git a/src/rpu.rs b/src/rpu.rs index fe7a2e5eb4db43..99707b8a885afa 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -1,6 +1,7 @@ //! The `rpu` module implements the Request Processing Unit, a //! 5-stage transaction processing pipeline in software. +use accountant::Accountant; use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; @@ -30,7 +31,7 @@ impl Rpu { } fn write_service( - event_processor: Arc, + accountant: Arc, request_processor: Arc, exit: Arc, broadcast: streamer::BlobSender, @@ -39,7 +40,7 @@ impl Rpu { entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || loop { - let entry_writer = EntryWriter::new(&event_processor, &request_processor); + let entry_writer = EntryWriter::new(&accountant, &request_processor); let _ = entry_writer.write_and_send_entries( &broadcast, &blob_recycler, @@ -96,7 +97,7 @@ impl Rpu { let (broadcast_sender, broadcast_receiver) = channel(); let t_write = Self::write_service( - self.event_processor.clone(), + self.event_processor.accountant.clone(), request_stage.request_processor.clone(), exit.clone(), broadcast_sender, diff --git a/src/thin_client.rs b/src/thin_client.rs index 318377ae35630c..3517fcb819b04d 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -191,7 +191,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); - let rpu = Arc::new(Rpu::new(event_processor)); + let rpu = Rpu::new(event_processor); let threads = rpu.serve(d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); @@ -229,7 +229,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); - let rpu = Arc::new(Rpu::new(event_processor)); + let rpu = Rpu::new(event_processor); let serve_addr = leader_serve.local_addr().unwrap(); let threads = rpu.serve( leader_data, @@ -299,7 +299,7 @@ mod tests { let leader_acc = { let accountant = Accountant::new(&alice); let event_processor = EventProcessor::new(accountant, &alice.last_id(), Some(30)); - Arc::new(Rpu::new(event_processor)) + Rpu::new(event_processor) }; let replicant_acc = { diff --git a/src/tvu.rs b/src/tvu.rs index 59bb599a3a3ea2..0716ae725c122a 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -1,6 +1,7 @@ //! The `tvu` module implements the Transaction Validation Unit, a //! 5-stage transaction validation pipeline in software. +use accountant::Accountant; use crdt::{Crdt, ReplicatedData}; use entry::Entry; use entry_writer::EntryWriter; @@ -31,13 +32,13 @@ impl Tvu { } fn drain_service( - event_processor: Arc, + accountant: Arc, request_processor: Arc, exit: Arc, entry_receiver: Receiver, ) -> JoinHandle<()> { spawn(move || { - let entry_writer = EntryWriter::new(&event_processor, &request_processor); + let entry_writer = EntryWriter::new(&accountant, &request_processor); loop { let _ = entry_writer.drain_entries(&entry_receiver); if exit.load(Ordering::Relaxed) { @@ -180,7 +181,7 @@ impl Tvu { ); let t_write = Self::drain_service( - obj.event_processor.clone(), + obj.event_processor.accountant.clone(), request_stage.request_processor.clone(), exit.clone(), request_stage.entry_receiver, From e5d46d998b6f11d0d400e8b0d024835364959685 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 12 May 2018 17:41:27 -0600 Subject: [PATCH 2/4] Move thin client messages into their own module --- src/ecdsa.rs | 2 +- src/lib.rs | 1 + src/request_stage.rs | 96 +------------------------------------------- src/thin_client.rs | 2 +- 4 files changed, 4 insertions(+), 97 deletions(-) diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 14237e6cb7cc30..b2477b67129a08 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -137,7 +137,7 @@ mod tests { use bincode::serialize; use ecdsa; use packet::{Packet, Packets, SharedPackets}; - use request_stage::Request; + use request::Request; use std::sync::RwLock; use transaction::Transaction; use transaction::test_tx; diff --git a/src/lib.rs b/src/lib.rs index 522cb33fc85d57..66c90180fe0f8e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,7 @@ pub mod mint; pub mod packet; pub mod plan; pub mod recorder; +pub mod request; pub mod request_stage; pub mod result; pub mod rpu; diff --git a/src/request_stage.rs b/src/request_stage.rs index 6b9a693cd9d4c6..5f753411e0cbb3 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -5,12 +5,11 @@ use bincode::{deserialize, serialize}; use entry::Entry; use event::Event; use event_processor::EventProcessor; -use hash::Hash; use packet; use packet::SharedPackets; use rayon::prelude::*; +use request::{EntryInfo, Request, Response, Subscription}; use result::Result; -use signature::PublicKey; use std::collections::VecDeque; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -21,43 +20,6 @@ use std::time::Duration; use std::time::Instant; use streamer; use timing; -use transaction::Transaction; - -#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Request { - Transaction(Transaction), - GetBalance { key: PublicKey }, - Subscribe { subscriptions: Vec }, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum Subscription { - EntryInfo, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct EntryInfo { - pub id: Hash, - pub num_hashes: u64, - pub num_events: u64, -} - -impl Request { - /// Verify the request is valid. - pub fn verify(&self) -> bool { - match *self { - Request::Transaction(ref tr) => tr.verify_plan(), - _ => true, - } - } -} - -#[derive(Serialize, Deserialize, Debug)] -pub enum Response { - Balance { key: PublicKey, val: Option }, - EntryInfo(EntryInfo), -} pub struct RequestProcessor { accountant: Arc, @@ -314,59 +276,3 @@ impl RequestStage { } } } - -#[cfg(test)] -pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { - let mut out = vec![]; - for rrs in reqs.chunks(packet::NUM_PACKETS) { - let p = r.allocate(); - p.write() - .unwrap() - .packets - .resize(rrs.len(), Default::default()); - for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) { - let v = serialize(&i).expect("serialize request"); - let len = v.len(); - o.data[..len].copy_from_slice(&v); - o.meta.size = len; - } - out.push(p); - } - return out; -} - -#[cfg(test)] -mod tests { - use bincode::serialize; - use ecdsa; - use packet::{PacketRecycler, NUM_PACKETS}; - use request_stage::{to_request_packets, Request}; - use transaction::{memfind, test_tx}; - - #[test] - fn test_layout() { - let tr = test_tx(); - let tx = serialize(&tr).unwrap(); - let packet = serialize(&Request::Transaction(tr)).unwrap(); - assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); - assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); - } - - #[test] - fn test_to_packets() { - let tr = Request::Transaction(test_tx()); - let re = PacketRecycler::default(); - let rv = to_request_packets(&re, vec![tr.clone(); 1]); - assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().packets.len(), 1); - - let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]); - assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - - let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]); - assert_eq!(rv.len(), 2); - assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - assert_eq!(rv[1].read().unwrap().packets.len(), 1); - } -} diff --git a/src/thin_client.rs b/src/thin_client.rs index 3517fcb819b04d..25a5bdf2e219d2 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -6,7 +6,7 @@ use bincode::{deserialize, serialize}; use futures::future::{ok, FutureResult}; use hash::Hash; -use request_stage::{Request, Response, Subscription}; +use request::{Request, Response, Subscription}; use signature::{KeyPair, PublicKey, Signature}; use std::collections::HashMap; use std::io; From aec05ef6027501a40a52ef8462bb5f41fdb445b0 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 12 May 2018 17:50:55 -0600 Subject: [PATCH 3/4] Move RequestProcessor into its own module --- src/entry_writer.rs | 2 +- src/lib.rs | 1 + src/request_stage.rs | 228 +------------------------------------------ src/rpu.rs | 3 +- src/tvu.rs | 3 +- 5 files changed, 9 insertions(+), 228 deletions(-) diff --git a/src/entry_writer.rs b/src/entry_writer.rs index 9f8b8dac3ebd63..f50a7d313bf96e 100644 --- a/src/entry_writer.rs +++ b/src/entry_writer.rs @@ -4,7 +4,7 @@ use accountant::Accountant; use entry::Entry; use ledger; use packet; -use request_stage::RequestProcessor; +use request_processor::RequestProcessor; use result::Result; use serde_json; use std::collections::VecDeque; diff --git a/src/lib.rs b/src/lib.rs index 66c90180fe0f8e..6d50c18e37c452 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,7 @@ pub mod packet; pub mod plan; pub mod recorder; pub mod request; +pub mod request_processor; pub mod request_stage; pub mod result; pub mod rpu; diff --git a/src/request_stage.rs b/src/request_stage.rs index 5f753411e0cbb3..26ac76d779aac1 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -1,237 +1,15 @@ //! The `request_stage` processes thin client Request messages. -use accountant::Accountant; -use bincode::{deserialize, serialize}; use entry::Entry; -use event::Event; use event_processor::EventProcessor; use packet; use packet::SharedPackets; -use rayon::prelude::*; -use request::{EntryInfo, Request, Response, Subscription}; -use result::Result; -use std::collections::VecDeque; -use std::net::{SocketAddr, UdpSocket}; +use request_processor::RequestProcessor; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::mpsc::{channel, Receiver}; use std::thread::{spawn, JoinHandle}; -use std::time::Duration; -use std::time::Instant; use streamer; -use timing; - -pub struct RequestProcessor { - accountant: Arc, - entry_info_subscribers: Mutex>, -} - -impl RequestProcessor { - /// Create a new Tpu that wraps the given Accountant. - pub fn new(accountant: Arc) -> Self { - RequestProcessor { - accountant, - entry_info_subscribers: Mutex::new(vec![]), - } - } - - /// Process Request items sent by clients. - fn process_request( - &self, - msg: Request, - rsp_addr: SocketAddr, - ) -> Option<(Response, SocketAddr)> { - match msg { - Request::GetBalance { key } => { - let val = self.accountant.get_balance(&key); - let rsp = (Response::Balance { key, val }, rsp_addr); - info!("Response::Balance {:?}", rsp); - Some(rsp) - } - Request::Transaction(_) => unreachable!(), - Request::Subscribe { subscriptions } => { - for subscription in subscriptions { - match subscription { - Subscription::EntryInfo => { - self.entry_info_subscribers.lock().unwrap().push(rsp_addr) - } - } - } - None - } - } - } - - pub fn process_requests( - &self, - reqs: Vec<(Request, SocketAddr)>, - ) -> Vec<(Response, SocketAddr)> { - reqs.into_iter() - .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) - .collect() - } - - pub fn notify_entry_info_subscribers(&self, entry: &Entry) { - // TODO: No need to bind(). - let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); - - // copy subscribers to avoid taking lock while doing io - let addrs = self.entry_info_subscribers.lock().unwrap().clone(); - trace!("Sending to {} addrs", addrs.len()); - for addr in addrs { - let entry_info = EntryInfo { - id: entry.id, - num_hashes: entry.num_hashes, - num_events: entry.events.len() as u64, - }; - let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); - trace!("sending {} to {}", data.len(), addr); - //TODO dont do IO here, this needs to be on a separate channel - let res = socket.send_to(&data, addr); - if res.is_err() { - eprintln!("couldn't send response: {:?}", res); - } - } - } - - fn deserialize_requests(p: &packet::Packets) -> Vec> { - p.packets - .par_iter() - .map(|x| { - deserialize(&x.data[0..x.meta.size]) - .map(|req| (req, x.meta.addr())) - .ok() - }) - .collect() - } - - // Copy-paste of deserialize_requests() because I can't figure out how to - // route the lifetimes in a generic version. - pub fn deserialize_events(p: &packet::Packets) -> Vec> { - p.packets - .par_iter() - .map(|x| { - deserialize(&x.data[0..x.meta.size]) - .map(|req| (req, x.meta.addr())) - .ok() - }) - .collect() - } - - /// Split Request list into verified transactions and the rest - fn partition_requests( - req_vers: Vec<(Request, SocketAddr, u8)>, - ) -> (Vec, Vec<(Request, SocketAddr)>) { - let mut events = vec![]; - let mut reqs = vec![]; - for (msg, rsp_addr, verify) in req_vers { - match msg { - Request::Transaction(tr) => { - if verify != 0 { - events.push(Event::Transaction(tr)); - } - } - _ => reqs.push((msg, rsp_addr)), - } - } - (events, reqs) - } - - fn serialize_response( - resp: Response, - rsp_addr: SocketAddr, - blob_recycler: &packet::BlobRecycler, - ) -> Result { - let blob = blob_recycler.allocate(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - Ok(blob) - } - - fn serialize_responses( - rsps: Vec<(Response, SocketAddr)>, - blob_recycler: &packet::BlobRecycler, - ) -> Result> { - let mut blobs = VecDeque::new(); - for (resp, rsp_addr) in rsps { - blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); - } - Ok(blobs) - } - - pub fn process_request_packets( - &self, - event_processor: &EventProcessor, - verified_receiver: &Receiver)>>, - entry_sender: &Sender, - blob_sender: &streamer::BlobSender, - packet_recycler: &packet::PacketRecycler, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let timer = Duration::new(1, 0); - let recv_start = Instant::now(); - let mms = verified_receiver.recv_timeout(timer)?; - let mut reqs_len = 0; - let mms_len = mms.len(); - info!( - "@{:?} process start stalled for: {:?}ms batches: {}", - timing::timestamp(), - timing::duration_as_ms(&recv_start.elapsed()), - mms.len(), - ); - let proc_start = Instant::now(); - for (msgs, vers) in mms { - let reqs = Self::deserialize_requests(&msgs.read().unwrap()); - reqs_len += reqs.len(); - let req_vers = reqs.into_iter() - .zip(vers) - .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) - .filter(|x| { - let v = x.0.verify(); - v - }) - .collect(); - - debug!("partitioning"); - let (events, reqs) = Self::partition_requests(req_vers); - debug!("events: {} reqs: {}", events.len(), reqs.len()); - - debug!("process_events"); - let entry = event_processor.process_events(events)?; - entry_sender.send(entry)?; - debug!("done process_events"); - - debug!("process_requests"); - let rsps = self.process_requests(reqs); - debug!("done process_requests"); - - let blobs = Self::serialize_responses(rsps, blob_recycler)?; - if !blobs.is_empty() { - info!("process: sending blobs: {}", blobs.len()); - //don't wake up the other side if there is nothing - blob_sender.send(blobs)?; - } - packet_recycler.recycle(msgs); - } - let total_time_s = timing::duration_as_s(&proc_start.elapsed()); - let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); - info!( - "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", - timing::timestamp(), - mms_len, - total_time_ms, - reqs_len, - (reqs_len as f32) / (total_time_s) - ); - Ok(()) - } -} pub struct RequestStage { pub thread_hdl: JoinHandle<()>, diff --git a/src/rpu.rs b/src/rpu.rs index 99707b8a885afa..2cea96fab314d8 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -7,7 +7,8 @@ use entry::Entry; use entry_writer::EntryWriter; use event_processor::EventProcessor; use packet; -use request_stage::{RequestProcessor, RequestStage}; +use request_processor::RequestProcessor; +use request_stage::RequestStage; use result::Result; use sig_verify_stage::SigVerifyStage; use std::io::Write; diff --git a/src/tvu.rs b/src/tvu.rs index 0716ae725c122a..d486a5888af54a 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -8,7 +8,8 @@ use entry_writer::EntryWriter; use event_processor::EventProcessor; use ledger; use packet; -use request_stage::{RequestProcessor, RequestStage}; +use request_processor::RequestProcessor; +use request_stage::RequestStage; use result::Result; use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; From d239d4a49597b774ab00f24c4ae566eb0bdaf993 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Sat, 12 May 2018 17:57:28 -0600 Subject: [PATCH 4/4] Add missing files --- .gitignore | 1 + src/request.rs | 99 +++++++++++++++++ src/request_processor.rs | 232 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 332 insertions(+) create mode 100644 src/request.rs create mode 100644 src/request_processor.rs diff --git a/.gitignore b/.gitignore index f8d7348a3e4f8c..3d4a8febfbe381 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ Cargo.lock /target/ **/*.rs.bk +.cargo diff --git a/src/request.rs b/src/request.rs new file mode 100644 index 00000000000000..7575010ad8328f --- /dev/null +++ b/src/request.rs @@ -0,0 +1,99 @@ +//! The `request` module defines the messages for the thin client. + +use bincode::serialize; +use hash::Hash; +use packet; +use packet::SharedPackets; +use signature::PublicKey; +use transaction::Transaction; + +#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Request { + Transaction(Transaction), + GetBalance { key: PublicKey }, + Subscribe { subscriptions: Vec }, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Subscription { + EntryInfo, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EntryInfo { + pub id: Hash, + pub num_hashes: u64, + pub num_events: u64, +} + +impl Request { + /// Verify the request is valid. + pub fn verify(&self) -> bool { + match *self { + Request::Transaction(ref tr) => tr.verify_plan(), + _ => true, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum Response { + Balance { key: PublicKey, val: Option }, + EntryInfo(EntryInfo), +} + +pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { + let mut out = vec![]; + for rrs in reqs.chunks(packet::NUM_PACKETS) { + let p = r.allocate(); + p.write() + .unwrap() + .packets + .resize(rrs.len(), Default::default()); + for (i, o) in rrs.iter().zip(p.write().unwrap().packets.iter_mut()) { + let v = serialize(&i).expect("serialize request"); + let len = v.len(); + o.data[..len].copy_from_slice(&v); + o.meta.size = len; + } + out.push(p); + } + return out; +} + +#[cfg(test)] +mod tests { + use bincode::serialize; + use ecdsa; + use packet::{PacketRecycler, NUM_PACKETS}; + use request::{to_request_packets, Request}; + use transaction::{memfind, test_tx}; + + #[test] + fn test_layout() { + let tr = test_tx(); + let tx = serialize(&tr).unwrap(); + let packet = serialize(&Request::Transaction(tr)).unwrap(); + assert_matches!(memfind(&packet, &tx), Some(ecdsa::TX_OFFSET)); + assert_matches!(memfind(&packet, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), None); + } + + #[test] + fn test_to_packets() { + let tr = Request::Transaction(test_tx()); + let re = PacketRecycler::default(); + let rv = to_request_packets(&re, vec![tr.clone(); 1]); + assert_eq!(rv.len(), 1); + assert_eq!(rv[0].read().unwrap().packets.len(), 1); + + let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]); + assert_eq!(rv.len(), 1); + assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); + + let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]); + assert_eq!(rv.len(), 2); + assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); + assert_eq!(rv[1].read().unwrap().packets.len(), 1); + } +} diff --git a/src/request_processor.rs b/src/request_processor.rs new file mode 100644 index 00000000000000..6f2f3bd7c94c2c --- /dev/null +++ b/src/request_processor.rs @@ -0,0 +1,232 @@ +//! The `request_stage` processes thin client Request messages. + +use accountant::Accountant; +use bincode::{deserialize, serialize}; +use entry::Entry; +use event::Event; +use event_processor::EventProcessor; +use packet; +use packet::SharedPackets; +use rayon::prelude::*; +use request::{EntryInfo, Request, Response, Subscription}; +use result::Result; +use std::collections::VecDeque; +use std::net::{SocketAddr, UdpSocket}; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use std::time::Instant; +use streamer; +use timing; + +pub struct RequestProcessor { + accountant: Arc, + entry_info_subscribers: Mutex>, +} + +impl RequestProcessor { + /// Create a new Tpu that wraps the given Accountant. + pub fn new(accountant: Arc) -> Self { + RequestProcessor { + accountant, + entry_info_subscribers: Mutex::new(vec![]), + } + } + + /// Process Request items sent by clients. + fn process_request( + &self, + msg: Request, + rsp_addr: SocketAddr, + ) -> Option<(Response, SocketAddr)> { + match msg { + Request::GetBalance { key } => { + let val = self.accountant.get_balance(&key); + let rsp = (Response::Balance { key, val }, rsp_addr); + info!("Response::Balance {:?}", rsp); + Some(rsp) + } + Request::Transaction(_) => unreachable!(), + Request::Subscribe { subscriptions } => { + for subscription in subscriptions { + match subscription { + Subscription::EntryInfo => { + self.entry_info_subscribers.lock().unwrap().push(rsp_addr) + } + } + } + None + } + } + } + + pub fn process_requests( + &self, + reqs: Vec<(Request, SocketAddr)>, + ) -> Vec<(Response, SocketAddr)> { + reqs.into_iter() + .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) + .collect() + } + + pub fn notify_entry_info_subscribers(&self, entry: &Entry) { + // TODO: No need to bind(). + let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); + + // copy subscribers to avoid taking lock while doing io + let addrs = self.entry_info_subscribers.lock().unwrap().clone(); + trace!("Sending to {} addrs", addrs.len()); + for addr in addrs { + let entry_info = EntryInfo { + id: entry.id, + num_hashes: entry.num_hashes, + num_events: entry.events.len() as u64, + }; + let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); + trace!("sending {} to {}", data.len(), addr); + //TODO dont do IO here, this needs to be on a separate channel + let res = socket.send_to(&data, addr); + if res.is_err() { + eprintln!("couldn't send response: {:?}", res); + } + } + } + + fn deserialize_requests(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } + + // Copy-paste of deserialize_requests() because I can't figure out how to + // route the lifetimes in a generic version. + pub fn deserialize_events(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } + + /// Split Request list into verified transactions and the rest + fn partition_requests( + req_vers: Vec<(Request, SocketAddr, u8)>, + ) -> (Vec, Vec<(Request, SocketAddr)>) { + let mut events = vec![]; + let mut reqs = vec![]; + for (msg, rsp_addr, verify) in req_vers { + match msg { + Request::Transaction(tr) => { + if verify != 0 { + events.push(Event::Transaction(tr)); + } + } + _ => reqs.push((msg, rsp_addr)), + } + } + (events, reqs) + } + + fn serialize_response( + resp: Response, + rsp_addr: SocketAddr, + blob_recycler: &packet::BlobRecycler, + ) -> Result { + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + } + Ok(blob) + } + + fn serialize_responses( + rsps: Vec<(Response, SocketAddr)>, + blob_recycler: &packet::BlobRecycler, + ) -> Result> { + let mut blobs = VecDeque::new(); + for (resp, rsp_addr) in rsps { + blobs.push_back(Self::serialize_response(resp, rsp_addr, blob_recycler)?); + } + Ok(blobs) + } + + pub fn process_request_packets( + &self, + event_processor: &EventProcessor, + verified_receiver: &Receiver)>>, + entry_sender: &Sender, + blob_sender: &streamer::BlobSender, + packet_recycler: &packet::PacketRecycler, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let recv_start = Instant::now(); + let mms = verified_receiver.recv_timeout(timer)?; + let mut reqs_len = 0; + let mms_len = mms.len(); + info!( + "@{:?} process start stalled for: {:?}ms batches: {}", + timing::timestamp(), + timing::duration_as_ms(&recv_start.elapsed()), + mms.len(), + ); + let proc_start = Instant::now(); + for (msgs, vers) in mms { + let reqs = Self::deserialize_requests(&msgs.read().unwrap()); + reqs_len += reqs.len(); + let req_vers = reqs.into_iter() + .zip(vers) + .filter_map(|(req, ver)| req.map(|(msg, addr)| (msg, addr, ver))) + .filter(|x| { + let v = x.0.verify(); + v + }) + .collect(); + + debug!("partitioning"); + let (events, reqs) = Self::partition_requests(req_vers); + debug!("events: {} reqs: {}", events.len(), reqs.len()); + + debug!("process_events"); + let entry = event_processor.process_events(events)?; + entry_sender.send(entry)?; + debug!("done process_events"); + + debug!("process_requests"); + let rsps = self.process_requests(reqs); + debug!("done process_requests"); + + let blobs = Self::serialize_responses(rsps, blob_recycler)?; + if !blobs.is_empty() { + info!("process: sending blobs: {}", blobs.len()); + //don't wake up the other side if there is nothing + blob_sender.send(blobs)?; + } + packet_recycler.recycle(msgs); + } + let total_time_s = timing::duration_as_s(&proc_start.elapsed()); + let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + info!( + "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", + timing::timestamp(), + mms_len, + total_time_ms, + reqs_len, + (reqs_len as f32) / (total_time_s) + ); + Ok(()) + } +}