diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 3f06aafcb9e27e..ed987651f22373 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -129,6 +129,7 @@ impl AccountantSkel { recvr: &streamer::PacketReceiver, sendr: &Sender)>>, ) -> Result<()> { + println!("verifier!"); let batch = Self::recv_batch(recvr)?; let verified_batches = Self::verify_batch(batch); for xs in verified_batches { @@ -256,15 +257,18 @@ impl AccountantSkel { blob_sender: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { + println!("replicate_state start"); let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; + println!("received msg"); for msgs in &blobs { + println!("msg:"); let blob = msgs.read().unwrap(); let mut entries:Vec = Vec::new(); - for i in 0..blob.meta.size/size_of::() { - entries.push(deserialize(&blob.data[i..i+size_of::()]).unwrap()); - } + println!("blob size: {:?}", blob.meta.size); + let entries:Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); for e in entries { + println!("processing entry: {:?}", e); obj.lock().unwrap().acc.process_verified_events(e.events)?; } //TODO respond back to leader with hash of the state @@ -346,6 +350,7 @@ impl AccountantSkel { rsubs: subscribers::Subscribers, exit: Arc, ) -> Result>> { + trace!("replicate stuff!"); let read = UdpSocket::bind(rsubs.me.addr)?; // make sure we are on the same interface let mut local = read.local_addr()?; @@ -384,6 +389,7 @@ impl AccountantSkel { let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { + println!("exiting replicate thread!"); break; } }); @@ -440,7 +446,11 @@ mod tests { use streamer; use std::sync::mpsc::channel; use std::collections::VecDeque; - use packet::{PACKET_DATA_SIZE}; + use std::mem::size_of; + use hash::{hash, Hash}; + use event::Event; + use entry; + use chrono::prelude::*; #[test] fn test_layout() { @@ -546,27 +556,44 @@ mod tests { exit.store(true, Ordering::Relaxed); } + use std::sync::{Once, ONCE_INIT}; + extern crate env_logger; + + static INIT: Once = ONCE_INIT; + + /// Setup function that is only run once, even if called multiple times. + fn setup() { + INIT.call_once(|| { + env_logger::init().unwrap(); + }); + } + #[test] fn test_replicate() { - let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let addr = read.local_addr().unwrap(); - let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + setup(); + let leader_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let leader_addr = leader_sock.local_addr().unwrap(); + let me_addr = "127.0.0.1:9000".parse().unwrap(); + let target_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let target_peer_addr = target_peer_sock.local_addr().unwrap(); + let source_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let node_me = Node::default(); - let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap()); - let subs = Subscribers::new(node_me, node_leader, &[]); + let node_me = Node::new([0,0,0,0,0,0,0,1], 10, me_addr); + let node_subs = vec![Node::new([0,0,0,0,0,0,0,2], 8, target_peer_addr); 1]; + let node_leader = Node::new([0,0,0,0,0,0,0,3], 20, leader_addr); + println!("leader: {:?} subs: {:?}", node_leader, node_subs); + let subs = Subscribers::new(node_me, node_leader, &node_subs); - let recv_recycler = PacketRecycler::default(); + let recv_recycler = BlobRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); + let t_receiver = streamer::blob_receiver(exit.clone(), recv_recycler.clone(), target_peer_sock, s_reader).unwrap(); let (s_responder, r_responder) = channel(); - let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let t_responder = streamer::responder(source_peer_sock, exit.clone(), resp_recycler.clone(), r_responder); let alice = Mint::new(10_000); let acc = Accountant::new(&alice); - let bob_pubkey = KeyPair::new().pubkey(); let historian = Historian::new(&alice.last_id(), Some(30)); let acc = Arc::new(Mutex::new(AccountantSkel::new( acc, @@ -578,16 +605,43 @@ mod tests { let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); let mut msgs = VecDeque::new(); - for i in 0..10 { + let zero_hash = Hash::default(); + let num_blobs = 10; + let bob_keypair = KeyPair::new(); + for i in 0..num_blobs { let b = resp_recycler.allocate(); let b_ = b.clone(); let mut w = b.write().unwrap(); - w.data[0] = i as u8; - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&addr); + w.set_index(i).unwrap(); + + let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); + let entry0 = entry::create_entry(&zero_hash, i, vec![tr0]); + + let tr1 = Transaction::new(&alice.keypair(), bob_keypair.pubkey(), 501, zero_hash); + let entry1 = entry::create_entry(&zero_hash, i + num_blobs, vec![Event::Transaction(tr1)]); + + let serialized_entry = serialize(&vec![entry0, entry1]).unwrap(); + + w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry); + w.meta.size = serialized_entry.len(); + w.meta.set_addr(&me_addr); + drop(w); + println!("index: {}", b_.read().unwrap().get_index().unwrap()); msgs.push_back(b_); } s_responder.send(msgs).expect("send"); + println!("foo"); + let timer = Duration::new(1, 0); + let mut msgs: Vec<_> = Vec::new(); + while let Ok(msg) = r_reader.recv_timeout(timer) { + println!("msg: {:?}", msg); + msgs.push(msg); + } + + let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let acc_client = AccountantStub::new(&target_peer_addr.to_string(), client_socket); + let last_id = acc_client.get_balance(&alice.keypair().pubkey()); + //println!("last id: {:?}", last_id); exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); diff --git a/src/streamer.rs b/src/streamer.rs index 0fd8c1b758204d..c1e2336202259a 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -79,7 +79,9 @@ pub fn responder( //window. fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> { let dq = Blob::recv_from(recycler, sock)?; + println!("got some blobs: {}", dq.len()); if !dq.is_empty() { + println!("recv_blobs: sending some blobs"); s.send(dq)?; } Ok(()) @@ -99,7 +101,8 @@ pub fn blob_receiver( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_blobs(&recycler, &sock, &s); + let ret = recv_blobs(&recycler, &sock, &s); + println!("recv_blobs ret: {:?}", ret); }); Ok(t) } @@ -113,8 +116,10 @@ fn recv_window( s: &BlobSender, retransmit: &BlobSender, ) -> Result<()> { + println!("recv_window start"); let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; + println!("got some window blobs: {}", dq.len()); while let Ok(mut nq) = r.try_recv() { dq.append(&mut nq) } @@ -126,6 +131,7 @@ fn recv_window( let p = b.read().unwrap(); //TODO this check isn't safe against adverserial packets //we need to maintain a sequence window + trace!("idx: {} addr: {:?} leader: {:?}", p.get_index().unwrap(), p.meta.addr(), rsubs.leader.addr); if p.meta.addr() == rsubs.leader.addr { //TODO //need to copy the retransmited blob @@ -158,6 +164,7 @@ fn recv_window( //TODO, after the block are authenticated //if we get different blocks at the same index //that is a network failure/attack + trace!("window w: {} size: {}", w, p.meta.size); { if window[w].is_none() { window[w] = Some(b_); @@ -166,6 +173,7 @@ fn recv_window( } loop { let k = *consumed % NUM_BLOBS; + trace!("k: {} consumed: {}", k, *consumed); if window[k].is_none() { break; } @@ -175,6 +183,7 @@ fn recv_window( } } } + trace!("sending contq.len: {}", contq.len()); if !contq.is_empty() { s.send(contq)?; } @@ -193,6 +202,7 @@ pub fn window( let mut window = vec![None; NUM_BLOBS]; let mut consumed = 0; loop { + println!("window loop"); if exit.load(Ordering::Relaxed) { break; } diff --git a/src/subscribers.rs b/src/subscribers.rs index b81a54941b599a..f0b271c43960b5 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -11,6 +11,8 @@ use rayon::prelude::*; use result::{Error, Result}; use std::net::{SocketAddr, UdpSocket}; +use std::fmt; + #[derive(Clone, PartialEq)] pub struct Node { pub id: [u64; 8], @@ -38,6 +40,12 @@ impl Node { } } +impl fmt::Debug for Node { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Node {{ weight: {} addr: {} }}", self.weight, self.addr) + } +} + pub struct Subscribers { data: Vec, pub me: Node,