Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Serialize entries over multiple blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed May 9, 2018
1 parent 0601e05 commit 900b4f2
Showing 1 changed file with 49 additions and 20 deletions.
69 changes: 49 additions & 20 deletions src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ecdsa;
use entry::Entry;
use event::Event;
use packet;
use packet::{SharedBlob, SharedPackets, BLOB_SIZE};
use packet::{SharedBlob, SharedPackets, BLOB_DATA_SIZE, BLOB_SIZE};
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use result::Result;
Expand Down Expand Up @@ -83,33 +83,49 @@ impl Tpu {
let mut start = 0;
let mut end = 0;
while start < list.len() {
let mut entries: Vec<Vec<Entry>> = Vec::new();
let mut total = 0;
for i in &list[start..] {
total += size_of::<Event>() * i.events.len();
total += size_of::<Entry>();
if total >= BLOB_SIZE {
if total >= BLOB_DATA_SIZE {
break;
}
end += 1;
}
// See that we made progress and a single
// vec of Events wasn't too big for a single packet
// See if we need to split the events
if end <= start {
// Trust the recorder to not package more than we can
// serialize
trace!("splitting events");
let mut event_start = 0;
let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
let total_entry_chunks = list[end].events.len() / num_events_per_blob;
for _ in 0..total_entry_chunks {
let event_end = event_start + num_events_per_blob;
let mut entry = Entry {
num_hashes: list[end].num_hashes,
id: list[end].id,
events: list[end].events[event_start..event_end].to_vec(),
};
entries.push(vec![entry]);
event_start = event_end;
}
end += 1;
} else {
entries.push(list[start..end].to_vec());
}

let b = blob_recycler.allocate();
let pos = {
let mut bd = b.write().unwrap();
let mut out = Cursor::new(bd.data_mut());
serialize_into(&mut out, &list[start..end]).expect("failed to serialize output");
out.position() as usize
};
assert!(pos < BLOB_SIZE);
b.write().unwrap().set_size(pos);
q.push_back(b);
for entry in entries {
let b = blob_recycler.allocate();
let pos = {
let mut bd = b.write().unwrap();
let mut out = Cursor::new(bd.data_mut());
serialize_into(&mut out, &entry).expect("failed to serialize output");
out.position() as usize
};
assert!(pos < BLOB_SIZE);
b.write().unwrap().set_size(pos);
q.push_back(b);
}
start = end;
}
}
Expand Down Expand Up @@ -693,7 +709,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke
mod tests {
use accountant::Accountant;
use accounting_stage::AccountingStage;
use bincode::serialize;
use bincode::{deserialize, serialize};
use chrono::prelude::*;
use crdt::Crdt;
use ecdsa;
Expand Down Expand Up @@ -883,10 +899,11 @@ mod tests {
let zero = Hash::default();
let keypair = KeyPair::new();
let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero));
let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
let e0 = entry::create_entry(&zero, 0, vec![tr0.clone(), tr1.clone()]);
let events = vec![tr0.clone(); 10000];
//let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero));
let e0 = entry::create_entry(&zero, 0, events);

let entry_list = vec![e0; 1000];
let entry_list = vec![e0.clone(); 1];
let blob_recycler = BlobRecycler::default();
let mut blob_q = VecDeque::new();
Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
Expand All @@ -895,6 +912,18 @@ mod tests {
if serialized_entry_list.len() % BLOB_SIZE != 0 {
num_blobs_ref += 1
}
let mut new_events = Vec::new();
for b in &blob_q {
let blob = b.read().unwrap();
let entries: Vec<entry::Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].num_hashes, e0.num_hashes);
assert_eq!(entries[0].id, e0.id);
new_events.extend(entries[0].events.clone());
}
for (i, e) in new_events.iter().enumerate() {
assert_eq!(*e, e0.events[i]);
}
trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref);
assert!(blob_q.len() > num_blobs_ref);
}
Expand Down

0 comments on commit 900b4f2

Please sign in to comment.