From 84aaca610d795a5b825c9285702fb2e8b4713cdc Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Thu, 10 May 2018 13:01:42 -0700 Subject: [PATCH] Fixes for serializing entries over blobs --- src/tpu.rs | 93 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 78 insertions(+), 15 deletions(-) diff --git a/src/tpu.rs b/src/tpu.rs index 0c04af830158ec..4901e7221756ea 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -8,12 +8,14 @@ use crdt::{Crdt, ReplicatedData}; use ecdsa; use entry::Entry; use event::Event; +use hash::Hash; use packet; use packet::{SharedBlob, SharedPackets, BLOB_DATA_SIZE, BLOB_SIZE}; use rand::{thread_rng, Rng}; use rayon::prelude::*; use result::Result; use serde_json; +use std::cmp::min; use std::collections::VecDeque; use std::io::sink; use std::io::{Cursor, Write}; @@ -95,17 +97,19 @@ impl Tpu { } // See if we need to split the events if end <= start { - trace!("splitting events"); let mut event_start = 0; let num_events_per_blob = BLOB_DATA_SIZE / size_of::<Event>(); - let total_entry_chunks = list[end].events.len() / num_events_per_blob; + let total_entry_chunks = (list[end].events.len() + num_events_per_blob - 1) / num_events_per_blob; + trace!("splitting events end: {} total_chunks: {}", end, total_entry_chunks); for _ in 0..total_entry_chunks { - let event_end = event_start + num_events_per_blob; + let event_end = min(event_start + num_events_per_blob, list[end].events.len()); + trace!("event start: {} end: {}", event_start, event_end); let mut entry = Entry { num_hashes: list[end].num_hashes, id: list[end].id, events: list[end].events[event_start..event_end].to_vec(), }; + trace!("event[0]: {:?}", entry.events[0]); entries.push(vec![entry]); event_start = event_end; } @@ -115,6 +119,8 @@ impl Tpu { } for entry in entries { + trace!("entry.id: {:?} num_hashes: {} num events: {}", + entry[0].id, entry[0].num_hashes, entry[0].events.len()); let b = blob_recycler.allocate(); let pos = { let mut bd = b.write().unwrap(); @@ -381,6 
+387,38 @@ impl Tpu { ); Ok(()) } + fn apply_blob_state(obj: &Tpu, blobs: &VecDeque<SharedBlob>) -> Result<()> { + let mut entries_to_apply:Vec<Entry> = Vec::new(); + let mut last_id = Hash::default(); + for msgs in blobs { + let blob = msgs.read().unwrap(); + let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap(); + for entry in entries { + if entry.id == last_id { + trace!("same id!"); + if let Some(last_entry) = entries_to_apply.last_mut() { + last_entry.events.extend(entry.events); + } + } else { + last_id = entry.id; + entries_to_apply.push(entry); + } + } + //TODO respond back to leader with hash of the state + } + + trace!("{} entries to apply", entries_to_apply.len()); + let accountant = &obj.accounting_stage.accountant; + for entry in entries_to_apply { + trace!("{} events to apply id: {:?}", entry.events.len(), entry.id); + accountant.register_entry_id(&entry.id); + for result in accountant.process_verified_events(entry.events) { + result?; + } + } + + Ok(()) + } /// Process verified blobs, already in order /// Respond with a signed hash of the state fn replicate_state( @@ -391,18 +429,7 @@ impl Tpu { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; trace!("replicating blobs {}", blobs.len()); - for msgs in &blobs { - let blob = msgs.read().unwrap(); - let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap(); - let accountant = &obj.accounting_stage.accountant; - for entry in entries { - accountant.register_entry_id(&entry.id); - for result in accountant.process_verified_events(entry.events) { - result?; - } - } - //TODO respond back to leader with hash of the state - } + Self::apply_blob_state(obj, &blobs)?; for blob in blobs { blob_recycler.recycle(blob); } @@ -728,6 +755,7 @@ mod tests { use streamer; use tpu::{test_node, to_packets, Request, Tpu}; use transaction::{memfind, test_tx, Transaction}; + use std::thread::sleep; #[test] fn test_layout() { @@ -927,4 +955,39 @@ mod tests { trace!("len: {} 
ref_len: {}", blob_q.len(), num_blobs_ref); assert!(blob_q.len() > num_blobs_ref); } + + #[test] + fn test_entry_to_blobs_accountant_state() { + logger::setup(); + let starting_balance = 100_000; + let alice = Mint::new(starting_balance); + let accountant = Accountant::new(&alice); + let accounting_stage = AccountingStage::new(accountant, &alice.last_id(), Some(30)); + let tpu = Arc::new(Tpu::new(accounting_stage)); + + let mut cur_hash = Hash::default(); + let bob_keypair = KeyPair::new(); + trace!("hash: {:?}", cur_hash); + let bob_txs = 1024; + let mut events = Vec::new(); + for _ in 0..bob_txs { + cur_hash = hash(&cur_hash); + let tr0 = Event::Transaction(Transaction::new(&alice.keypair(), bob_keypair.pubkey(), 1, cur_hash)); + events.push(tr0); + tpu.accounting_stage.accountant.register_entry_id(&cur_hash); + } + cur_hash = hash(&cur_hash); + let e0 = entry::create_entry(&cur_hash, 0, events); + tpu.accounting_stage.accountant.register_entry_id(&cur_hash); + + let entry_list = vec![e0.clone(); 1]; + let blob_recycler = BlobRecycler::default(); + let mut blob_q = VecDeque::new(); + Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q); + + Tpu::apply_blob_state(&tpu, &blob_q).expect("apply blob state fail"); + //sleep(Duration::from_millis(300)); + assert_eq!(tpu.accounting_stage.accountant.get_balance(&bob_keypair.pubkey()).unwrap(), bob_txs as i64); + } + }