Skip to content

Commit

Permalink
Compiles/fmt and add assert for forward progress
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed May 3, 2018
1 parent d2a20bd commit c1ce437
Showing 1 changed file with 8 additions and 16 deletions.
24 changes: 8 additions & 16 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use result::Result;
use serde_json;
use signature::PublicKey;
use std::cmp::max;
use std::collections::LinkedList;
use std::collections::VecDeque;
use std::io::{Cursor, Write};
use std::mem::size_of;
Expand Down Expand Up @@ -113,32 +112,22 @@ impl AccountantSkel {
"{}",
serde_json::to_string(&entry).unwrap()
).unwrap();
trace!("notify_entry_info entry");
Self::notify_entry_info_subscribers(obj, &entry);
trace!("notify_entry_info done");
}

fn receive_all<W: Write>(
obj: &SharedSkel,
writer: &Arc<Mutex<W>>,
) -> Result<Vec<Entry>> {
fn receive_all<W: Write>(obj: &SharedSkel, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
let mut l = vec![];
let entry = obj.historian
.output
.lock()
.unwrap()
.recv_timeout(Duration::new(1, 0))?;
trace!("obj.write 1 {:?}", entry);
Self::update_entry(obj, writer, &entry);
trace!("obj.write 1.end");
l.push(entry);
while let Ok(entry) = obj.historian.receive() {
trace!("obj.write 2");
Self::update_entry(obj, writer, &entry);
trace!("obj.write 2.end");
l.push(entry);
trace!("num: {}", num);
}
Ok(l)
}
Expand All @@ -153,26 +142,29 @@ impl AccountantSkel {
exit: Arc<AtomicBool>,
) -> Result<()> {
let mut q = VecDeque::new();
trace!("max: {}", max);
while let Ok(list) = Self::receive_all(&obj, writer) {
trace!("New blobs? {}", list.len());
let mut start = 0;
let mut end = 0;
while start < list.len() {
let total = 0;
for i in list[start..] {
let mut total = 0;
for i in &list[start..] {
total += size_of::<Event>() * i.events.len();
total += size_of::<Entry>();
if total >= BLOB_SIZE {
break;
}
end += 1;
}
// See that we made progress and a single
// vec of Events wasn't too big for a single packet
assert!(end > start);
let b = blob_recycler.allocate();
let pos = {
let mut bd = b.write().unwrap();
let mut out = Cursor::new(bd.data_mut());
serialize_into(&mut out, &list[start .. end]).expect("failed to serialize output");
serialize_into(&mut out, &list[start..end])
.expect("failed to serialize output");
out.position() as usize
};
assert!(pos < BLOB_SIZE);
Expand Down

0 comments on commit c1ce437

Please sign in to comment.