Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recover full network from a star #152

Merged
merged 3 commits into from
Apr 26, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 65 additions & 32 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ pub struct Crdt {
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize)]
enum Protocol {
RequestUpdates(u64, SocketAddr),
/// forward your own latest data structure when requesting an update
/// this doesn't update the `remote` update index, but it allows the
/// recepient of this request to add knowledge of this node to the network
RequestUpdates(u64, ReplicatedData),
//TODO might need a since?
/// from id, form's last update index, ReplicatedData
ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>),
Expand All @@ -106,6 +109,13 @@ impl Crdt {
g.table.insert(me.id, me);
g
}
pub fn import(&mut self, v: &ReplicatedData) {
// TODO check that last_verified types are always increasing
// TODO probably an error or attack
if self.me != v.id {
self.insert(v);
}
}
pub fn insert(&mut self, v: &ReplicatedData) {
if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
trace!("insert! {}", v.version);
Expand Down Expand Up @@ -141,13 +151,13 @@ impl Crdt {
/// * A - Remote gossip address
/// * B - My gossip address
/// * C - Remote update index to request updates since
fn gossip_request(&self) -> (SocketAddr, SocketAddr, u64) {
fn gossip_request(&self) -> (SocketAddr, Protocol) {
let n = (Self::random() as usize) % self.table.len();
trace!("random {:?} {}", &self.me[0..1], n);
let v = self.table.values().nth(n).unwrap().clone();
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let my_addr = self.table[&self.me].gossip_addr;
(v.gossip_addr, my_addr, remote_update_index)
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
(v.gossip_addr, req)
}

/// At random pick a node and try to get updated changes from them
Expand All @@ -157,14 +167,11 @@ impl Crdt {

// Lock the object only to do this operation and not for any longer
// especially not when doing the `sock.send_to`
let (remote_gossip_addr, my_addr, remote_update_index) =
obj.read().unwrap().gossip_request();
let mut req_addr = my_addr;
req_addr.set_port(0);
let sock = UdpSocket::bind(req_addr)?;
let (remote_gossip_addr, req) = obj.read().unwrap().gossip_request();
let sock = UdpSocket::bind("0.0.0.0:0")?;
// TODO this will get chatty, so we need to first ask for number of updates since
// then only ask for specific data that we dont have
let r = serialize(&Protocol::RequestUpdates(remote_update_index, my_addr))?;
let r = serialize(&req)?;
sock.send_to(&r, remote_gossip_addr)?;
Ok(())
}
Expand All @@ -174,17 +181,12 @@ impl Crdt {
/// * `from` - identity of the sender of the updates
/// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
/// * `data` - the update data
fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: Vec<ReplicatedData>) {
fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: &[ReplicatedData]) {
trace!("got updates {}", data.len());
// TODO we need to punish/spam resist here
// sig verify the whole update and slash anyone who sends a bad update
for v in data {
// TODO probably an error or attack
if v.id == self.me {
continue;
}
// TODO check that last_verified types are always increasing
self.insert(&v);
self.import(&v);
}
*self.remote.entry(from).or_insert(update_index) = update_index;
}
Expand All @@ -211,19 +213,22 @@ impl Crdt {
let r = deserialize(&buf)?;
match r {
// TODO sigverify these
Protocol::RequestUpdates(v, addr) => {
Protocol::RequestUpdates(v, reqdata) => {
trace!("RequestUpdates {}", v);
let addr = reqdata.gossip_addr;
// only lock for this call, dont lock durring IO `sock.send_to` or `sock.recv_from`
let (from, ups, data) = obj.read().unwrap().get_updates_since(v);
trace!("get updates since response {} {}", v, data.len());
let rsp = serialize(&Protocol::ReceiveUpdates(from, ups, data))?;
trace!("send_to {}", addr);
//TODO verify reqdata belongs to sender
obj.write().unwrap().import(&reqdata);
sock.send_to(&rsp, addr).unwrap();
trace!("send_to done!");
}
Protocol::ReceiveUpdates(from, ups, data) => {
trace!("ReceivedUpdates");
obj.write().unwrap().apply_updates(from, ups, data);
obj.write().unwrap().apply_updates(from, ups, &data);
}
}
Ok(())
Expand Down Expand Up @@ -251,15 +256,17 @@ mod test {
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::{sleep, JoinHandle};
use std::time::Duration;

/// Test that the network converges.
/// Create a ring a -> b -> c -> d -> e -> a of size num.
/// Run until every node in the network has a full ReplicatedData set.
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
#[test]
fn gossip_test() {
/// tests that actually use this function are below
fn run_gossip_topo<F>(topo: F)
where
F: Fn(&Vec<(Arc<RwLock<Crdt>>, JoinHandle<()>)>) -> (),
{
let num: usize = 5;
let exit = Arc::new(AtomicBool::new(false));
let listen: Vec<_> = (0..num)
Expand All @@ -273,15 +280,7 @@ mod test {
(c, l)
})
.collect();
for n in 0..num {
let y = n % listen.len();
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.me].clone();
d.version = 0;
xv.insert(&d);
}
topo(&listen);
let gossip: Vec<_> = listen
.iter()
.map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone()))
Expand Down Expand Up @@ -321,6 +320,40 @@ mod test {
}
assert!(done);
}
/// ring a -> b -> c -> d -> e -> a
#[test]
fn gossip_ring_test() {
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..num {
let y = n % listen.len();
let x = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.me].clone();
d.version = 0;
xv.insert(&d);
}
});
}

/// star (b,c,d,e) -> a
#[test]
fn gossip_star_test() {
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..(num - 1) {
let x = 0;
let y = (n + 1) % listen.len();
let mut xv = listen[x].0.write().unwrap();
let yv = listen[y].0.read().unwrap();
let mut d = yv.table[&yv.me].clone();
d.version = 0;
xv.insert(&d);
}
});
}

/// Test that insert drops messages that are older
#[test]
fn insert_test() {
Expand Down