Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more test coverage #305

Merged
merged 4 commits into from
Jun 4, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 109 additions & 4 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub struct Crdt {
timeout: Duration,
}
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
enum Protocol {
/// forward your own latest data structure when requesting an update
/// this doesn't update the `remote` update index, but it allows the
Expand Down Expand Up @@ -581,6 +581,8 @@ impl Crdt {
None
}
Ok(Protocol::RequestWindowIndex(from, ix)) => {
//TODO this doesn't depend on CRDT module, can be moved
//but we are using the listen thread to service these request
//TODO verify from is signed
obj.write().unwrap().insert(&from);
let me = obj.read().unwrap().my_data().clone();
Expand Down Expand Up @@ -709,8 +711,14 @@ impl TestNode {
#[cfg(test)]
mod tests {
use crdt::{parse_port_or_addr, Crdt, ReplicatedData};
use packet::BlobRecycler;
use result::Error;
use signature::{KeyPair, KeyPairUtil};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::default_window;

#[test]
fn test_parse_port_or_addr() {
Expand All @@ -721,8 +729,6 @@ mod tests {
let p3 = parse_port_or_addr(None);
assert_eq!(p3.port(), 8000);
}

/// Test that insert drops messages that are older
#[test]
fn insert_test() {
let mut d = ReplicatedData::new(
Expand Down Expand Up @@ -809,7 +815,6 @@ mod tests {
sorted(&crdt.table.values().map(|x| x.clone()).collect())
);
}
/// Test that insert drops messages that are older
#[test]
fn window_index_request() {
let me = ReplicatedData::new(
Expand Down Expand Up @@ -870,4 +875,104 @@ mod tests {
}
assert!(one && two);
}

/// test that gossip requests are eventually generated for all nodes
#[test]
fn gossip_request() {
let me = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let mut crdt = Crdt::new(me.clone());
let rv = crdt.gossip_request();
assert_matches!(rv, Err(Error::CrdtTooSmall));
let nxt1 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);

crdt.insert(&nxt1);

let rv = crdt.gossip_request().unwrap();
assert_eq!(rv.0, nxt1.gossip_addr);

let nxt2 = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.3:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
crdt.insert(&nxt2);
// check that the service works
// and that it eventually produces a request for both nodes
let (sender, reader) = channel();
let recycler = BlobRecycler::default();
let exit = Arc::new(AtomicBool::new(false));
let obj = Arc::new(RwLock::new(crdt));
let thread = Crdt::gossip(obj, recycler, sender, exit.clone());
let mut one = false;
let mut two = false;
for _ in 0..5 {
let mut rv = reader.recv_timeout(Duration::new(1, 0)).unwrap();
while let Ok(mut more) = reader.try_recv() {
rv.append(&mut more);
}
assert!(rv.len() > 0);
for i in rv.iter() {
if i.read().unwrap().meta.addr() == nxt1.gossip_addr {
one = true;
} else if i.read().unwrap().meta.addr() == nxt2.gossip_addr {
two = true;
} else {
//unexpected request
assert!(false);
}
}
if one && two {
break;
}
}
exit.store(true, Ordering::Relaxed);
thread.join().unwrap();
//created requests to both
assert!(one && two);
}

/// test window requests respond with the right blob, and do not overrun
#[test]
fn run_window_request() {
let window = default_window();
let me = ReplicatedData::new(
KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
"127.0.0.1:1237".parse().unwrap(),
"127.0.0.1:1238".parse().unwrap(),
);
let recycler = BlobRecycler::default();
let rv = Crdt::run_window_request(&window, &me, 0, &recycler);
assert!(rv.is_none());
let out = recycler.allocate();
out.write().unwrap().meta.size = 200;
window.write().unwrap()[0] = Some(out);
let rv = Crdt::run_window_request(&window, &me, 0, &recycler);
assert!(rv.is_some());
let v = rv.unwrap();
//test we copied the blob
assert_eq!(v.read().unwrap().meta.size, 200);
let len = window.read().unwrap().len() as u64;
let rv = Crdt::run_window_request(&window, &me, len, &recycler);
assert!(rv.is_none());
}
}