diff --git a/Cargo.toml b/Cargo.toml index 877fbde79eb027..3387af244f7309 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,13 +16,13 @@ license = "Apache-2.0" name = "solana-historian-demo" path = "src/bin/historian-demo.rs" -[[bin]] -name = "solana-client-demo" -path = "src/bin/client-demo.rs" +#[[bin]] +#name = "solana-client-demo" +#path = "src/bin/client-demo.rs" -[[bin]] -name = "solana-testnode" -path = "src/bin/testnode.rs" +#[[bin]] +#name = "solana-testnode" +#path = "src/bin/testnode.rs" [[bin]] name = "solana-genesis" diff --git a/src/crdt.rs b/src/crdt.rs index 8235ef7593d720..cacd99344e4414 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -151,11 +151,11 @@ impl Crdt { s: &UdpSocket, transmit_index: &mut u64 ) -> Result<()> { - let (me, table): (&ReplicatedData, Vec) = { + let (me, table): (ReplicatedData, Vec) = { // copy to avoid locking durring IO let robj = obj.read().unwrap(); let cloned_table:Vec = robj.table.values().cloned().collect(); - (&cloned_table[&robj.me], cloned_table) + (robj.table[&robj.me].clone(), cloned_table) }; let errs: Vec<_> = table.iter() .enumerate() @@ -167,7 +167,7 @@ impl Crdt { } // only leader should be broadcasting assert!(me.current_leader_id != v.id); - let blob = b.write().unwrap(); + let mut blob = b.write().unwrap(); blob.set_index(*transmit_index + i as u64); s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr) }) @@ -354,6 +354,13 @@ mod test { use std::thread::{sleep, JoinHandle}; use std::time::Duration; + use rayon::iter::*; + use streamer::{blob_receiver, retransmitter}; + use std::sync::mpsc::channel; + use subscribers::{Node, Subscribers}; + use packet::{Blob, BlobRecycler}; + use std::collections::VecDeque; + /// Test that the network converges. /// Run until every node in the network has a full ReplicatedData set. /// Check that nodes stop sending updates after all the ReplicatedData has been shared. @@ -366,12 +373,18 @@ mod test { let exit = Arc::new(AtomicBool::new(false)); let listen: Vec<_> = (0..num) .map(|_| { - let listener = UdpSocket::bind("0.0.0.0:0").unwrap(); + let gossip = UdpSocket::bind("0.0.0.0:0").unwrap(); + let replicate = UdpSocket::bind("0.0.0.0:0").unwrap(); + let serve = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); - let d = ReplicatedData::new(pubkey, listener.local_addr().unwrap()); + let d = ReplicatedData::new(pubkey, + gossip.local_addr().unwrap(), + replicate.local_addr().unwrap(), + serve.local_addr().unwrap(), + ); let crdt = Crdt::new(d); let c = Arc::new(RwLock::new(crdt)); - let l = Crdt::listen(c.clone(), listener, exit.clone()); + let l = Crdt::listen(c.clone(), gossip, exit.clone()); (c, l) }) .collect(); @@ -452,7 +465,11 @@ mod test { /// Test that insert drops messages that are older #[test] fn insert_test() { - let mut d = ReplicatedData::new(KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap()); + let mut d = ReplicatedData::new(KeyPair::new().pubkey(), + "127.0.0.1:1234".parse().unwrap(), + "127.0.0.1:1235".parse().unwrap(), + "127.0.0.1:1236".parse().unwrap(), + ); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()); assert_eq!(crdt.table[&d.id].version, 0); @@ -464,4 +481,32 @@ mod test { assert_eq!(crdt.table[&d.id].version, 2); } + #[test] + pub fn test_crdt_retransmit() { + let s1 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let s2 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap()); + let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap()); + let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]); + let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap()); + s.insert(&[n3]); + let mut b = Blob::default(); + b.meta.size = 10; + let s4 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + s.retransmit(&mut b, &s4).unwrap(); + let res: Vec<_> = [s1, s2, s3] + .into_par_iter() + .map(|s| { + let mut b = Blob::default(); + s.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); + s.recv_from(&mut b.data).is_err() + }) + .collect(); + assert_eq!(res, [true, true, false]); + let mut n4 = Node::default(); + n4.addr = "255.255.255.255:1".parse().unwrap(); + s.insert(&[n4]); + assert!(s.retransmit(&mut b, &s4).is_err()); + } } diff --git a/src/lib.rs b/src/lib.rs index 1f285ef8801764..a73f6906afe896 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,7 @@ pub mod accountant; //pub mod accountant_skel; //pub mod accountant_stub; pub mod crdt; -pub mod ecdsa; +//pub mod ecdsa; pub mod entry; #[cfg(feature = "erasure")] pub mod erasure; @@ -19,6 +19,7 @@ pub mod result; pub mod signature; pub mod streamer; pub mod transaction; +pub mod subscribers; extern crate bincode; extern crate byteorder; extern crate chrono; diff --git a/src/packet.rs b/src/packet.rs index 7204dc78beb165..cefe44cd0bf741 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -234,8 +234,7 @@ impl Blob { Ok(e) } pub fn set_id(&mut self, id: PublicKey) -> Result<()> { - let mut wtr = vec![]; - let wrt = serialize(&id)?; + let wtr = serialize(&id)?; self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr); Ok(()) } diff --git a/src/streamer.rs b/src/streamer.rs index 423ee45c091398..35d9bc38c6c9d7 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -433,7 +433,11 @@ mod test { use std::time::Duration; use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver, PacketReceiver}; - use Crdt::Crdt; + + use crdt::{Crdt, ReplicatedData}; + use subscribers::Node; + use signature::KeyPair; + use signature::KeyPairUtil; fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { @@ -504,15 +508,23 @@ mod test { #[test] pub fn window_send_test() { + let pubkey_me = KeyPair::new().pubkey(); let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let serve = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let subs = Arc::new(RwLock::new(Crdt::new( + + let rep_data = ReplicatedData::new(pubkey_me, + read.local_addr().unwrap(), + send.local_addr().unwrap(), + serve.local_addr().unwrap()); + let subs = Arc::new(RwLock::new(Crdt::new(rep_data))); + /* Node::default(), Node::new([0; 8], 0, send.local_addr().unwrap()), - &[], - ))); + &[],*/ + let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = @@ -558,14 +570,18 @@ mod test { #[test] pub fn retransmit() { + let pubkey_me = KeyPair::new().pubkey(); let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let serve = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let subs = Arc::new(RwLock::new(Crdt::new( - Node::default(), - Node::default(), - &[Node::new([0; 8], 1, read.local_addr().unwrap())], - ))); + + let rep_data = ReplicatedData::new(pubkey_me, + read.local_addr().unwrap(), + send.local_addr().unwrap(), + serve.local_addr().unwrap()); + let subs = Arc::new(RwLock::new(Crdt::new(rep_data))); + let (s_retransmit, r_retransmit) = channel(); let blob_recycler = BlobRecycler::default(); let saddr = send.local_addr().unwrap(); diff --git a/src/subscribers.rs b/src/subscribers.rs index 67105778cf191f..ce7bd5ebd27c49 100644 --- a/src/subscribers.rs +++ b/src/subscribers.rs @@ -50,7 +50,7 @@ pub struct Subscribers { data: Vec, /// TODO derive this somehow from the historians counter /// this is the window index - pub index: u64; + pub index: u64, pub me: Node, pub leader: Node, } @@ -59,6 +59,7 @@ impl Subscribers { pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers { let mut h = Subscribers { data: vec![], + index: 0, me: me.clone(), leader: leader.clone(), }; @@ -68,24 +69,26 @@ impl Subscribers { } /// broadcast messages from the leader to layer 1 nodes - pub fn broadcast(&mut self, blobs: &Vec, s: &UdpSocket) -> Result<()> { - let errs: Vec<_> = self.subs + pub fn broadcast(&mut self, blobs: &mut Vec, s: &UdpSocket) -> Result<()> { + let errs: Vec<_> = self.data + .iter() .enumerate() .cycle() - .zip(blobs.iter()) - .par_iter() + .zip(blobs.iter_mut()) + //.par_iter() TODO: restore this .map(|((i,v),b)| { - if self.me == *i { + if self.me.id == v.id { return Ok(0); } - if self.leader == *i { + if self.leader.id == v.id { return Ok(0); } - let blob = b.write().unwrap(); - blob.set_index(self.index + i); - s.send_to(&blob.data[..blob.meta.size], &i.addr) + //let blob = b.write().unwrap(); TODO: should it be in write-lock here? + let blob = b; + blob.set_index(self.index + i as u64); + s.send_to(&blob.data[..blob.meta.size], &v.addr) }) - .collect() + .collect(); for e in errs { trace!("retransmit result {:?}", e); match e { @@ -151,32 +154,4 @@ mod test { assert_eq!(s.data.len(), 3); assert_eq!(s.data[0].weight, 12); } - #[test] - pub fn retransmit() { - let s1 = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let s2 = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap()); - let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap()); - let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]); - let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap()); - s.insert(&[n3]); - let mut b = Blob::default(); - b.meta.size = 10; - let s4 = UdpSocket::bind("127.0.0.1:0").expect("bind"); - s.retransmit(&mut b, &s4).unwrap(); - let res: Vec<_> = [s1, s2, s3] - .into_par_iter() - .map(|s| { - let mut b = Blob::default(); - s.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - s.recv_from(&mut b.data).is_err() - }) - .collect(); - assert_eq!(res, [true, true, false]); - let mut n4 = Node::default(); - n4.addr = "255.255.255.255:1".parse().unwrap(); - s.insert(&[n4]); - assert!(s.retransmit(&mut b, &s4).is_err()); - } }