diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 4901b979b10483..b4b27de03930c3 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -29,7 +29,6 @@ use streamer; use transaction::Transaction; use subscribers; -use std::mem::size_of; pub struct AccountantSkel { acc: Accountant, @@ -253,19 +252,20 @@ impl AccountantSkel { fn replicate_state( obj: &Arc>>, verified_receiver: &streamer::BlobReceiver, - blob_sender: &streamer::BlobSender, blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); let blobs = verified_receiver.recv_timeout(timer)?; for msgs in &blobs { - let blob = msgs.read().unwrap(); - let mut entries:Vec = Vec::new(); - for i in 0..blob.meta.size/size_of::() { - entries.push(deserialize(&blob.data[i..i+size_of::()]).unwrap()); - } - for e in entries { - obj.lock().unwrap().acc.process_verified_events(e.events)?; + let blob = msgs.read().unwrap(); + let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); + for entry in entries { + obj.lock().unwrap().acc.register_entry_id(&entry.id); + + obj.lock() + .unwrap() + .acc + .process_verified_events(entry.events)?; } //TODO respond back to leader with hash of the state } @@ -275,7 +275,6 @@ impl AccountantSkel { Ok(()) } - /// Create a UDP microservice that forwards messages the given AccountantSkel. /// This service is the network leader /// Set `exit` to shutdown its threads. @@ -354,8 +353,12 @@ impl AccountantSkel { let blob_recycler = packet::BlobRecycler::default(); let (blob_sender, blob_receiver) = channel(); - let t_blob_receiver = - streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender.clone())?; + let t_blob_receiver = streamer::blob_receiver( + exit.clone(), + blob_recycler.clone(), + read, + blob_sender.clone(), + )?; let (window_sender, window_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel(); @@ -368,7 +371,7 @@ impl AccountantSkel { retransmit_receiver, ); //TODO - //the packets comming out of blob_receiver need to be sent to the GPU and verified + //the packets coming out of blob_receiver need to be sent to the GPU and verified //then sent to the window, which does the erasure coding reconstruction let t_window = streamer::window( exit.clone(), @@ -381,8 +384,7 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = Self::replicate_state(&skel, &window_receiver, - &blob_sender, &blob_recycler); + let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } @@ -441,7 +443,10 @@ mod tests { use streamer; use std::sync::mpsc::channel; use std::collections::VecDeque; - use packet::{PACKET_DATA_SIZE}; + use hash::{hash, Hash}; + use event::Event; + use entry; + use chrono::prelude::*; #[test] fn test_layout() { @@ -547,27 +552,57 @@ mod tests { exit.store(true, Ordering::Relaxed); } + use std::sync::{Once, ONCE_INIT}; + extern crate env_logger; + + static INIT: Once = ONCE_INIT; + + /// Setup function that is only run once, even if called multiple times. + fn setup() { + INIT.call_once(|| { + env_logger::init().unwrap(); + }); + } + #[test] fn test_replicate() { - let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let addr = read.local_addr().unwrap(); - let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + setup(); + let leader_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let leader_addr = leader_sock.local_addr().unwrap(); + let me_addr = "127.0.0.1:9010".parse().unwrap(); + let target_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let target_peer_addr = target_peer_sock.local_addr().unwrap(); + let source_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let node_me = Node::default(); - let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap()); - let subs = Subscribers::new(node_me, node_leader, &[]); + let node_me = Node::new([0, 0, 0, 0, 0, 0, 0, 1], 10, me_addr); + let node_subs = vec![Node::new([0, 0, 0, 0, 0, 0, 0, 2], 8, target_peer_addr); 1]; + let node_leader = Node::new([0, 0, 0, 0, 0, 0, 0, 3], 20, leader_addr); + let subs = Subscribers::new(node_me, node_leader, &node_subs); - let recv_recycler = PacketRecycler::default(); + // setup some blob services to send blobs into the socket + // to simulate the source peer and get blobs out of the socket to + // simulate target peer + let recv_recycler = BlobRecycler::default(); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap(); + let t_receiver = streamer::blob_receiver( + exit.clone(), + recv_recycler.clone(), + target_peer_sock, + s_reader, + ).unwrap(); let (s_responder, r_responder) = channel(); - let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let t_responder = streamer::responder( + source_peer_sock, + exit.clone(), + resp_recycler.clone(), + r_responder, + ); - let alice = Mint::new(10_000); + let starting_balance = 10_000; + let alice = Mint::new(starting_balance); let acc = Accountant::new(&alice); - let bob_pubkey = KeyPair::new().pubkey(); let historian = Historian::new(&alice.last_id(), Some(30)); let acc = Arc::new(Mutex::new(AccountantSkel::new( acc, @@ -575,21 +610,75 @@ mod tests { sink(), historian, ))); - + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + let mut alice_ref_balance = starting_balance; let mut msgs = VecDeque::new(); - for i in 0..10 { + let mut cur_hash = Hash::default(); + let num_blobs = 10; + let transfer_amount = 501; + let bob_keypair = KeyPair::new(); + for i in 0..num_blobs { let b = resp_recycler.allocate(); let b_ = b.clone(); let mut w = b.write().unwrap(); - w.data[0] = i as u8; - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&addr); + w.set_index(i).unwrap(); + + let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); + let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); + acc.lock().unwrap().acc.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + + let tr1 = Transaction::new( + &alice.keypair(), + bob_keypair.pubkey(), + transfer_amount, + cur_hash, + ); + acc.lock().unwrap().acc.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + let entry1 = + entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]); + acc.lock().unwrap().acc.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + + alice_ref_balance -= transfer_amount; + + let serialized_entry = serialize(&vec![entry0, entry1]).unwrap(); + + w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry); + w.set_size(serialized_entry.len()); + w.meta.set_addr(&me_addr); + drop(w); msgs.push_back(b_); } + + // send the blobs into the socket s_responder.send(msgs).expect("send"); + // receive retransmitted messages + let timer = Duration::new(1, 0); + let mut msgs: Vec<_> = Vec::new(); + while let Ok(msg) = r_reader.recv_timeout(timer) { + trace!("msg: {:?}", msg); + msgs.push(msg); + } + + let alice_balance = acc.lock() + .unwrap() + .acc + .get_balance(&alice.keypair().pubkey()) + .unwrap(); + assert_eq!(alice_balance, alice_ref_balance); + + let bob_balance = acc.lock() + .unwrap() + .acc + .get_balance(&bob_keypair.pubkey()) + .unwrap(); + assert_eq!(bob_balance, starting_balance - alice_ref_balance); + exit.store(true, Ordering::Relaxed); t_receiver.join().expect("join"); t_responder.join().expect("join"); diff --git a/src/packet.rs b/src/packet.rs index 2106bf5dab546b..d97b261e9f0fc8 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -6,6 +6,7 @@ use std::fmt; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::sync::{Arc, Mutex, RwLock}; +use std::mem::size_of; pub type SharedPackets = Arc>; pub type SharedBlob = Arc>; @@ -210,23 +211,28 @@ impl Packets { } } +const BLOB_INDEX_SIZE: usize = size_of::(); + impl Blob { pub fn get_index(&self) -> Result { - let mut rdr = io::Cursor::new(&self.data[0..8]); + let mut rdr = io::Cursor::new(&self.data[0..BLOB_INDEX_SIZE]); let r = rdr.read_u64::()?; Ok(r) } pub fn set_index(&mut self, ix: u64) -> Result<()> { let mut wtr = vec![]; wtr.write_u64::(ix)?; - self.data[..8].clone_from_slice(&wtr); + self.data[..BLOB_INDEX_SIZE].clone_from_slice(&wtr); Ok(()) } pub fn data(&self) -> &[u8] { - &self.data[8..] + &self.data[BLOB_INDEX_SIZE..] } pub fn data_mut(&mut self) -> &mut [u8] { - &mut self.data[8..] + &mut self.data[BLOB_INDEX_SIZE..] + } + pub fn set_size(&mut self, size: usize) { + self.meta.size = size + BLOB_INDEX_SIZE; } pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result> { let mut v = VecDeque::new(); diff --git a/src/streamer.rs b/src/streamer.rs index 0fd8c1b758204d..43e6f2ac353216 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -99,7 +99,10 @@ pub fn blob_receiver( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_blobs(&recycler, &sock, &s); + let ret = recv_blobs(&recycler, &sock, &s); + if ret.is_err() { + break; + } }); Ok(t) } @@ -126,6 +129,12 @@ fn recv_window( let p = b.read().unwrap(); //TODO this check isn't safe against adverserial packets //we need to maintain a sequence window + trace!( + "idx: {} addr: {:?} leader: {:?}", + p.get_index().unwrap(), + p.meta.addr(), + rsubs.leader.addr + ); if p.meta.addr() == rsubs.leader.addr { //TODO //need to copy the retransmited blob @@ -158,6 +167,7 @@ fn recv_window( //TODO, after the block are authenticated //if we get different blocks at the same index //that is a network failure/attack + trace!("window w: {} size: {}", w, p.meta.size); { if window[w].is_none() { window[w] = Some(b_); @@ -166,6 +176,7 @@ fn recv_window( } loop { let k = *consumed % NUM_BLOBS; + trace!("k: {} consumed: {}", k, *consumed); if window[k].is_none() { break; } @@ -175,6 +186,7 @@ fn recv_window( } } } + trace!("sending contq.len: {}", contq.len()); if !contq.is_empty() { s.send(contq)?; } @@ -196,7 +208,15 @@ pub fn window( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &retransmit); + let _ = recv_window( + &mut window, + &subs, + &recycler, + &mut consumed, + &r, + &s, + &retransmit, + ); } }) } @@ -495,7 +515,7 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::default(), - &[Node::new([0; 8], 1, read.local_addr().unwrap())] + &[Node::new([0; 8], 1, read.local_addr().unwrap())], ))); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); diff --git a/src/subscribers.rs b/src/subscribers.rs index b81a54941b599a..f0b271c43960b5 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -11,6 +11,8 @@ use rayon::prelude::*; use result::{Error, Result}; use std::net::{SocketAddr, UdpSocket}; +use std::fmt; + #[derive(Clone, PartialEq)] pub struct Node { pub id: [u64; 8], @@ -38,6 +40,12 @@ impl Node { } } +impl fmt::Debug for Node { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Node {{ weight: {} addr: {} }}", self.weight, self.addr) + } +} + pub struct Subscribers { data: Vec, pub me: Node,