From 900b4f26443d5d58f3e34fa21b44795eb403f90a Mon Sep 17 00:00:00 2001
From: Stephen Akridge
Date: Wed, 9 May 2018 15:41:18 -0700
Subject: [PATCH] Serialize entries over multiple blobs

---
 src/tpu.rs | 69 ++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 49 insertions(+), 20 deletions(-)

diff --git a/src/tpu.rs b/src/tpu.rs
index 502c90cd58b7f6..0c04af830158ec 100644
--- a/src/tpu.rs
+++ b/src/tpu.rs
@@ -9,7 +9,7 @@ use ecdsa;
 use entry::Entry;
 use event::Event;
 use packet;
-use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
+use packet::{SharedBlob, SharedPackets, BLOB_DATA_SIZE, BLOB_SIZE};
 use rand::{thread_rng, Rng};
 use rayon::prelude::*;
 use result::Result;
@@ -83,33 +83,49 @@ impl Tpu {
         let mut start = 0;
         let mut end = 0;
         while start < list.len() {
+            let mut entries: Vec<Vec<Entry>> = Vec::new();
             let mut total = 0;
             for i in &list[start..] {
                 total += size_of::<Event>() * i.events.len();
                 total += size_of::<Entry>();
-                if total >= BLOB_SIZE {
+                if total >= BLOB_DATA_SIZE {
                     break;
                 }
                 end += 1;
             }
-            // See that we made progress and a single
-            // vec of Events wasn't too big for a single packet
+            // See if we need to split the events
             if end <= start {
-                // Trust the recorder to not package more than we can
-                // serialize
+                trace!("splitting events");
+                let mut event_start = 0;
+                let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
+                let total_entry_chunks = list[end].events.len() / num_events_per_blob;
+                for _ in 0..total_entry_chunks {
+                    let event_end = event_start + num_events_per_blob;
+                    let mut entry = Entry {
+                        num_hashes: list[end].num_hashes,
+                        id: list[end].id,
+                        events: list[end].events[event_start..event_end].to_vec(),
+                    };
+                    entries.push(vec![entry]);
+                    event_start = event_end;
+                }
                 end += 1;
+            } else {
+                entries.push(list[start..end].to_vec());
             }
 
-            let b = blob_recycler.allocate();
-            let pos = {
-                let mut bd = b.write().unwrap();
-                let mut out = Cursor::new(bd.data_mut());
-                serialize_into(&mut out, &list[start..end]).expect("failed to serialize output");
-                out.position() as usize
-            };
-            assert!(pos < BLOB_SIZE);
-            b.write().unwrap().set_size(pos);
-            q.push_back(b);
+            for entry in entries {
+                let b = blob_recycler.allocate();
+                let pos = {
+                    let mut bd = b.write().unwrap();
+                    let mut out = Cursor::new(bd.data_mut());
+                    serialize_into(&mut out, &entry).expect("failed to serialize output");
+                    out.position() as usize
+                };
+                assert!(pos < BLOB_SIZE);
+                b.write().unwrap().set_size(pos);
+                q.push_back(b);
+            }
             start = end;
         }
     }
@@ -693,7 +709,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocket) {
 mod tests {
     use accountant::Accountant;
     use accounting_stage::AccountingStage;
-    use bincode::serialize;
+    use bincode::{deserialize, serialize};
     use chrono::prelude::*;
     use crdt::Crdt;
     use ecdsa;
@@ -883,10 +899,11 @@ mod tests {
         let zero = Hash::default();
         let keypair = KeyPair::new();
         let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero));
-        let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
-        let e0 = entry::create_entry(&zero, 0, vec![tr0.clone(), tr1.clone()]);
+        let events = vec![tr0.clone(); 10000];
+        //let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
+        let e0 = entry::create_entry(&zero, 0, events);
 
-        let entry_list = vec![e0; 1000];
+        let entry_list = vec![e0.clone(); 1];
         let blob_recycler = BlobRecycler::default();
         let mut blob_q = VecDeque::new();
         Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
@@ -895,6 +912,18 @@ mod tests {
         if serialized_entry_list.len() % BLOB_SIZE != 0 {
             num_blobs_ref += 1
         }
+        let mut new_events = Vec::new();
+        for b in &blob_q {
+            let blob = b.read().unwrap();
+            let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
+            assert_eq!(entries.len(), 1);
+            assert_eq!(entries[0].num_hashes, e0.num_hashes);
+            assert_eq!(entries[0].id, e0.id);
+            new_events.extend(entries[0].events.clone());
+        }
+        for (i, e) in new_events.iter().enumerate() {
+            assert_eq!(*e, e0.events[i]);
+        }
         trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref);
         assert!(blob_q.len() > num_blobs_ref);
     }
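
---
Note (not part of the patch; git apply ignores text after the last hunk):
a minimal, self-contained sketch of the splitting arithmetic the new path
uses when a single entry's events exceed BLOB_DATA_SIZE. MockEvent and the
64 KiB blob size are illustrative stand-ins, not items from this crate.
The integer division in total_entry_chunks drops any trailing partial
chunk, matching the patch as written.

    use std::mem::size_of;

    // Hypothetical fixed-size stand-in for event::Event.
    #[derive(Clone, Copy)]
    struct MockEvent([u8; 256]);

    // Mirrors the chunking math in process_entry_list_into_blobs: compute
    // how many whole events fit per blob, then slice the event list into
    // that many full chunks.
    fn chunk_events(events: &[MockEvent], blob_data_size: usize) -> Vec<Vec<MockEvent>> {
        let num_events_per_blob = blob_data_size / size_of::<MockEvent>();
        // Integer division: a trailing partial chunk is dropped here,
        // exactly as in the patch's total_entry_chunks computation.
        let total_entry_chunks = events.len() / num_events_per_blob;
        let mut chunks = Vec::with_capacity(total_entry_chunks);
        let mut start = 0;
        for _ in 0..total_entry_chunks {
            let end = start + num_events_per_blob;
            chunks.push(events[start..end].to_vec());
            start = end;
        }
        chunks
    }

    fn main() {
        let events = vec![MockEvent([0u8; 256]); 1000];
        // 64 KiB / 256 B = 256 events per blob; 1000 / 256 = 3 full chunks.
        let chunks = chunk_events(&events, 64 * 1024);
        assert_eq!(chunks.len(), 3);
        assert!(chunks.iter().all(|c| c.len() == 256));
    }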
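
A second sketch, of the round trip the new test assertions perform:
serialize a Vec of entries into a fixed-size buffer with bincode's
serialize_into, record the cursor position, then deserialize from the
written prefix (the test uses blob.meta.size for this). MiniEntry and the
BLOB_SIZE constant are simplified stand-ins, and the snippet assumes
bincode 1.x with serde derive rather than this crate's real types.

    use bincode::{deserialize, serialize_into};
    use serde::{Deserialize, Serialize};
    use std::io::Cursor;

    #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
    struct MiniEntry {
        num_hashes: u64,
        id: u64,
        events: Vec<u8>,
    }

    fn main() {
        const BLOB_SIZE: usize = 1024; // stand-in for packet::BLOB_SIZE
        let entry = vec![MiniEntry {
            num_hashes: 3,
            id: 7,
            events: vec![1, 2, 3],
        }];

        // Write into a fixed buffer and record how many bytes were used,
        // as process_entry_list_into_blobs does with Cursor::position().
        let mut buf = vec![0u8; BLOB_SIZE];
        let pos = {
            let mut out = Cursor::new(&mut buf[..]);
            serialize_into(&mut out, &entry).expect("failed to serialize output");
            out.position() as usize
        };
        assert!(pos < BLOB_SIZE);

        // Deserialize only the written prefix; the trailing zeros are ignored.
        let round_trip: Vec<MiniEntry> = deserialize(&buf[..pos]).unwrap();
        assert_eq!(round_trip, entry);
    }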