diff --git a/src/bank.rs b/src/bank.rs index 13dc1a07784165..b87325807d9800 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -96,6 +96,12 @@ impl Bank { last_item.0 } + /// Return all recent entry IDs. The last item is the most recent. + pub fn last_ids(&self) -> Vec { + let last_ids = self.last_ids.read().expect("'last_ids' read lock"); + last_ids.iter().map(|x| x.0).collect() + } + fn reserve_signature(signatures: &RwLock>, sig: &Signature) -> Result<()> { if signatures .read() diff --git a/src/bin/client-demo.rs b/src/bin/client-demo.rs index a8a226f5498db7..c7d613549831bc 100644 --- a/src/bin/client-demo.rs +++ b/src/bin/client-demo.rs @@ -130,9 +130,10 @@ fn main() { }); let mut client = mk_client(&client_addr, &leader); - println!("Get last ID..."); - let last_id = client.get_last_id(); - println!("Got last ID {:?}", last_id); + println!("Get last IDs..."); + let last_ids = client.get_last_ids(); + let last_ids = &last_ids[1024..]; // Ignore the oldest ones, since they'd get rejected after new entries are added. + println!("Got last IDs {:?}", last_ids); let rnd = GenKeys::new(demo.mint.keypair().public_key_bytes()); @@ -145,7 +146,10 @@ fn main() { let now = Instant::now(); let transactions: Vec<_> = keypair_pairs .into_par_iter() - .map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id)) + .enumerate() + .map(|(i, pair)| { + Transaction::new(&pair[0], pair[1].pubkey(), 1, last_ids[i % last_ids.len()]) + }) .collect(); let duration = now.elapsed(); let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos()); diff --git a/src/request.rs b/src/request.rs index 7c5d23b52baf7d..de1ed860546c7b 100644 --- a/src/request.rs +++ b/src/request.rs @@ -8,6 +8,7 @@ use signature::PublicKey; pub enum Request { GetBalance { key: PublicKey }, GetLastId, + GetLastIds, GetTransactionCount, } @@ -22,5 +23,6 @@ impl Request { pub enum Response { Balance { key: PublicKey, val: Option }, LastId { id: Hash }, + LastIds { ids: Vec }, TransactionCount { transaction_count: u64 }, } diff --git a/src/request_processor.rs b/src/request_processor.rs index 2ce0ecf48789b3..bdff171a653d2f 100644 --- a/src/request_processor.rs +++ b/src/request_processor.rs @@ -34,6 +34,12 @@ impl RequestProcessor { info!("Response::LastId {:?}", rsp); Some(rsp) } + Request::GetLastIds => { + let ids = self.bank.last_ids(); + let rsp = (Response::LastIds { ids }, rsp_addr); + info!("Response::LastIds {:?}", rsp); + Some(rsp) + } Request::GetTransactionCount => { let transaction_count = self.bank.transaction_count() as u64; let rsp = (Response::TransactionCount { transaction_count }, rsp_addr); diff --git a/src/thin_client.rs b/src/thin_client.rs index 3b40ca04723f47..4a6b123035b0cf 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -17,7 +17,7 @@ pub struct ThinClient { requests_socket: UdpSocket, transactions_addr: SocketAddr, transactions_socket: UdpSocket, - last_id: Option, + last_ids: Vec, transaction_count: u64, balances: HashMap>, } @@ -37,7 +37,7 @@ impl ThinClient { requests_socket, transactions_addr, transactions_socket, - last_id: None, + last_ids: vec![], transaction_count: 0, balances: HashMap::new(), }; @@ -61,7 +61,11 @@ impl ThinClient { } Response::LastId { id } => { info!("Response last_id {:?}", id); - self.last_id = Some(id); + self.last_ids = vec![id]; + } + Response::LastIds { ids } => { + info!("Response last_ids {:?}", ids); + self.last_ids = ids; } Response::TransactionCount { transaction_count } => { info!("Response transaction count {:?}", transaction_count); @@ -152,7 +156,27 @@ impl ThinClient { } self.process_response(resp); } - self.last_id.expect("some last_id") + *self.last_ids.last().expect("some last_id") + } + + /// Request the last Entry ID from the server. This method blocks + /// until the server sends a response. + pub fn get_last_ids(&mut self) -> Vec { + info!("get_last_id"); + let req = Request::GetLastIds; + let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id"); + self.requests_socket + .send_to(&data, &self.requests_addr) + .expect("buffer error in pub fn get_last_id"); + let mut done = false; + while !done { + let resp = self.recv_response().expect("get_last_id response"); + if let &Response::LastIds { .. } = &resp { + done = true; + } + self.process_response(resp); + } + self.last_ids.clone() } pub fn poll_get_balance(&mut self, pubkey: &PublicKey) -> io::Result { @@ -233,6 +257,47 @@ mod tests { } } + #[test] + fn test_get_last_ids() { + logger::setup(); + let leader = TestNode::new(); + + let mint = Mint::new(10_000); + let bank = Bank::new(&mint); + let exit = Arc::new(AtomicBool::new(false)); + + let server = Server::new_leader( + bank, + None, + leader.data.clone(), + leader.sockets.requests, + leader.sockets.transaction, + leader.sockets.broadcast, + leader.sockets.respond, + leader.sockets.gossip, + exit.clone(), + sink(), + ); + sleep(Duration::from_millis(900)); + + let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + + let mut client = ThinClient::new( + leader.data.requests_addr, + requests_socket, + leader.data.transactions_addr, + transactions_socket, + ); + let last_ids = client.get_last_ids(); + assert_eq!(last_ids.len(), 1); + + exit.store(true, Ordering::Relaxed); + for t in server.thread_hdls { + t.join().unwrap(); + } + } + #[test] fn test_bad_sig() { logger::setup();