From bc779d89859ca09f91c3ec655259fdcacad79c5f Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 3 May 2018 15:55:59 -0700 Subject: [PATCH] Factor out entry processing and fix replicate test to call global setup fn --- src/accountant_skel.rs | 109 +++++++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 43 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b687fd69dcc64d..7982ae4fefe019 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -11,7 +11,7 @@ use event::Event; use hash::Hash; use historian::Historian; use packet; -use packet::{SharedPackets, BLOB_SIZE}; +use packet::{SharedBlob, SharedPackets, BLOB_SIZE}; use rayon::prelude::*; use recorder::Signal; use result::Result; @@ -135,6 +135,46 @@ impl AccountantSkel { Ok(l) } + fn process_entry_list_into_blobs( + list: &Vec, + blob_recycler: &packet::BlobRecycler, + q: &mut VecDeque, + ) { + let mut start = 0; + let mut end = 0; + while start < list.len() { + let mut total = 0; + for i in &list[start..] { + total += size_of::() * i.events.len(); + total += size_of::(); + if total >= BLOB_SIZE { + break; + } + end += 1; + } + // See that we made progress and a single + // vec of Events wasn't too big for a single packet + if end <= start { + eprintln!("Event too big for the blob!"); + start += 1; + end = start; + continue; + } + + let b = blob_recycler.allocate(); + let pos = { + let mut bd = b.write().unwrap(); + let mut out = Cursor::new(bd.data_mut()); + serialize_into(&mut out, &list[start..end]).expect("failed to serialize output"); + out.position() as usize + }; + assert!(pos < BLOB_SIZE); + b.write().unwrap().set_size(pos); + q.push_back(b); + start = end; + } + } + /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out fn run_sync( @@ -147,34 +187,7 @@ impl AccountantSkel { let mut q = VecDeque::new(); while let Ok(list) = Self::receive_all(&obj, writer) { trace!("New blobs? {}", list.len()); - let mut start = 0; - let mut end = 0; - while start < list.len() { - let mut total = 0; - for i in &list[start..] { - total += size_of::() * i.events.len(); - total += size_of::(); - if total >= BLOB_SIZE { - break; - } - end += 1; - } - // See that we made progress and a single - // vec of Events wasn't too big for a single packet - assert!(end > start); - let b = blob_recycler.allocate(); - let pos = { - let mut bd = b.write().unwrap(); - let mut out = Cursor::new(bd.data_mut()); - serialize_into(&mut out, &list[start..end]) - .expect("failed to serialize output"); - out.position() as usize - }; - assert!(pos < BLOB_SIZE); - b.write().unwrap().set_size(pos); - q.push_back(b); - start = end; - } + Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q); if exit.load(Ordering::Relaxed) { break; } @@ -617,7 +630,7 @@ mod tests { use accountant_skel::{to_packets, Request}; use bincode::serialize; use ecdsa; - use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; + use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS}; use transaction::{memfind, test_tx}; use accountant::Accountant; @@ -632,6 +645,7 @@ mod tests { use futures::Future; use hash::{hash, Hash}; use historian::Historian; + use logger; use mint::Mint; use plan::Plan; use recorder::Signal; @@ -764,18 +778,6 @@ mod tests { } } - use std::sync::{Once, ONCE_INIT}; - extern crate env_logger; - - static INIT: Once = ONCE_INIT; - - /// Setup function that is only run once, even if called multiple times. - fn setup() { - INIT.call_once(|| { - env_logger::init().unwrap(); - }); - } - fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket) { let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let replicate = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -793,7 +795,7 @@ mod tests { /// Test that mesasge sent from leader to target1 and repliated to target2 #[test] fn test_replicate() { - setup(); + logger::setup(); let (leader_data, leader_gossip, _, leader_serve) = test_node(); let (target1_data, target1_gossip, target1_replicate, _) = test_node(); let (target2_data, target2_gossip, target2_replicate, _) = test_node(); @@ -932,6 +934,27 @@ mod tests { t_l_gossip.join().expect("join"); t_l_listen.join().expect("join"); } + + #[test] + fn test_entry_to_blobs() { + let zero = Hash::default(); + let keypair = KeyPair::new(); + let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero)); + let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero)); + let e0 = entry::create_entry(&zero, 0, vec![tr0.clone(), tr1.clone()]); + + let entry_list = vec![e0; 1000]; + let blob_recycler = BlobRecycler::default(); + let mut blob_q = VecDeque::new(); + AccountantSkel::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q); + let serialized_entry_list = serialize(&entry_list).unwrap(); + let mut num_blobs_ref = serialized_entry_list.len() / BLOB_SIZE; + if serialized_entry_list.len() % BLOB_SIZE != 0 { + num_blobs_ref += 1 + } + trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref); + assert!(blob_q.len() > num_blobs_ref); + } } #[cfg(all(feature = "unstable", test))]