diff --git a/src/tpu.rs b/src/tpu.rs index 37b6b25478a537..f97916ea3ef6c1 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -32,13 +32,98 @@ use streamer; use timing; use transaction::Transaction; -pub struct Tpu { +struct AccountingStage { acc: Mutex, historian_input: Mutex>, historian: Historian, entry_info_subscribers: Mutex>, } +impl AccountingStage { + /// Create a new Tpu that wraps the given Accountant. + pub fn new(acc: Accountant, historian_input: SyncSender, historian: Historian) -> Self { + AccountingStage { + acc: Mutex::new(acc), + entry_info_subscribers: Mutex::new(vec![]), + historian_input: Mutex::new(historian_input), + historian, + } + } + + /// Process the transactions in parallel and then log the successful ones. + pub fn process_events(&self, events: Vec) -> Result<()> { + let results = self.acc.lock().unwrap().process_verified_events(events); + let events = results.into_iter().filter_map(|x| x.ok()).collect(); + let sender = self.historian_input.lock().unwrap(); + sender.send(Signal::Events(events))?; + debug!("after historian_input"); + Ok(()) + } + + /// Process Request items sent by clients. + fn process_request( + &self, + msg: Request, + rsp_addr: SocketAddr, + ) -> Option<(Response, SocketAddr)> { + match msg { + Request::GetBalance { key } => { + let val = self.acc.lock().unwrap().get_balance(&key); + let rsp = (Response::Balance { key, val }, rsp_addr); + info!("Response::Balance {:?}", rsp); + Some(rsp) + } + Request::Transaction(_) => unreachable!(), + Request::Subscribe { subscriptions } => { + for subscription in subscriptions { + match subscription { + Subscription::EntryInfo => { + self.entry_info_subscribers.lock().unwrap().push(rsp_addr) + } + } + } + None + } + } + } + + pub fn process_requests( + &self, + reqs: Vec<(Request, SocketAddr)>, + ) -> Vec<(Response, SocketAddr)> { + reqs.into_iter() + .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) + .collect() + } + + pub fn notify_entry_info_subscribers(&self, entry: &Entry) { + // TODO: No need to bind(). + let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); + + // copy subscribers to avoid taking lock while doing io + let addrs = self.entry_info_subscribers.lock().unwrap().clone(); + trace!("Sending to {} addrs", addrs.len()); + for addr in addrs { + let entry_info = EntryInfo { + id: entry.id, + num_hashes: entry.num_hashes, + num_events: entry.events.len() as u64, + }; + let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); + trace!("sending {} to {}", data.len(), addr); + //TODO dont do IO here, this needs to be on a separate channel + let res = socket.send_to(&data, addr); + if res.is_err() { + eprintln!("couldn't send response: {:?}", res); + } + } + } +} + +pub struct Tpu { + accounting: AccountingStage, +} + #[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))] #[derive(Serialize, Deserialize, Debug, Clone)] pub enum Request { @@ -80,59 +165,37 @@ pub enum Response { impl Tpu { /// Create a new Tpu that wraps the given Accountant. pub fn new(acc: Accountant, historian_input: SyncSender, historian: Historian) -> Self { - Tpu { - acc: Mutex::new(acc), - entry_info_subscribers: Mutex::new(vec![]), - historian_input: Mutex::new(historian_input), - historian, - } - } - - fn notify_entry_info_subscribers(obj: &SharedTpu, entry: &Entry) { - // TODO: No need to bind(). - let socket = UdpSocket::bind("0.0.0.0:0").expect("bind"); - - // copy subscribers to avoid taking lock while doing io - let addrs = obj.entry_info_subscribers.lock().unwrap().clone(); - trace!("Sending to {} addrs", addrs.len()); - for addr in addrs { - let entry_info = EntryInfo { - id: entry.id, - num_hashes: entry.num_hashes, - num_events: entry.events.len() as u64, - }; - let data = serialize(&Response::EntryInfo(entry_info)).expect("serialize EntryInfo"); - trace!("sending {} to {}", data.len(), addr); - //TODO dont do IO here, this needs to be on a separate channel - let res = socket.send_to(&data, addr); - if res.is_err() { - eprintln!("couldn't send response: {:?}", res); - } - } + let accounting = AccountingStage::new(acc, historian_input, historian); + Tpu { accounting } } fn update_entry(obj: &SharedTpu, writer: &Arc>, entry: &Entry) { trace!("update_entry entry"); - obj.acc.lock().unwrap().register_entry_id(&entry.id); + obj.accounting + .acc + .lock() + .unwrap() + .register_entry_id(&entry.id); writeln!( writer.lock().unwrap(), "{}", serde_json::to_string(&entry).unwrap() ).unwrap(); - Self::notify_entry_info_subscribers(obj, &entry); + obj.accounting.notify_entry_info_subscribers(&entry); } fn receive_all(obj: &SharedTpu, writer: &Arc>) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; - let entry = obj.historian + let entry = obj.accounting + .historian .output .lock() .unwrap() .recv_timeout(Duration::new(1, 0))?; Self::update_entry(obj, writer, &entry); l.push(entry); - while let Ok(entry) = obj.historian.receive() { + while let Ok(entry) = obj.accounting.historian.receive() { Self::update_entry(obj, writer, &entry); l.push(entry); } @@ -247,33 +310,6 @@ impl Tpu { }) } - /// Process Request items sent by clients. - pub fn process_request( - &self, - msg: Request, - rsp_addr: SocketAddr, - ) -> Option<(Response, SocketAddr)> { - match msg { - Request::GetBalance { key } => { - let val = self.acc.lock().unwrap().get_balance(&key); - let rsp = (Response::Balance { key, val }, rsp_addr); - info!("Response::Balance {:?}", rsp); - Some(rsp) - } - Request::Transaction(_) => unreachable!(), - Request::Subscribe { subscriptions } => { - for subscription in subscriptions { - match subscription { - Subscription::EntryInfo => { - self.entry_info_subscribers.lock().unwrap().push(rsp_addr) - } - } - } - None - } - } - } - fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec, usize)> { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; @@ -365,22 +401,6 @@ impl Tpu { (events, reqs) } - /// Process the transactions in parallel and then log the successful ones. - fn process_events(&self, events: Vec) -> Result<()> { - let results = self.acc.lock().unwrap().process_verified_events(events); - let events = results.into_iter().filter_map(|x| x.ok()).collect(); - let sender = self.historian_input.lock().unwrap(); - sender.send(Signal::Events(events))?; - debug!("after historian_input"); - Ok(()) - } - - fn process_requests(&self, reqs: Vec<(Request, SocketAddr)>) -> Vec<(Response, SocketAddr)> { - reqs.into_iter() - .filter_map(|(req, rsp_addr)| self.process_request(req, rsp_addr)) - .collect() - } - fn serialize_response( resp: Response, rsp_addr: SocketAddr, @@ -445,11 +465,11 @@ impl Tpu { debug!("events: {} reqs: {}", events.len(), reqs.len()); debug!("process_events"); - obj.process_events(events)?; + obj.accounting.process_events(events)?; debug!("done process_events"); debug!("process_requests"); - let rsps = obj.process_requests(reqs); + let rsps = obj.accounting.process_requests(reqs); debug!("done process_requests"); let blobs = Self::serialize_responses(rsps, blob_recycler)?; @@ -485,7 +505,7 @@ impl Tpu { for msgs in &blobs { let blob = msgs.read().unwrap(); let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); - let acc = obj.acc.lock().unwrap(); + let acc = obj.accounting.acc.lock().unwrap(); for entry in entries { acc.register_entry_id(&entry.id); for result in acc.process_verified_events(entry.events) { @@ -807,7 +827,7 @@ mod tests { use std::time::Duration; use streamer; use thin_client::ThinClient; - use tpu::Tpu; + use tpu::{AccountingStage, Tpu}; use transaction::Transaction; #[test] @@ -845,22 +865,22 @@ mod tests { let acc = Accountant::new(&mint); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let tpu = Tpu::new(acc, input, historian); + let stage = AccountingStage::new(acc, input, historian); // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); let tr = Transaction::new(&mint.keypair(), alice.pubkey(), 2, mint.last_id()); let events = vec![Event::Transaction(tr)]; - assert!(tpu.process_events(events).is_ok()); + assert!(stage.process_events(events).is_ok()); // Process a second batch that spends one of those tokens. let tr = Transaction::new(&alice, mint.pubkey(), 1, mint.last_id()); let events = vec![Event::Transaction(tr)]; - assert!(tpu.process_events(events).is_ok()); + assert!(stage.process_events(events).is_ok()); // Collect the ledger and feed it to a new accountant. - drop(tpu.historian_input); - let entries: Vec = tpu.historian.output.lock().unwrap().iter().collect(); + drop(stage.historian_input); + let entries: Vec = stage.historian.output.lock().unwrap().iter().collect(); // Assert the user holds one token, not two. If the server only output one // entry, then the second transaction will be rejected, because it drives @@ -993,10 +1013,10 @@ mod tests { let acc = Accountant::new(&alice); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc = Arc::new(Tpu::new(acc, input, historian)); + let tpu = Arc::new(Tpu::new(acc, input, historian)); let replicate_addr = target1_data.replicate_addr; let threads = Tpu::replicate( - &acc, + &tpu, target1_data, target1_gossip, target1_serve, @@ -1018,9 +1038,11 @@ mod tests { w.set_index(i).unwrap(); w.set_id(leader_id).unwrap(); + let acc = tpu.accounting.acc.lock().unwrap(); + let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); - acc.acc.lock().unwrap().register_entry_id(&cur_hash); + acc.register_entry_id(&cur_hash); cur_hash = hash(&cur_hash); let tr1 = Transaction::new( @@ -1029,11 +1051,11 @@ mod tests { transfer_amount, cur_hash, ); - acc.acc.lock().unwrap().register_entry_id(&cur_hash); + acc.register_entry_id(&cur_hash); cur_hash = hash(&cur_hash); let entry1 = entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]); - acc.acc.lock().unwrap().register_entry_id(&cur_hash); + acc.register_entry_id(&cur_hash); cur_hash = hash(&cur_hash); alice_ref_balance -= transfer_amount; @@ -1058,18 +1080,11 @@ mod tests { msgs.push(msg); } - let alice_balance = acc.acc - .lock() - .unwrap() - .get_balance(&alice.keypair().pubkey()) - .unwrap(); + let acc = tpu.accounting.acc.lock().unwrap(); + let alice_balance = acc.get_balance(&alice.keypair().pubkey()).unwrap(); assert_eq!(alice_balance, alice_ref_balance); - let bob_balance = acc.acc - .lock() - .unwrap() - .get_balance(&bob_keypair.pubkey()) - .unwrap(); + let bob_balance = acc.get_balance(&bob_keypair.pubkey()).unwrap(); assert_eq!(bob_balance, starting_balance - alice_ref_balance); exit.store(true, Ordering::Relaxed); @@ -1164,17 +1179,17 @@ mod bench { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let tpu = Tpu::new(acc, input, historian); + let stage = AccountingStage::new(acc, input, historian); let now = Instant::now(); - assert!(tpu.process_events(req_vers).is_ok()); + assert!(stage.process_events(req_vers).is_ok()); let duration = now.elapsed(); let sec = duration.as_secs() as f64 + duration.subsec_nanos() as f64 / 1_000_000_000.0; let tps = txs as f64 / sec; // Ensure that all transactions were successfully logged. - drop(tpu.historian_input); - let entries: Vec = tpu.historian.output.lock().unwrap().iter().collect(); + drop(stage.historian_input); + let entries: Vec = stage.historian.output.lock().unwrap().iter().collect(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].events.len(), txs as usize);