Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix entry serialize #172

Merged
merged 2 commits into from
May 3, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use result::Result;
use serde_json;
use signature::PublicKey;
use std::cmp::max;
use std::collections::LinkedList;
use std::collections::VecDeque;
use std::io::{Cursor, Write};
use std::mem::size_of;
Expand Down Expand Up @@ -113,34 +112,22 @@ impl AccountantSkel {
"{}",
serde_json::to_string(&entry).unwrap()
).unwrap();
trace!("notify_entry_info entry");
Self::notify_entry_info_subscribers(obj, &entry);
trace!("notify_entry_info done");
}

fn receive_to_list<W: Write>(
obj: &SharedSkel,
writer: &Arc<Mutex<W>>,
max: usize,
) -> Result<LinkedList<Entry>> {
fn receive_all<W: Write>(obj: &SharedSkel, writer: &Arc<Mutex<W>>) -> Result<Vec<Entry>> {
//TODO implement a serialize for channel that does this without allocations
let mut num = 0;
let mut l = LinkedList::new();
let mut l = vec![];
let entry = obj.historian
.output
.lock()
.unwrap()
.recv_timeout(Duration::new(1, 0))?;
Self::update_entry(obj, writer, &entry);
l.push_back(entry);
l.push(entry);
while let Ok(entry) = obj.historian.receive() {
Self::update_entry(obj, writer, &entry);
l.push_back(entry);
num += 1;
if num == max {
break;
}
trace!("receive_to_list entries num: {}", num);
l.push(entry);
}
Ok(l)
}
Expand All @@ -154,24 +141,37 @@ impl AccountantSkel {
writer: &Arc<Mutex<W>>,
exit: Arc<AtomicBool>,
) -> Result<()> {
// TODO: should it be the serialized Entry size?
let max = BLOB_SIZE / size_of::<Entry>();
let mut q = VecDeque::new();
let mut count = 0;
trace!("max: {}", max);
while let Ok(list) = Self::receive_to_list(&obj, writer, max) {
trace!("New blobs? {} {}", count, list.len());
let b = blob_recycler.allocate();
let pos = {
let mut bd = b.write().unwrap();
let mut out = Cursor::new(bd.data_mut());
serialize_into(&mut out, &list).expect("failed to serialize output");
out.position() as usize
};
assert!(pos < BLOB_SIZE);
b.write().unwrap().set_size(pos);
q.push_back(b);
count += 1;
while let Ok(list) = Self::receive_all(&obj, writer) {
trace!("New blobs? {}", list.len());
let mut start = 0;
let mut end = 0;
while start < list.len() {
let mut total = 0;
for i in &list[start..] {
total += size_of::<Event>() * i.events.len();
total += size_of::<Entry>();
if total >= BLOB_SIZE {
break;
}
end += 1;
}
// See that we made progress and a single
// vec of Events wasn't too big for a single packet
assert!(end > start);
let b = blob_recycler.allocate();
let pos = {
let mut bd = b.write().unwrap();
let mut out = Cursor::new(bd.data_mut());
serialize_into(&mut out, &list[start..end])
.expect("failed to serialize output");
out.position() as usize
};
assert!(pos < BLOB_SIZE);
b.write().unwrap().set_size(pos);
q.push_back(b);
start = end;
}
if exit.load(Ordering::Relaxed) {
break;
}
Expand Down