From d2a20bd9fdabd02ff8bda5ed97289a8d04653f04 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Thu, 3 May 2018 14:35:04 -0700 Subject: [PATCH 1/2] fix entry serialize --- src/accountant_skel.rs | 64 ++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 28 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index c08e86620d3f92..7e548b8710024a 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -118,29 +118,27 @@ impl AccountantSkel { trace!("notify_entry_info done"); } - fn receive_to_list( + fn receive_all( obj: &SharedSkel, writer: &Arc>, - max: usize, - ) -> Result> { + ) -> Result> { //TODO implement a serialize for channel that does this without allocations - let mut num = 0; - let mut l = LinkedList::new(); + let mut l = vec![]; let entry = obj.historian .output .lock() .unwrap() .recv_timeout(Duration::new(1, 0))?; + trace!("obj.write 1 {:?}", entry); Self::update_entry(obj, writer, &entry); - l.push_back(entry); + trace!("obj.write 1.end"); + l.push(entry); while let Ok(entry) = obj.historian.receive() { + trace!("obj.write 2"); Self::update_entry(obj, writer, &entry); - l.push_back(entry); - num += 1; - if num == max { - break; - } - trace!("receive_to_list entries num: {}", num); + trace!("obj.write 2.end"); + l.push(entry); + trace!("num: {}", num); } Ok(l) } @@ -154,24 +152,34 @@ impl AccountantSkel { writer: &Arc>, exit: Arc, ) -> Result<()> { - // TODO: should it be the serialized Entry size? - let max = BLOB_SIZE / size_of::(); let mut q = VecDeque::new(); - let mut count = 0; trace!("max: {}", max); - while let Ok(list) = Self::receive_to_list(&obj, writer, max) { - trace!("New blobs? {} {}", count, list.len()); - let b = blob_recycler.allocate(); - let pos = { - let mut bd = b.write().unwrap(); - let mut out = Cursor::new(bd.data_mut()); - serialize_into(&mut out, &list).expect("failed to serialize output"); - out.position() as usize - }; - assert!(pos < BLOB_SIZE); - b.write().unwrap().set_size(pos); - q.push_back(b); - count += 1; + while let Ok(list) = Self::receive_all(&obj, writer) { + trace!("New blobs? {}", list.len()); + let mut start = 0; + let mut end = 0; + while start < list.len() { + let total = 0; + for i in list[start..] { + total += size_of::() * i.events.len(); + total += size_of::(); + if total >= BLOB_SIZE { + break; + } + end += 1; + } + let b = blob_recycler.allocate(); + let pos = { + let mut bd = b.write().unwrap(); + let mut out = Cursor::new(bd.data_mut()); + serialize_into(&mut out, &list[start .. end]).expect("failed to serialize output"); + out.position() as usize + }; + assert!(pos < BLOB_SIZE); + b.write().unwrap().set_size(pos); + q.push_back(b); + start = end; + } if exit.load(Ordering::Relaxed) { break; } From c1ce4372ac8aa016fd9402ecb60a2eef693852b9 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 3 May 2018 14:47:05 -0700 Subject: [PATCH 2/2] Compiles/fmt and add assert for forward progress --- src/accountant_skel.rs | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 7e548b8710024a..c9a7a087034f0f 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -18,7 +18,6 @@ use result::Result; use serde_json; use signature::PublicKey; use std::cmp::max; -use std::collections::LinkedList; use std::collections::VecDeque; use std::io::{Cursor, Write}; use std::mem::size_of; @@ -113,15 +112,10 @@ impl AccountantSkel { "{}", serde_json::to_string(&entry).unwrap() ).unwrap(); - trace!("notify_entry_info entry"); Self::notify_entry_info_subscribers(obj, &entry); - trace!("notify_entry_info done"); } - fn receive_all( - obj: &SharedSkel, - writer: &Arc>, - ) -> Result> { + fn receive_all(obj: &SharedSkel, writer: &Arc>) -> Result> { //TODO implement a serialize for channel that does this without allocations let mut l = vec![]; let entry = obj.historian @@ -129,16 +123,11 @@ impl AccountantSkel { .lock() .unwrap() .recv_timeout(Duration::new(1, 0))?; - trace!("obj.write 1 {:?}", entry); Self::update_entry(obj, writer, &entry); - trace!("obj.write 1.end"); l.push(entry); while let Ok(entry) = obj.historian.receive() { - trace!("obj.write 2"); Self::update_entry(obj, writer, &entry); - trace!("obj.write 2.end"); l.push(entry); - trace!("num: {}", num); } Ok(l) } @@ -153,14 +142,13 @@ impl AccountantSkel { exit: Arc, ) -> Result<()> { let mut q = VecDeque::new(); - trace!("max: {}", max); while let Ok(list) = Self::receive_all(&obj, writer) { trace!("New blobs? {}", list.len()); let mut start = 0; let mut end = 0; while start < list.len() { - let total = 0; - for i in list[start..] { + let mut total = 0; + for i in &list[start..] { total += size_of::() * i.events.len(); total += size_of::(); if total >= BLOB_SIZE { @@ -168,11 +156,15 @@ impl AccountantSkel { } end += 1; } + // See that we made progress and a single + // vec of Events wasn't too big for a single packet + assert!(end > start); let b = blob_recycler.allocate(); let pos = { let mut bd = b.write().unwrap(); let mut out = Cursor::new(bd.data_mut()); - serialize_into(&mut out, &list[start .. end]).expect("failed to serialize output"); + serialize_into(&mut out, &list[start..end]) + .expect("failed to serialize output"); out.position() as usize }; assert!(pos < BLOB_SIZE);