Skip to content

Commit

Permalink
Merge pull request #2 from sakridge/broadcast
Browse files Browse the repository at this point in the history
test_crdt_retransmit passing
  • Loading branch information
aeyakovenko authored May 1, 2018
2 parents 4f94bfa + f4d9822 commit 5a95827
Showing 1 changed file with 48 additions and 11 deletions.
59 changes: 48 additions & 11 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl ReplicatedData {
gossip_addr: SocketAddr,
replicate_addr: SocketAddr,
serve_addr: SocketAddr) -> ReplicatedData {
let daddr:SocketAddr = "0.0.0.0:0".parse().unwrap();
ReplicatedData {
id,
sig: Signature::default(),
Expand Down Expand Up @@ -141,13 +140,15 @@ impl Crdt {
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
//somehow we signed a message for our own identity with a higher version that
// we have stored ourselves
trace!("me: {:?} v.id: {:?}", self.me, v.id);
trace!("me: {:?}", self.me[0]);
trace!("v.id: {:?}", v.id[0]);
trace!("insert! {}", v.version);
self.update_index += 1;
let _ = self.table.insert(v.id.clone(), v.clone());
let _ = self.local.insert(v.id, self.update_index);
} else {
trace!("INSERT FAILED {}", v.version);
trace!("INSERT FAILED new.version: {} me.version: {}",
v.version, self.table[&v.id].version);
}
}

Expand Down Expand Up @@ -177,7 +178,7 @@ impl Crdt {
// only leader should be broadcasting
assert!(me.current_leader_id != v.id);
let mut blob = b.write().unwrap();
blob.set_index(*transmit_index + i as u64);
blob.set_index(*transmit_index + i as u64).expect("set_index");
s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr)
})
.collect();
Expand Down Expand Up @@ -238,7 +239,7 @@ impl Crdt {
rdr.read_u64::<LittleEndian>().unwrap()
}
fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
trace!("get updates since {}", v);
//trace!("get updates since {}", v);
let data = self.table
.values()
.filter(|x| self.local[&x.id] > v)
Expand Down Expand Up @@ -374,7 +375,14 @@ mod test {
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
(Crdt::new(d), gossip, replicate, serve)
let crdt = Crdt::new(d);
println!("id: {} gossip: {} replicate: {} serve: {}",
crdt.my_data().id[0],
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
(crdt, gossip, replicate, serve)
}


Expand Down Expand Up @@ -489,28 +497,56 @@ mod test {
assert_eq!(crdt.table[&d.id].version, 2);
}

use std::sync::{Once, ONCE_INIT};
extern crate env_logger;

static INIT: Once = ONCE_INIT;

/// Setup function that is only run once, even if called multiple times.
fn setup() {
INIT.call_once(|| {
env_logger::init().unwrap();
});
}


#[test]
pub fn test_crdt_retransmit() {
setup();
println!("c1:");
let (mut c1,s1,r1,e1) = test_node();
println!("c2:");
let (mut c2,s2,r2,_) = test_node();
println!("c3:");
let (mut c3,s3,r3,_) = test_node();
let c1_id = c1.my_data().id;
c1.set_leader(c1_id);

c2.insert(c1.my_data().clone());
c3.insert(c1.my_data().clone());

c2.set_leader(c1.my_data().id);
c3.set_leader(c1.my_data().id);

let exit = Arc::new(AtomicBool::new(false));

// Create listen threads
let a1 = Arc::new(RwLock::new(c1));
let t1 = Crdt::listen(a1.clone(), s1, exit.clone());
let _t1 = Crdt::listen(a1.clone(), s1, exit.clone());

let a2 = Arc::new(RwLock::new(c2));
let t2 = Crdt::listen(a2.clone(), s2, exit.clone());
let _t2 = Crdt::listen(a2.clone(), s2, exit.clone());

let a3 = Arc::new(RwLock::new(c3));
let t3 = Crdt::listen(a3.clone(), s3, exit.clone());
let _t3 = Crdt::listen(a3.clone(), s3, exit.clone());

// Create gossip threads
let _t1_gossip = Crdt::gossip(a1.clone(), exit.clone());
let _t2_gossip = Crdt::gossip(a2.clone(), exit.clone());
let _t3_gossip = Crdt::gossip(a3.clone(), exit.clone());

//wait to converge
println!("waitng to converge:");
let mut done = false;
for _ in 0 .. 10 {
done = a1.read().unwrap().table.len() == 3 &&
Expand All @@ -530,9 +566,10 @@ mod test {
.map(|s| {
let mut b = Blob::default();
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
s.recv_from(&mut b.data).is_err()
let res = s.recv_from(&mut b.data);
res.is_err()
})
.collect();
assert_eq!(res, [false, true, true]);
assert_eq!(res, [true, false, false]);
}
}

0 comments on commit 5a95827

Please sign in to comment.