Serialize entries over multiple blobs #193

Merged
src/tpu.rs (69 changes: 49 additions & 20 deletions)
@@ -9,7 +9,7 @@ use ecdsa;
 use entry::Entry;
 use event::Event;
 use packet;
-use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
+use packet::{SharedBlob, SharedPackets, BLOB_DATA_SIZE, BLOB_SIZE};
 use rand::{thread_rng, Rng};
 use rayon::prelude::*;
 use result::Result;
@@ -83,33 +83,49 @@ impl Tpu {
         let mut start = 0;
         let mut end = 0;
         while start < list.len() {
+            let mut entries: Vec<Vec<Entry>> = Vec::new();
             let mut total = 0;
             for i in &list[start..] {
                 total += size_of::<Event>() * i.events.len();
                 total += size_of::<Entry>();
-                if total >= BLOB_SIZE {
+                if total >= BLOB_DATA_SIZE {
                     break;
                 }
                 end += 1;
             }
-            // See that we made progress and a single
-            // vec of Events wasn't too big for a single packet
+            // See if we need to split the events
             if end <= start {
-                // Trust the recorder to not package more than we can
-                // serialize
+                trace!("splitting events");
+                let mut event_start = 0;
+                let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
+                let total_entry_chunks = list[end].events.len() / num_events_per_blob;
+                for _ in 0..total_entry_chunks {
+                    let event_end = event_start + num_events_per_blob;
+                    let mut entry = Entry {
+                        num_hashes: list[end].num_hashes,
+                        id: list[end].id,
+                        events: list[end].events[event_start..event_end].to_vec(),
+                    };
+                    entries.push(vec![entry]);
+                    event_start = event_end;
+                }
                 end += 1;
+            } else {
+                entries.push(list[start..end].to_vec());
             }
 
-            let b = blob_recycler.allocate();
-            let pos = {
-                let mut bd = b.write().unwrap();
-                let mut out = Cursor::new(bd.data_mut());
-                serialize_into(&mut out, &list[start..end]).expect("failed to serialize output");
-                out.position() as usize
-            };
-            assert!(pos < BLOB_SIZE);
-            b.write().unwrap().set_size(pos);
-            q.push_back(b);
+            for entry in entries {
+                let b = blob_recycler.allocate();
+                let pos = {
+                    let mut bd = b.write().unwrap();
+                    let mut out = Cursor::new(bd.data_mut());
+                    serialize_into(&mut out, &entry).expect("failed to serialize output");
+                    out.position() as usize
+                };
+                assert!(pos < BLOB_SIZE);
+                b.write().unwrap().set_size(pos);
+                q.push_back(b);
+            }
             start = end;
         }
     }
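For readers skimming the hunk above: the new branch is plain fixed-size chunking of an oversized entry's events, with BLOB_DATA_SIZE presumably the blob's payload budget rather than its full wire size. A minimal standalone sketch of the same arithmetic follows; MAX_CHUNK_BYTES and the stand-in Event type are illustrative assumptions, not the real packet constants.

// Standalone sketch of the chunking arithmetic above. MAX_CHUNK_BYTES and
// the stand-in Event type are illustrative; the real code uses
// packet::BLOB_DATA_SIZE and size_of::<Event>().
const MAX_CHUNK_BYTES: usize = 1024;

#[derive(Clone, Debug, PartialEq)]
struct Event(u64);

fn split_events(events: &[Event]) -> Vec<Vec<Event>> {
    let per_chunk = MAX_CHUNK_BYTES / std::mem::size_of::<Event>();
    let full_chunks = events.len() / per_chunk; // floor division, as in the diff
    let mut out = Vec::new();
    let mut start = 0;
    for _ in 0..full_chunks {
        out.push(events[start..start + per_chunk].to_vec());
        start += per_chunk;
    }
    // Like the diff, this drops a trailing partial chunk (events[start..]);
    // `events.chunks(per_chunk)` or a ceiling division would keep it.
    out
}

fn main() {
    let events: Vec<Event> = (0..300).map(Event).collect();
    let chunks = split_events(&events);
    assert_eq!(chunks.len(), 2); // 1024 B / 8 B = 128 per chunk; 300 / 128 = 2
    assert_eq!(chunks[0].len(), 128);
}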
@@ -693,7 +709,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
 mod tests {
     use accountant::Accountant;
     use accounting_stage::AccountingStage;
-    use bincode::serialize;
+    use bincode::{deserialize, serialize};
     use chrono::prelude::*;
     use crdt::Crdt;
     use ecdsa;
@@ -883,10 +899,11 @@ mod tests {
         let zero = Hash::default();
         let keypair = KeyPair::new();
         let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero));
-        let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
-        let e0 = entry::create_entry(&zero, 0, vec![tr0.clone(), tr1.clone()]);
+        let events = vec![tr0.clone(); 10000];
+        //let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
+        let e0 = entry::create_entry(&zero, 0, events);
 
-        let entry_list = vec![e0; 1000];
+        let entry_list = vec![e0.clone(); 1];
         let blob_recycler = BlobRecycler::default();
         let mut blob_q = VecDeque::new();
         Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
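The setup now builds one entry holding 10,000 cloned transaction events (instead of 1,000 two-event entries), so a single entry overflows one blob and the new splitting branch must fire. A back-of-the-envelope check, with purely illustrative sizes; the real numbers come from size_of::<Event>() and packet::BLOB_DATA_SIZE.

// Illustrative only: assumed sizes, not the real packet constants.
const EVENT_SIZE: usize = 64; // assumed per-event footprint
const BLOB_DATA_SIZE: usize = 64 * 1024; // assumed blob payload budget

fn main() {
    let total = 10_000 * EVENT_SIZE; // 640 KB of events in one entry
    assert!(total >= BLOB_DATA_SIZE); // cannot fit one blob: split path taken
    let blobs_needed = (total + BLOB_DATA_SIZE - 1) / BLOB_DATA_SIZE;
    println!("~{} blobs needed", blobs_needed); // ~10 under these assumptions
}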
@@ -895,6 +912,18 @@ mod tests {
         if serialized_entry_list.len() % BLOB_SIZE != 0 {
             num_blobs_ref += 1
         }
+        let mut new_events = Vec::new();
+        for b in &blob_q {
+            let blob = b.read().unwrap();
+            let entries: Vec<entry::Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
+            assert_eq!(entries.len(), 1);
+            assert_eq!(entries[0].num_hashes, e0.num_hashes);
+            assert_eq!(entries[0].id, e0.id);
+            new_events.extend(entries[0].events.clone());
+        }
+        for (i, e) in new_events.iter().enumerate() {
+            assert_eq!(*e, e0.events[i]);
+        }
         trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref);
         assert!(blob_q.len() > num_blobs_ref);
     }
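The new assertions amount to a round-trip property: every blob deserializes to a one-entry Vec carrying the original num_hashes and id, and concatenating the recovered events reproduces the input in order. A self-contained sketch of that property using bincode's serialize/deserialize; Entry here is a simplified stand-in (u64 id instead of a Hash), with serde derives assumed as dependencies.

// Round-trip sketch: chunk, serialize each chunk as a one-entry list,
// deserialize, reassemble. `Entry` is a simplified stand-in (u64 id
// instead of a Hash); bincode and serde are assumed as dependencies.
use bincode::{deserialize, serialize};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
struct Entry {
    num_hashes: u64,
    id: u64,
    events: Vec<u64>,
}

fn main() {
    let original = Entry { num_hashes: 0, id: 42, events: (0..256).collect() };

    // One blob per chunk, each holding a Vec<Entry> of length one,
    // as process_entry_list_into_blobs does on the split path.
    let blobs: Vec<Vec<u8>> = original
        .events
        .chunks(64)
        .map(|c| {
            let entry = Entry { events: c.to_vec(), ..original.clone() };
            serialize(&vec![entry]).unwrap()
        })
        .collect();

    // Mirror the test's assertions: reassemble and compare.
    let mut recovered = Vec::new();
    for bytes in &blobs {
        let entries: Vec<Entry> = deserialize(bytes).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].num_hashes, original.num_hashes);
        assert_eq!(entries[0].id, original.id);
        recovered.extend(entries[0].events.clone());
    }
    assert_eq!(recovered, original.events);
}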