diff --git a/src/accountant.rs b/src/accountant.rs index d97e2067350b30..39b190f5301f37 100644 --- a/src/accountant.rs +++ b/src/accountant.rs @@ -6,6 +6,7 @@ extern crate libc; use chrono::prelude::*; +use entry::Entry; use event::Event; use hash::Hash; use mint::Mint; @@ -15,8 +16,8 @@ use signature::{KeyPair, PublicKey, Signature}; use std::collections::hash_map::Entry::Occupied; use std::collections::{HashMap, HashSet, VecDeque}; use std::result; -use std::sync::atomic::{AtomicIsize, Ordering}; use std::sync::RwLock; +use std::sync::atomic::{AtomicIsize, Ordering}; use transaction::Transaction; pub const MAX_ENTRY_IDS: usize = 1024 * 4; @@ -232,6 +233,16 @@ impl Accountant { results } + pub fn process_verified_entries(&self, entries: Vec) -> Result<()> { + for entry in entries { + self.register_entry_id(&entry.id); + for result in self.process_verified_events(entry.events) { + result?; + } + } + Ok(()) + } + /// Process a Witness Signature that has already been verified. fn process_verified_sig(&self, from: PublicKey, tx_sig: Signature) -> Result<()> { if let Occupied(mut e) = self.pending.write().unwrap().entry(tx_sig) { diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index bb0a81743a6e73..a4de9e45c00d99 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -17,8 +17,8 @@ use std::env; use std::io::{stdin, stdout, Read}; use std::net::UdpSocket; use std::process::exit; -use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::sync::atomic::AtomicBool; fn print_usage(program: &str, opts: Options) { let mut brief = format!("Usage: cat | {} [options]\n\n", program); diff --git a/src/ecdsa.rs b/src/ecdsa.rs index 9ac7959cf1e766..4110387015e3aa 100644 --- a/src/ecdsa.rs +++ b/src/ecdsa.rs @@ -135,8 +135,8 @@ mod tests { use packet::{Packet, Packets, SharedPackets}; use std::sync::RwLock; use thin_client_service::Request; - use transaction::test_tx; use transaction::Transaction; + use transaction::test_tx; fn make_packet_from_transaction(tr: Transaction) -> Packet { let tx = serialize(&Request::Transaction(tr)).unwrap(); diff --git a/src/hash.rs b/src/hash.rs index 61dd01468c9fcb..ee7598a0dc29e2 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -1,7 +1,7 @@ //! The `hash` module provides functions for creating SHA-256 hashes. -use generic_array::typenum::U32; use generic_array::GenericArray; +use generic_array::typenum::U32; use sha2::{Digest, Sha256}; pub type Hash = GenericArray; diff --git a/src/historian.rs b/src/historian.rs index 7796adfcaff8d3..019ec57d3666be 100644 --- a/src/historian.rs +++ b/src/historian.rs @@ -4,8 +4,8 @@ use entry::Entry; use hash::Hash; use recorder::{ExitReason, Recorder, Signal}; -use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::sync::Mutex; +use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::thread::{spawn, JoinHandle}; use std::time::Instant; diff --git a/src/ledger.rs b/src/ledger.rs index 0056bd54e20894..18f924fed140e0 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -1,9 +1,17 @@ //! The `ledger` module provides functions for parallel verification of the //! Proof of History ledger. +use bincode::{deserialize, serialize_into}; use entry::{next_tick, Entry}; +use event::Event; use hash::Hash; +use packet; +use packet::{SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE}; use rayon::prelude::*; +use std::cmp::min; +use std::collections::VecDeque; +use std::io::Cursor; +use std::mem::size_of; pub trait Block { /// Verifies the hashes and counts of a slice of events are all consistent. @@ -30,10 +38,95 @@ pub fn next_ticks(start_hash: &Hash, num_hashes: u64, len: usize) -> Vec ticks } +pub fn process_entry_list_into_blobs( + list: &Vec, + blob_recycler: &packet::BlobRecycler, + q: &mut VecDeque, +) { + let mut start = 0; + let mut end = 0; + while start < list.len() { + let mut entries: Vec> = Vec::new(); + let mut total = 0; + for i in &list[start..] { + total += size_of::() * i.events.len(); + total += size_of::(); + if total >= BLOB_DATA_SIZE { + break; + } + end += 1; + } + // See if we need to split the events + if end <= start { + let mut event_start = 0; + let num_events_per_blob = BLOB_DATA_SIZE / size_of::(); + let total_entry_chunks = + (list[end].events.len() + num_events_per_blob - 1) / num_events_per_blob; + trace!( + "splitting events end: {} total_chunks: {}", + end, + total_entry_chunks + ); + for _ in 0..total_entry_chunks { + let event_end = min(event_start + num_events_per_blob, list[end].events.len()); + let mut entry = Entry { + num_hashes: list[end].num_hashes, + id: list[end].id, + events: list[end].events[event_start..event_end].to_vec(), + }; + entries.push(vec![entry]); + event_start = event_end; + } + end += 1; + } else { + entries.push(list[start..end].to_vec()); + } + + for entry in entries { + let b = blob_recycler.allocate(); + let pos = { + let mut bd = b.write().unwrap(); + let mut out = Cursor::new(bd.data_mut()); + serialize_into(&mut out, &entry).expect("failed to serialize output"); + out.position() as usize + }; + assert!(pos < BLOB_SIZE); + b.write().unwrap().set_size(pos); + q.push_back(b); + } + start = end; + } +} + +pub fn reconstruct_entries_from_blobs(blobs: &VecDeque) -> Vec { + let mut entries_to_apply: Vec = Vec::new(); + let mut last_id = Hash::default(); + for msgs in blobs { + let blob = msgs.read().unwrap(); + let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); + for entry in entries { + if entry.id == last_id { + if let Some(last_entry) = entries_to_apply.last_mut() { + last_entry.events.extend(entry.events); + } + } else { + last_id = entry.id; + entries_to_apply.push(entry); + } + } + //TODO respond back to leader with hash of the state + } + entries_to_apply +} + #[cfg(test)] mod tests { use super::*; + use entry; use hash::hash; + use packet::BlobRecycler; + use signature::{KeyPair, KeyPairUtil}; + use transaction::Transaction; #[test] fn test_verify_slice() { @@ -48,6 +141,24 @@ mod tests { bad_ticks[1].id = one; assert!(!bad_ticks.verify(&zero)); // inductive step, bad } + + #[test] + fn test_entry_to_blobs() { + let zero = Hash::default(); + let one = hash(&zero); + let keypair = KeyPair::new(); + let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, one)); + let events = vec![tr0.clone(); 10000]; + let e0 = entry::create_entry(&zero, 0, events); + + let entry_list = vec![e0.clone(); 1]; + let blob_recycler = BlobRecycler::default(); + let mut blob_q = VecDeque::new(); + process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q); + let entries = reconstruct_entries_from_blobs(&blob_q); + + assert_eq!(entry_list, entries); + } } #[cfg(all(feature = "unstable", test))] diff --git a/src/mint.rs b/src/mint.rs index 754cacaa411c40..67829669e4ef18 100644 --- a/src/mint.rs +++ b/src/mint.rs @@ -1,7 +1,7 @@ //! The `mint` module is a library for generating the chain's genesis block. -use entry::create_entry; use entry::Entry; +use entry::create_entry; use event::Event; use hash::{hash, Hash}; use ring::rand::SystemRandom; diff --git a/src/result.rs b/src/result.rs index d2cb485add3aa1..fca876ebec8fdd 100644 --- a/src/result.rs +++ b/src/result.rs @@ -78,9 +78,9 @@ mod tests { use std::io; use std::io::Write; use std::net::SocketAddr; - use std::sync::mpsc::channel; use std::sync::mpsc::RecvError; use std::sync::mpsc::RecvTimeoutError; + use std::sync::mpsc::channel; use std::thread; fn addr_parse_error() -> Result { diff --git a/src/signature.rs b/src/signature.rs index 1b01e14ef579e8..5f3aee61ea2267 100644 --- a/src/signature.rs +++ b/src/signature.rs @@ -1,7 +1,7 @@ //! The `signature` module provides functionality for public, and private keys. -use generic_array::typenum::{U32, U64}; use generic_array::GenericArray; +use generic_array::typenum::{U32, U64}; use ring::signature::Ed25519KeyPair; use ring::{rand, signature}; use untrusted; diff --git a/src/streamer.rs b/src/streamer.rs index 2d43f28847fb04..1a607a12f0cb6a 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -438,8 +438,8 @@ mod test { use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; - use streamer::{blob_receiver, receiver, responder, retransmitter, window}; use streamer::{BlobReceiver, PacketReceiver}; + use streamer::{blob_receiver, receiver, responder, retransmitter, window}; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { diff --git a/src/tpu.rs b/src/tpu.rs index 0c04af830158ec..cc01f957e2c98a 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -3,21 +3,21 @@ use accountant::Accountant; use accounting_stage::AccountingStage; -use bincode::{deserialize, serialize, serialize_into}; +use bincode::{deserialize, serialize}; use crdt::{Crdt, ReplicatedData}; use ecdsa; use entry::Entry; use event::Event; +use ledger; use packet; -use packet::{SharedBlob, SharedPackets, BLOB_DATA_SIZE, BLOB_SIZE}; +use packet::SharedPackets; use rand::{thread_rng, Rng}; use rayon::prelude::*; use result::Result; use serde_json; use std::collections::VecDeque; +use std::io::Write; use std::io::sink; -use std::io::{Cursor, Write}; -use std::mem::size_of; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; @@ -75,61 +75,6 @@ impl Tpu { Ok(l) } - fn process_entry_list_into_blobs( - list: &Vec, - blob_recycler: &packet::BlobRecycler, - q: &mut VecDeque, - ) { - let mut start = 0; - let mut end = 0; - while start < list.len() { - let mut entries: Vec> = Vec::new(); - let mut total = 0; - for i in &list[start..] { - total += size_of::() * i.events.len(); - total += size_of::(); - if total >= BLOB_DATA_SIZE { - break; - } - end += 1; - } - // See if we need to split the events - if end <= start { - trace!("splitting events"); - let mut event_start = 0; - let num_events_per_blob = BLOB_DATA_SIZE / size_of::(); - let total_entry_chunks = list[end].events.len() / num_events_per_blob; - for _ in 0..total_entry_chunks { - let event_end = event_start + num_events_per_blob; - let mut entry = Entry { - num_hashes: list[end].num_hashes, - id: list[end].id, - events: list[end].events[event_start..event_end].to_vec(), - }; - entries.push(vec![entry]); - event_start = event_end; - } - end += 1; - } else { - entries.push(list[start..end].to_vec()); - } - - for entry in entries { - let b = blob_recycler.allocate(); - let pos = { - let mut bd = b.write().unwrap(); - let mut out = Cursor::new(bd.data_mut()); - serialize_into(&mut out, &entry).expect("failed to serialize output"); - out.position() as usize - }; - assert!(pos < BLOB_SIZE); - b.write().unwrap().set_size(pos); - q.push_back(b); - } - start = end; - } - } - /// Process any Entry items that have been published by the Historian. /// continuosly broadcast blobs of entries out fn run_sync( @@ -141,7 +86,7 @@ impl Tpu { let mut q = VecDeque::new(); let list = Self::receive_all(&obj, writer)?; trace!("New blobs? {}", list.len()); - Self::process_entry_list_into_blobs(&list, blob_recycler, &mut q); + ledger::process_entry_list_into_blobs(&list, blob_recycler, &mut q); if !q.is_empty() { broadcast.send(q)?; } @@ -381,6 +326,7 @@ impl Tpu { ); Ok(()) } + /// Process verified blobs, already in order /// Respond with a signed hash of the state fn replicate_state( @@ -391,18 +337,10 @@ impl Tpu { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; trace!("replicating blobs {}", blobs.len()); - for msgs in &blobs { - let blob = msgs.read().unwrap(); - let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); - let accountant = &obj.accounting_stage.accountant; - for entry in entries { - accountant.register_entry_id(&entry.id); - for result in accountant.process_verified_events(entry.events) { - result?; - } - } - //TODO respond back to leader with hash of the state - } + let entries = ledger::reconstruct_entries_from_blobs(&blobs); + obj.accounting_stage + .accountant + .process_verified_entries(entries)?; for blob in blobs { blob_recycler.recycle(blob); } @@ -709,7 +647,7 @@ pub fn test_node() -> (ReplicatedData, UdpSocket, UdpSocket, UdpSocket, UdpSocke mod tests { use accountant::Accountant; use accounting_stage::AccountingStage; - use bincode::{deserialize, serialize}; + use bincode::serialize; use chrono::prelude::*; use crdt::Crdt; use ecdsa; @@ -718,7 +656,7 @@ mod tests { use hash::{hash, Hash}; use logger; use mint::Mint; - use packet::{BlobRecycler, PacketRecycler, BLOB_SIZE, NUM_PACKETS}; + use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; use std::sync::atomic::{AtomicBool, Ordering}; @@ -894,37 +832,4 @@ mod tests { t_l_listen.join().expect("join"); } - #[test] - fn test_entry_to_blobs() { - let zero = Hash::default(); - let keypair = KeyPair::new(); - let tr0 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 0, zero)); - let events = vec![tr0.clone(); 10000]; - //let tr1 = Event::Transaction(Transaction::new(&keypair, keypair.pubkey(), 1, zero)); - let e0 = entry::create_entry(&zero, 0, events); - - let entry_list = vec![e0.clone(); 1]; - let blob_recycler = BlobRecycler::default(); - let mut blob_q = VecDeque::new(); - Tpu::process_entry_list_into_blobs(&entry_list, &blob_recycler, &mut blob_q); - let serialized_entry_list = serialize(&entry_list).unwrap(); - let mut num_blobs_ref = serialized_entry_list.len() / BLOB_SIZE; - if serialized_entry_list.len() % BLOB_SIZE != 0 { - num_blobs_ref += 1 - } - let mut new_events = Vec::new(); - for b in &blob_q { - let blob = b.read().unwrap(); - let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); - assert_eq!(entries.len(), 1); - assert_eq!(entries[0].num_hashes, e0.num_hashes); - assert_eq!(entries[0].id, e0.id); - new_events.extend(entries[0].events.clone()); - } - for (i, e) in new_events.iter().enumerate() { - assert_eq!(*e, e0.events[i]); - } - trace!("len: {} ref_len: {}", blob_q.len(), num_blobs_ref); - assert!(blob_q.len() > num_blobs_ref); - } }