Commit

Fixes for serializing entries over blobs
sakridge committed May 10, 2018
1 parent 521ae21 commit 84aaca6
Showing 1 changed file with 78 additions and 15 deletions.
src/tpu.rs (78 additions & 15 deletions)
```diff
@@ -8,12 +8,14 @@ use crdt::{Crdt, ReplicatedData};
 use ecdsa;
 use entry::Entry;
 use event::Event;
+use hash::Hash;
 use packet;
 use packet::{SharedBlob, SharedPackets, BLOB_DATA_SIZE, BLOB_SIZE};
 use rand::{thread_rng, Rng};
 use rayon::prelude::*;
 use result::Result;
 use serde_json;
+use std::cmp::min;
 use std::collections::VecDeque;
 use std::io::sink;
 use std::io::{Cursor, Write};
```
```diff
@@ -95,17 +97,19 @@ impl Tpu {
         }
         // See if we need to split the events
         if end <= start {
-            trace!("splitting events");
             let mut event_start = 0;
             let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>();
-            let total_entry_chunks = list[end].events.len() / num_events_per_blob;
+            let total_entry_chunks = (list[end].events.len() + num_events_per_blob - 1) / num_events_per_blob;
+            trace!("splitting events end: {} total_chunks: {}", end, total_entry_chunks);
             for _ in 0..total_entry_chunks {
-                let event_end = event_start + num_events_per_blob;
+                let event_end = min(event_start + num_events_per_blob, list[end].events.len());
+                trace!("event start: {} end: {}", event_start, event_end);
                 let mut entry = Entry {
                     num_hashes: list[end].num_hashes,
                     id: list[end].id,
                     events: list[end].events[event_start..event_end].to_vec(),
                 };
+                trace!("event[0]: {:?}", entry.events[0]);
                 entries.push(vec![entry]);
                 event_start = event_end;
             }
```
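The two replaced lines in this hunk are the heart of the fix: floor division undercounted the number of blobs needed whenever the event count was not an exact multiple of `num_events_per_blob`, silently dropping the trailing partial chunk, and the unclamped `event_end` could slice past the end of the events vector on the last iteration. A minimal standalone sketch of the corrected arithmetic, using small stand-in numbers in place of `BLOB_DATA_SIZE / size_of::<Event>()`:

```rust
use std::cmp::min;

fn main() {
    // Stand-in capacity; the real code derives this from BLOB_DATA_SIZE.
    let num_events_per_blob = 10;
    let events: Vec<u32> = (0..25).collect(); // 25 events need 3 chunks, not 2

    // Old: 25 / 10 == 2 chunks, losing the last 5 events.
    // New: ceiling division, (25 + 10 - 1) / 10 == 3 chunks.
    let total_entry_chunks = (events.len() + num_events_per_blob - 1) / num_events_per_blob;

    let mut event_start = 0;
    for _ in 0..total_entry_chunks {
        // min() clamps the final chunk so the slice never runs past the end.
        let event_end = min(event_start + num_events_per_blob, events.len());
        println!("chunk {:?}", &events[event_start..event_end]);
        event_start = event_end;
    }
}
```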
```diff
@@ -115,6 +119,8 @@
         }

         for entry in entries {
+            trace!("entry.id: {:?} num_hashes: {} num events: {}",
+                   entry[0].id, entry[0].num_hashes, entry[0].events.len());
             let b = blob_recycler.allocate();
             let pos = {
                 let mut bd = b.write().unwrap();
```
```diff
@@ -381,6 +387,38 @@ impl Tpu {
         );
         Ok(())
     }
+    fn apply_blob_state(obj: &Tpu, blobs: &VecDeque<SharedBlob>) -> Result<()> {
+        let mut entries_to_apply: Vec<Entry> = Vec::new();
+        let mut last_id = Hash::default();
+        for msgs in blobs {
+            let blob = msgs.read().unwrap();
+            let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
+            for entry in entries {
+                if entry.id == last_id {
+                    trace!("same id!");
+                    if let Some(last_entry) = entries_to_apply.last_mut() {
+                        last_entry.events.extend(entry.events);
+                    }
+                } else {
+                    last_id = entry.id;
+                    entries_to_apply.push(entry);
+                }
+            }
+            //TODO respond back to leader with hash of the state
+        }
+
+        trace!("{} entries to apply", entries_to_apply.len());
+        let accountant = &obj.accounting_stage.accountant;
+        for entry in entries_to_apply {
+            trace!("{} events to apply id: {:?}", entry.events.len(), entry.id);
+            accountant.register_entry_id(&entry.id);
+            for result in accountant.process_verified_events(entry.events) {
+                result?;
+            }
+        }
+
+        Ok(())
+    }
     /// Process verified blobs, already in order
     /// Respond with a signed hash of the state
     fn replicate_state(
```
```diff
@@ -391,18 +429,7 @@
         let timer = Duration::new(1, 0);
         let blobs = verified_receiver.recv_timeout(timer)?;
         trace!("replicating blobs {}", blobs.len());
-        for msgs in &blobs {
-            let blob = msgs.read().unwrap();
-            let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
-            let accountant = &obj.accounting_stage.accountant;
-            for entry in entries {
-                accountant.register_entry_id(&entry.id);
-                for result in accountant.process_verified_events(entry.events) {
-                    result?;
-                }
-            }
-            //TODO respond back to leader with hash of the state
-        }
+        Self::apply_blob_state(obj, &blobs)?;
         for blob in blobs {
             blob_recycler.recycle(blob);
         }
```
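The new `apply_blob_state` is the receiving half of the split: `process_entry_list_into_blobs` gives every chunk of an oversized entry the same `id`, so after deserializing, consecutive entries whose `id` matches the previous one are folded back into a single entry before the accountant registers the id and processes the events. `replicate_state` now just delegates to it and still recycles the blobs afterward. A simplified sketch of that merge rule, with a hypothetical integer-keyed `Entry` standing in for the real `Hash`-keyed one:

```rust
#[derive(Debug)]
struct Entry {
    id: u64,          // stand-in for the Hash id
    events: Vec<u32>, // stand-in for Vec<Event>
}

fn merge_split_entries(chunks: Vec<Entry>) -> Vec<Entry> {
    let mut merged: Vec<Entry> = Vec::new();
    let mut last_id = u64::MAX; // sentinel, stand-in for Hash::default()
    for entry in chunks {
        if entry.id == last_id {
            // Continuation of a split entry: fold its events back in.
            if let Some(last) = merged.last_mut() {
                last.events.extend(entry.events);
            }
        } else {
            last_id = entry.id;
            merged.push(entry);
        }
    }
    merged
}

fn main() {
    // Entry 7 was split across two blobs; entry 9 arrived whole.
    let chunks = vec![
        Entry { id: 7, events: vec![1, 2] },
        Entry { id: 7, events: vec![3] },
        Entry { id: 9, events: vec![4] },
    ];
    let merged = merge_split_entries(chunks);
    assert_eq!(merged.len(), 2);
    assert_eq!(merged[0].events, vec![1, 2, 3]);
    println!("{:?}", merged);
}
```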
```diff
@@ -728,6 +755,7 @@ mod tests {
     use streamer;
     use tpu::{test_node, to_packets, Request, Tpu};
     use transaction::{memfind, test_tx, Transaction};
+    use std::thread::sleep;

     #[test]
     fn test_layout() {
```
```diff
@@ -927,4 +955,39 @@ mod tests {
         trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref);
         assert!(blob_q.len() > num_blobs_ref);
     }
+
+    #[test]
+    fn test_entry_to_blobs_accountant_state() {
+        logger::setup();
+        let starting_balance = 100_000;
+        let alice = Mint::new(starting_balance);
+        let accountant = Accountant::new(&alice);
+        let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30));
+        let tpu = Arc::new(Tpu::new(accounting_stage));
+
+        let mut cur_hash = Hash::default();
+        let bob_keypair = KeyPair::new();
+        trace!("hash: {:?}", cur_hash);
+        let bob_txs = 1024;
+        let mut events = Vec::new();
+        for _ in 0..bob_txs {
+            cur_hash = hash(&cur_hash);
+            let tr0 = Event::Transaction(Transaction::new(&alice.keypair(), bob_keypair.pubkey(), 1, cur_hash));
+            events.push(tr0);
+            tpu.accounting_stage.accountant.register_entry_id(&cur_hash);
+        }
+        cur_hash = hash(&cur_hash);
+        let e0 = entry::create_entry(&cur_hash, 0, events);
+        tpu.accounting_stage.accountant.register_entry_id(&cur_hash);
+
+        let entry_list = vec![e0.clone(); 1];
+        let blob_recycler = BlobRecycler::default();
+        let mut blob_q = VecDeque::new();
+        Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q);
+
+        Tpu::apply_blob_state(&tpu, &blob_q).expect("apply blob state fail");
+        //sleep(Duration::from_millis(300));
+        assert_eq!(tpu.accounting_stage.accountant.get_balance(&bob_keypair.pubkey()).unwrap(), bob_txs as i64);
+    }
+
 }
```