diff --git a/src/crdt.rs b/src/crdt.rs index cf60657aabca1d..08c14faed7e35f 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -55,7 +55,6 @@ impl ReplicatedData { gossip_addr: SocketAddr, replicate_addr: SocketAddr, serve_addr: SocketAddr) -> ReplicatedData { - let daddr:SocketAddr = "0.0.0.0:0".parse().unwrap(); ReplicatedData { id, sig: Signature::default(), @@ -141,13 +140,15 @@ impl Crdt { if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) { //somehow we signed a message for our own identity with a higher version that // we have stored ourselves - trace!("me: {:?} v.id: {:?}", self.me, v.id); + trace!("me: {:?}", self.me[0]); + trace!("v.id: {:?}", v.id[0]); trace!("insert! {}", v.version); self.update_index += 1; let _ = self.table.insert(v.id.clone(), v.clone()); let _ = self.local.insert(v.id, self.update_index); } else { - trace!("INSERT FAILED {}", v.version); + trace!("INSERT FAILED new.version: {} me.version: {}", + v.version, self.table[&v.id].version); } } @@ -177,7 +178,7 @@ impl Crdt { // only leader should be broadcasting assert!(me.current_leader_id != v.id); let mut blob = b.write().unwrap(); - blob.set_index(*transmit_index + i as u64); + blob.set_index(*transmit_index + i as u64).expect("set_index"); s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr) }) .collect(); @@ -238,7 +239,7 @@ impl Crdt { rdr.read_u64::().unwrap() } fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec) { - trace!("get updates since {}", v); + //trace!("get updates since {}", v); let data = self.table .values() .filter(|x| self.local[&x.id] > v) @@ -374,7 +375,14 @@ mod test { replicate.local_addr().unwrap(), serve.local_addr().unwrap(), ); - (Crdt::new(d), gossip, replicate, serve) + let crdt = Crdt::new(d); + println!("id: {} gossip: {} replicate: {} serve: {}", + crdt.my_data().id[0], + gossip.local_addr().unwrap(), + replicate.local_addr().unwrap(), + serve.local_addr().unwrap(), + ); + (crdt, gossip, replicate, serve) } @@ -489,28 +497,56 @@ mod test { assert_eq!(crdt.table[&d.id].version, 2); } + use std::sync::{Once, ONCE_INIT}; + extern crate env_logger; + + static INIT: Once = ONCE_INIT; + + /// Setup function that is only run once, even if called multiple times. + fn setup() { + INIT.call_once(|| { + env_logger::init().unwrap(); + }); + } + + #[test] pub fn test_crdt_retransmit() { + setup(); + println!("c1:"); let (mut c1,s1,r1,e1) = test_node(); + println!("c2:"); let (mut c2,s2,r2,_) = test_node(); + println!("c3:"); let (mut c3,s3,r3,_) = test_node(); let c1_id = c1.my_data().id; c1.set_leader(c1_id); + c2.insert(c1.my_data().clone()); c3.insert(c1.my_data().clone()); + c2.set_leader(c1.my_data().id); c3.set_leader(c1.my_data().id); let exit = Arc::new(AtomicBool::new(false)); + + // Create listen threads let a1 = Arc::new(RwLock::new(c1)); - let t1 = Crdt::listen(a1.clone(), s1, exit.clone()); + let _t1 = Crdt::listen(a1.clone(), s1, exit.clone()); let a2 = Arc::new(RwLock::new(c2)); - let t2 = Crdt::listen(a2.clone(), s2, exit.clone()); + let _t2 = Crdt::listen(a2.clone(), s2, exit.clone()); let a3 = Arc::new(RwLock::new(c3)); - let t3 = Crdt::listen(a3.clone(), s3, exit.clone()); + let _t3 = Crdt::listen(a3.clone(), s3, exit.clone()); + + // Create gossip threads + let _t1_gossip = Crdt::gossip(a1.clone(), exit.clone()); + let _t2_gossip = Crdt::gossip(a2.clone(), exit.clone()); + let _t3_gossip = Crdt::gossip(a3.clone(), exit.clone()); + //wait to converge + println!("waitng to converge:"); let mut done = false; for _ in 0 .. 10 { done = a1.read().unwrap().table.len() == 3 && @@ -530,9 +566,10 @@ mod test { .map(|s| { let mut b = Blob::default(); s.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); - s.recv_from(&mut b.data).is_err() + let res = s.recv_from(&mut b.data); + res.is_err() }) .collect(); - assert_eq!(res, [false, true, true]); + assert_eq!(res, [true, false, false]); } }