From fb0b98633863b630266aca0df4fa4cc166ff9471 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 3 May 2018 13:24:35 -0600 Subject: [PATCH 1/5] Remove duplicate state --- src/accountant.rs | 9 +++++++++ src/accountant_skel.rs | 24 +++++++----------------- src/accountant_stub.rs | 2 +- src/bin/testnode.rs | 2 +- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/src/accountant.rs b/src/accountant.rs index 49c5d2c8838042..a94629837ef279 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -74,6 +74,13 @@ impl Accountant { acc } + /// Return the last entry ID registered + pub fn last_id(&self) -> Hash { + let last_ids = self.last_ids.read().unwrap(); + let last_item = last_ids.iter().last().expect("empty last_ids list"); + last_item.0 + } + fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> bool { if signatures.read().unwrap().contains(sig) { return false; @@ -327,6 +334,8 @@ mod tests { let alice = Mint::new(10_000); let bob_pubkey = KeyPair::new().pubkey(); let acc = Accountant::new(&alice); + assert_eq!(acc.last_id(), alice.last_id()); + acc.transfer(1_000, &alice.keypair(), bob_pubkey, alice.last_id()) .unwrap(); assert_eq!(acc.get_balance(&bob_pubkey).unwrap(), 1_000); diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index bd8c7ddd24a642..1d213cb731084b 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -33,7 +33,6 @@ use transaction::Transaction; pub struct AccountantSkel { acc: Mutex, - last_id: Mutex, historian_input: Mutex>, historian: Historian, entry_info_subscribers: Mutex>, @@ -81,15 +80,9 @@ pub enum Response { impl AccountantSkel { /// Create a new AccountantSkel that wraps the given Accountant. - pub fn new( - acc: Accountant, - last_id: Hash, - historian_input: SyncSender, - historian: Historian, - ) -> Self { + pub fn new(acc: Accountant, historian_input: SyncSender, historian: Historian) -> Self { AccountantSkel { acc: Mutex::new(acc), - last_id: Mutex::new(last_id), entry_info_subscribers: Mutex::new(vec![]), historian_input: Mutex::new(historian_input), historian, @@ -116,10 +109,7 @@ impl AccountantSkel { fn update_entry(obj: &SharedSkel, writer: &Arc>, entry: &Entry) { trace!("update_entry entry"); - let mut last_id_l = obj.last_id.lock().unwrap(); - *last_id_l = entry.id; - obj.acc.lock().unwrap().register_entry_id(&last_id_l); - drop(last_id_l); + obj.acc.lock().unwrap().register_entry_id(&entry.id); writeln!( writer.lock().unwrap(), "{}", @@ -228,7 +218,7 @@ impl AccountantSkel { } Request::GetLastId => Some(( Response::LastId { - id: *self.last_id.lock().unwrap(), + id: self.acc.lock().unwrap().last_id(), }, rsp_addr, )), @@ -699,7 +689,7 @@ mod tests { let rsp_addr: SocketAddr = "0.0.0.0:0".parse().expect("socket address"); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let skel = AccountantSkel::new(acc, mint.last_id(), input, historian); + let skel = AccountantSkel::new(acc, input, historian); // Process a batch that includes a transaction that receives two tokens. let alice = KeyPair::new(); @@ -740,7 +730,7 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc_skel = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian)); + let acc_skel = Arc::new(AccountantSkel::new(acc, input, historian)); let serve_addr = leader_serve.local_addr().unwrap(); let threads = AccountantSkel::serve( &acc_skel, @@ -858,7 +848,7 @@ mod tests { let acc = Accountant::new(&alice); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian)); + let acc = Arc::new(AccountantSkel::new(acc, input, historian)); let replicate_addr = target1_data.replicate_addr; let threads = AccountantSkel::replicate( &acc, @@ -1007,7 +997,7 @@ mod bench { let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &mint.last_id(), None); - let skel = AccountantSkel::new(acc, mint.last_id(), input, historian); + let skel = AccountantSkel::new(acc, input, historian); let now = Instant::now(); assert!(skel.process_packets(req_vers).is_ok()); diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 15a6a69cb71b70..b8c72167a44057 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -192,7 +192,7 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let (input, event_receiver) = sync_channel(10); let historian = Historian::new(event_receiver, &alice.last_id(), Some(30)); - let acc = Arc::new(AccountantSkel::new(acc, alice.last_id(), input, historian)); + let acc = Arc::new(AccountantSkel::new(acc, input, historian)); let threads = AccountantSkel::serve(&acc, d, serve, gossip, exit.clone(), sink()).unwrap(); sleep(Duration::from_millis(300)); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 2c585e6f27653e..c12840e843808f 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -104,7 +104,7 @@ fn main() { let (input, event_receiver) = sync_channel(10_000); let historian = Historian::new(event_receiver, &last_id, Some(1000)); let exit = Arc::new(AtomicBool::new(false)); - let skel = Arc::new(AccountantSkel::new(acc, last_id, input, historian)); + let skel = Arc::new(AccountantSkel::new(acc, input, historian)); let serve_sock = UdpSocket::bind(&serve_addr).unwrap(); let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap(); let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap(); From 2302b7106e883961aeb6734479d936538b87bebd Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 3 May 2018 13:26:45 -0600 Subject: [PATCH 2/5] Update comment The last PR added a thread that logs entries without needing to be driven by the client. --- src/accountant_stub.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index b8c72167a44057..b782c614e7c5a3 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -109,9 +109,7 @@ impl AccountantStub { } /// Request the last Entry ID from the server. This method blocks - /// until the server sends a response. At the time of this writing, - /// it also has the side-effect of causing the server to log any - /// entries that have been published by the Historian. + /// until the server sends a response. pub fn get_last_id(&mut self) -> FutureResult { let req = Request::GetLastId; let data = serialize(&req).expect("serialize GetId"); From cdb53dc3688f4efaaae874c5baef0dbe730b8bfd Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 3 May 2018 13:29:54 -0600 Subject: [PATCH 3/5] Implement GetLastId with EntryInfo subscription --- src/accountant_skel.rs | 8 -------- src/accountant_stub.rs | 10 +--------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 1d213cb731084b..c08e86620d3f92 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -43,7 +43,6 @@ pub struct AccountantSkel { pub enum Request { Transaction(Transaction), GetBalance { key: PublicKey }, - GetLastId, Subscribe { subscriptions: Vec }, } @@ -75,7 +74,6 @@ type SharedSkel = Arc; pub enum Response { Balance { key: PublicKey, val: Option }, EntryInfo(EntryInfo), - LastId { id: Hash }, } impl AccountantSkel { @@ -216,12 +214,6 @@ impl AccountantSkel { let val = self.acc.lock().unwrap().get_balance(&key); Some((Response::Balance { key, val }, rsp_addr)) } - Request::GetLastId => Some(( - Response::LastId { - id: self.acc.lock().unwrap().last_id(), - }, - rsp_addr, - )), Request::Transaction(_) => unreachable!(), Request::Subscribe { subscriptions } => { for subscription in subscriptions { diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index b782c614e7c5a3..48ba9c5c17ada4 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -57,9 +57,6 @@ impl AccountantStub { Response::Balance { key, val } => { self.balances.insert(key, val); } - Response::LastId { id } => { - self.last_id = Some(id); - } Response::EntryInfo(entry_info) => { self.last_id = Some(entry_info.id); self.num_events += entry_info.num_events; @@ -111,15 +108,10 @@ impl AccountantStub { /// Request the last Entry ID from the server. This method blocks /// until the server sends a response. pub fn get_last_id(&mut self) -> FutureResult { - let req = Request::GetLastId; - let data = serialize(&req).expect("serialize GetId"); - self.socket - .send_to(&data, &self.addr) - .expect("buffer error"); let mut done = false; while !done { let resp = self.recv_response().expect("recv response"); - if let &Response::LastId { .. } = &resp { + if let &Response::EntryInfo { .. } = &resp { done = true; } self.process_response(resp); From e390852f9d912fddf02ea68de951b83a94ec1eb1 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 3 May 2018 13:34:54 -0600 Subject: [PATCH 4/5] Implement get_last_id() with transaction_count() This is more precice than the previous implementation because it'll drain the EntryInfo queue and return the most recent last_id instead of the first one. --- src/accountant_stub.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index 48ba9c5c17ada4..1797a53e1c6ca1 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -108,14 +108,7 @@ impl AccountantStub { /// Request the last Entry ID from the server. This method blocks /// until the server sends a response. pub fn get_last_id(&mut self) -> FutureResult { - let mut done = false; - while !done { - let resp = self.recv_response().expect("recv response"); - if let &Response::EntryInfo { .. } = &resp { - done = true; - } - self.process_response(resp); - } + self.transaction_count(); ok(self.last_id.unwrap_or(Hash::default())) } From 5781f62d589e746276170d81f2b041d0c9f2b844 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 3 May 2018 13:56:10 -0600 Subject: [PATCH 5/5] Sooth all versions of rustfmt --- src/streamer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 471f1f29ce920d..808eea1e76ab3a 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -438,8 +438,8 @@ mod test { use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; - use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver, - PacketReceiver}; + use streamer::{blob_receiver, receiver, responder, retransmitter, window}; + use streamer::{BlobReceiver, PacketReceiver}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 {