diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index b4bf805c63b0a7..b4b27de03930c3 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -22,12 +22,14 @@ use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; use transaction::Transaction; +use subscribers; + pub struct AccountantSkel { acc: Accountant, last_id: Hash, @@ -245,8 +247,36 @@ impl AccountantSkel { } Ok(()) } + /// Process verified blobs, already in order + /// Respond with a signed hash of the state + fn replicate_state( + obj: &Arc>>, + verified_receiver: &streamer::BlobReceiver, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + for msgs in &blobs { + let blob = msgs.read().unwrap(); + let entries: Vec = deserialize(&blob.data()[..blob.meta.size]).unwrap(); + for entry in entries { + obj.lock().unwrap().acc.register_entry_id(&entry.id); + + obj.lock() + .unwrap() + .acc + .process_verified_events(entry.events)?; + } + //TODO respond back to leader with hash of the state + } + for blob in blobs { + blob_recycler.recycle(blob); + } + Ok(()) + } /// Create a UDP microservice that forwards messages the given AccountantSkel. + /// This service is the network leader /// Set `exit` to shutdown its threads. pub fn serve( obj: &Arc>>, @@ -279,7 +309,7 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = Self::process( &skel, &verified_receiver, &blob_sender, @@ -292,6 +322,75 @@ impl AccountantSkel { }); Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } + + /// This service receives messages from a leader in the network and processes the transactions + /// on the accountant state. + /// # Arguments + /// * `obj` - The accountant state. + /// * `rsubs` - The subscribers. + /// * `exit` - The exit signal. + /// # Remarks + /// The pipeline is constructed as follows: + /// 1. receive blobs from the network, these are out of order + /// 2. verify blobs, PoH, signatures (TODO) + /// 3. reconstruct contiguous window + /// a. order the blobs + /// b. use erasure coding to reconstruct missing blobs + /// c. ask the network for missing blobs, if erasure coding is insufficient + /// d. make sure that the blobs PoH sequences connect (TODO) + /// 4. process the transaction state machine + /// 5. respond with the hash of the state back to the leader + pub fn replicate( + obj: &Arc>>, + rsubs: subscribers::Subscribers, + exit: Arc, + ) -> Result>> { + let read = UdpSocket::bind(rsubs.me.addr)?; + // make sure we are on the same interface + let mut local = read.local_addr()?; + local.set_port(0); + let write = UdpSocket::bind(local)?; + + let blob_recycler = packet::BlobRecycler::default(); + let (blob_sender, blob_receiver) = channel(); + let t_blob_receiver = streamer::blob_receiver( + exit.clone(), + blob_recycler.clone(), + read, + blob_sender.clone(), + )?; + let (window_sender, window_receiver) = channel(); + let (retransmit_sender, retransmit_receiver) = channel(); + + let subs = Arc::new(RwLock::new(rsubs)); + let t_retransmit = streamer::retransmitter( + write, + exit.clone(), + subs.clone(), + blob_recycler.clone(), + retransmit_receiver, + ); + //TODO + //the packets coming out of blob_receiver need to be sent to the GPU and verified + //then sent to the window, which does the erasure coding reconstruction + let t_window = streamer::window( + exit.clone(), + subs, + blob_recycler.clone(), + blob_receiver, + window_sender, + retransmit_sender, + ); + + let skel = obj.clone(); + let t_server = spawn(move || loop { + let e = Self::replicate_state(&skel, &window_receiver, &blob_recycler); + if e.is_err() && exit.load(Ordering::Relaxed) { + break; + } + }); + Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) + } } #[cfg(test)] @@ -319,7 +418,7 @@ mod tests { use accountant_skel::{to_packets, Request}; use bincode::serialize; use ecdsa; - use packet::{PacketRecycler, NUM_PACKETS}; + use packet::{BlobRecycler, PacketRecycler, NUM_PACKETS}; use transaction::{memfind, test_tx}; use accountant::Accountant; @@ -340,6 +439,15 @@ mod tests { use std::time::Duration; use transaction::Transaction; + use subscribers::{Node, Subscribers}; + use streamer; + use std::sync::mpsc::channel; + use std::collections::VecDeque; + use hash::{hash, Hash}; + use event::Event; + use entry; + use chrono::prelude::*; + #[test] fn test_layout() { let tr = test_tx(); @@ -444,6 +552,138 @@ mod tests { exit.store(true, Ordering::Relaxed); } + use std::sync::{Once, ONCE_INIT}; + extern crate env_logger; + + static INIT: Once = ONCE_INIT; + + /// Setup function that is only run once, even if called multiple times. + fn setup() { + INIT.call_once(|| { + env_logger::init().unwrap(); + }); + } + + #[test] + fn test_replicate() { + setup(); + let leader_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let leader_addr = leader_sock.local_addr().unwrap(); + let me_addr = "127.0.0.1:9010".parse().unwrap(); + let target_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let target_peer_addr = target_peer_sock.local_addr().unwrap(); + let source_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let exit = Arc::new(AtomicBool::new(false)); + + let node_me = Node::new([0, 0, 0, 0, 0, 0, 0, 1], 10, me_addr); + let node_subs = vec![Node::new([0, 0, 0, 0, 0, 0, 0, 2], 8, target_peer_addr); 1]; + let node_leader = Node::new([0, 0, 0, 0, 0, 0, 0, 3], 20, leader_addr); + let subs = Subscribers::new(node_me, node_leader, &node_subs); + + // setup some blob services to send blobs into the socket + // to simulate the source peer and get blobs out of the socket to + // simulate target peer + let recv_recycler = BlobRecycler::default(); + let resp_recycler = BlobRecycler::default(); + let (s_reader, r_reader) = channel(); + let t_receiver = streamer::blob_receiver( + exit.clone(), + recv_recycler.clone(), + target_peer_sock, + s_reader, + ).unwrap(); + let (s_responder, r_responder) = channel(); + let t_responder = streamer::responder( + source_peer_sock, + exit.clone(), + resp_recycler.clone(), + r_responder, + ); + + let starting_balance = 10_000; + let alice = Mint::new(starting_balance); + let acc = Accountant::new(&alice); + let historian = Historian::new(&alice.last_id(), Some(30)); + let acc = Arc::new(Mutex::new(AccountantSkel::new( + acc, + alice.last_id(), + sink(), + historian, + ))); + + let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap(); + + let mut alice_ref_balance = starting_balance; + let mut msgs = VecDeque::new(); + let mut cur_hash = Hash::default(); + let num_blobs = 10; + let transfer_amount = 501; + let bob_keypair = KeyPair::new(); + for i in 0..num_blobs { + let b = resp_recycler.allocate(); + let b_ = b.clone(); + let mut w = b.write().unwrap(); + w.set_index(i).unwrap(); + + let tr0 = Event::new_timestamp(&bob_keypair, Utc::now()); + let entry0 = entry::create_entry(&cur_hash, i, vec![tr0]); + acc.lock().unwrap().acc.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + + let tr1 = Transaction::new( + &alice.keypair(), + bob_keypair.pubkey(), + transfer_amount, + cur_hash, + ); + acc.lock().unwrap().acc.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + let entry1 = + entry::create_entry(&cur_hash, i + num_blobs, vec![Event::Transaction(tr1)]); + acc.lock().unwrap().acc.register_entry_id(&cur_hash); + cur_hash = hash(&cur_hash); + + alice_ref_balance -= transfer_amount; + + let serialized_entry = serialize(&vec![entry0, entry1]).unwrap(); + + w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry); + w.set_size(serialized_entry.len()); + w.meta.set_addr(&me_addr); + drop(w); + msgs.push_back(b_); + } + + // send the blobs into the socket + s_responder.send(msgs).expect("send"); + + // receive retransmitted messages + let timer = Duration::new(1, 0); + let mut msgs: Vec<_> = Vec::new(); + while let Ok(msg) = r_reader.recv_timeout(timer) { + trace!("msg: {:?}", msg); + msgs.push(msg); + } + + let alice_balance = acc.lock() + .unwrap() + .acc + .get_balance(&alice.keypair().pubkey()) + .unwrap(); + assert_eq!(alice_balance, alice_ref_balance); + + let bob_balance = acc.lock() + .unwrap() + .acc + .get_balance(&bob_keypair.pubkey()) + .unwrap(); + assert_eq!(bob_balance, starting_balance - alice_ref_balance); + + exit.store(true, Ordering::Relaxed); + t_receiver.join().expect("join"); + t_responder.join().expect("join"); + } + } #[cfg(all(feature = "unstable", test))] diff --git a/src/packet.rs b/src/packet.rs index 2106bf5dab546b..d97b261e9f0fc8 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -6,6 +6,7 @@ use std::fmt; use std::io; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::sync::{Arc, Mutex, RwLock}; +use std::mem::size_of; pub type SharedPackets = Arc>; pub type SharedBlob = Arc>; @@ -210,23 +211,28 @@ impl Packets { } } +const BLOB_INDEX_SIZE: usize = size_of::(); + impl Blob { pub fn get_index(&self) -> Result { - let mut rdr = io::Cursor::new(&self.data[0..8]); + let mut rdr = io::Cursor::new(&self.data[0..BLOB_INDEX_SIZE]); let r = rdr.read_u64::()?; Ok(r) } pub fn set_index(&mut self, ix: u64) -> Result<()> { let mut wtr = vec![]; wtr.write_u64::(ix)?; - self.data[..8].clone_from_slice(&wtr); + self.data[..BLOB_INDEX_SIZE].clone_from_slice(&wtr); Ok(()) } pub fn data(&self) -> &[u8] { - &self.data[8..] + &self.data[BLOB_INDEX_SIZE..] } pub fn data_mut(&mut self) -> &mut [u8] { - &mut self.data[8..] + &mut self.data[BLOB_INDEX_SIZE..] + } + pub fn set_size(&mut self, size: usize) { + self.meta.size = size + BLOB_INDEX_SIZE; } pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result> { let mut v = VecDeque::new(); diff --git a/src/result.rs b/src/result.rs index 9b3c17a3695fde..01872dfbe1138c 100644 --- a/src/result.rs +++ b/src/result.rs @@ -4,6 +4,7 @@ use bincode; use serde_json; use std; use std::any::Any; +use accountant; #[derive(Debug)] pub enum Error { @@ -14,6 +15,7 @@ pub enum Error { RecvError(std::sync::mpsc::RecvError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), + AccountingError(accountant::AccountingError), SendError, Services, } @@ -30,6 +32,11 @@ impl std::convert::From for Error { Error::RecvTimeoutError(e) } } +impl std::convert::From for Error { + fn from(e: accountant::AccountingError) -> Error { + Error::AccountingError(e) + } +} impl std::convert::From> for Error { fn from(_e: std::sync::mpsc::SendError) -> Error { Error::SendError diff --git a/src/streamer.rs b/src/streamer.rs index 33882c31dcffda..43e6f2ac353216 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -8,7 +8,7 @@ use std::sync::mpsc; use std::sync::{Arc, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; -use subscribers; +use subscribers::Subscribers; pub type PacketReceiver = mpsc::Receiver; pub type PacketSender = mpsc::Sender; @@ -99,19 +99,22 @@ pub fn blob_receiver( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_blobs(&recycler, &sock, &s); + let ret = recv_blobs(&recycler, &sock, &s); + if ret.is_err() { + break; + } }); Ok(t) } fn recv_window( window: &mut Vec>, - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, consumed: &mut usize, r: &BlobReceiver, s: &BlobSender, - cast: &BlobSender, + retransmit: &BlobSender, ) -> Result<()> { let timer = Duration::new(1, 0); let mut dq = r.recv_timeout(timer)?; @@ -120,12 +123,18 @@ fn recv_window( } { //retransmit all leader blocks - let mut castq = VecDeque::new(); + let mut retransmitq = VecDeque::new(); let rsubs = subs.read().unwrap(); for b in &dq { let p = b.read().unwrap(); //TODO this check isn't safe against adverserial packets //we need to maintain a sequence window + trace!( + "idx: {} addr: {:?} leader: {:?}", + p.get_index().unwrap(), + p.meta.addr(), + rsubs.leader.addr + ); if p.meta.addr() == rsubs.leader.addr { //TODO //need to copy the retransmited blob @@ -141,11 +150,11 @@ fn recv_window( mnv.meta.size = sz; mnv.data[..sz].copy_from_slice(&p.data[..sz]); } - castq.push_back(nv); + retransmitq.push_back(nv); } } - if !castq.is_empty() { - cast.send(castq)?; + if !retransmitq.is_empty() { + retransmit.send(retransmitq)?; } } //send a contiguous set of blocks @@ -158,6 +167,7 @@ fn recv_window( //TODO, after the block are authenticated //if we get different blocks at the same index //that is a network failure/attack + trace!("window w: {} size: {}", w, p.meta.size); { if window[w].is_none() { window[w] = Some(b_); @@ -166,6 +176,7 @@ fn recv_window( } loop { let k = *consumed % NUM_BLOBS; + trace!("k: {} consumed: {}", k, *consumed); if window[k].is_none() { break; } @@ -175,6 +186,7 @@ fn recv_window( } } } + trace!("sending contq.len: {}", contq.len()); if !contq.is_empty() { s.send(contq)?; } @@ -183,11 +195,11 @@ fn recv_window( pub fn window( exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, s: BlobSender, - cast: BlobSender, + retransmit: BlobSender, ) -> JoinHandle<()> { spawn(move || { let mut window = vec![None; NUM_BLOBS]; @@ -196,13 +208,21 @@ pub fn window( if exit.load(Ordering::Relaxed) { break; } - let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast); + let _ = recv_window( + &mut window, + &subs, + &recycler, + &mut consumed, + &r, + &s, + &retransmit, + ); } }) } fn retransmit( - subs: &Arc>, + subs: &Arc>, recycler: &BlobRecycler, r: &BlobReceiver, sock: &UdpSocket, @@ -237,7 +257,7 @@ fn retransmit( pub fn retransmitter( sock: UdpSocket, exit: Arc, - subs: Arc>, + subs: Arc>, recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { @@ -442,20 +462,21 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::new([0; 8], 0, send.local_addr().unwrap()), + &[], ))); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap(); let (s_window, r_window) = channel(); - let (s_cast, r_cast) = channel(); + let (s_retransmit, r_retransmit) = channel(); let t_window = window( exit.clone(), subs, resp_recycler.clone(), r_reader, s_window, - s_cast, + s_retransmit, ); let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); @@ -475,8 +496,8 @@ mod test { let mut num = 0; get_blobs(r_window, &mut num); assert_eq!(num, 10); - let mut q = r_cast.recv().unwrap(); - while let Ok(mut nq) = r_cast.try_recv() { + let mut q = r_retransmit.recv().unwrap(); + while let Ok(mut nq) = r_retransmit.try_recv() { q.append(&mut nq); } assert_eq!(q.len(), 10); @@ -494,9 +515,8 @@ mod test { let subs = Arc::new(RwLock::new(Subscribers::new( Node::default(), Node::default(), + &[Node::new([0; 8], 1, read.local_addr().unwrap())], ))); - let n3 = Node::new([0; 8], 1, read.local_addr().unwrap()); - subs.write().unwrap().insert(&[n3]); let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); let saddr = send.local_addr().unwrap(); diff --git a/src/subscribers.rs b/src/subscribers.rs index 153246d12adb31..f0b271c43960b5 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -11,6 +11,8 @@ use rayon::prelude::*; use result::{Error, Result}; use std::net::{SocketAddr, UdpSocket}; +use std::fmt; + #[derive(Clone, PartialEq)] pub struct Node { pub id: [u64; 8], @@ -38,20 +40,27 @@ impl Node { } } +impl fmt::Debug for Node { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Node {{ weight: {} addr: {} }}", self.weight, self.addr) + } +} + pub struct Subscribers { data: Vec, - me: Node, + pub me: Node, pub leader: Node, } impl Subscribers { - pub fn new(me: Node, leader: Node) -> Subscribers { + pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers { let mut h = Subscribers { data: vec![], me: me.clone(), leader: leader.clone(), }; h.insert(&[me, leader]); + h.insert(network); h } @@ -99,7 +108,7 @@ mod test { me.weight = 10; let mut leader = Node::default(); leader.weight = 11; - let mut s = Subscribers::new(me, leader); + let mut s = Subscribers::new(me, leader, &[]); assert_eq!(s.data.len(), 2); assert_eq!(s.data[0].weight, 11); assert_eq!(s.data[1].weight, 10); @@ -116,7 +125,7 @@ mod test { let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap()); let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap()); - let mut s = Subscribers::new(n1.clone(), n2.clone()); + let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]); let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap()); s.insert(&[n3]); let mut b = Blob::default();