Skip to content

Commit

Permalink
compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed Apr 30, 2018
1 parent e10d559 commit 1b2c62a
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 64 deletions.
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ license = "Apache-2.0"
name = "solana-historian-demo"
path = "src/bin/historian-demo.rs"

[[bin]]
name = "solana-client-demo"
path = "src/bin/client-demo.rs"
#[[bin]]
#name = "solana-client-demo"
#path = "src/bin/client-demo.rs"

[[bin]]
name = "solana-testnode"
path = "src/bin/testnode.rs"
#[[bin]]
#name = "solana-testnode"
#path = "src/bin/testnode.rs"

[[bin]]
name = "solana-genesis"
Expand Down
59 changes: 52 additions & 7 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ impl Crdt {
s: &UdpSocket,
transmit_index: &mut u64
) -> Result<()> {
let (me, table): (&ReplicatedData, Vec<ReplicatedData>) = {
let (me, table): (ReplicatedData, Vec<ReplicatedData>) = {
// copy to avoid locking durring IO
let robj = obj.read().unwrap();
let cloned_table:Vec<ReplicatedData> = robj.table.values().cloned().collect();
(&cloned_table[&robj.me], cloned_table)
(robj.table[&robj.me].clone(), cloned_table)
};
let errs: Vec<_> = table.iter()
.enumerate()
Expand All @@ -167,7 +167,7 @@ impl Crdt {
}
// only leader should be broadcasting
assert!(me.current_leader_id != v.id);
let blob = b.write().unwrap();
let mut blob = b.write().unwrap();
blob.set_index(*transmit_index + i as u64);
s.send_to(&blob.data[..blob.meta.size], &v.replicate_addr)
})
Expand Down Expand Up @@ -354,6 +354,13 @@ mod test {
use std::thread::{sleep, JoinHandle};
use std::time::Duration;

use rayon::iter::*;
use streamer::{blob_receiver, retransmitter};
use std::sync::mpsc::channel;
use subscribers::{Node, Subscribers};
use packet::{Blob, BlobRecycler};
use std::collections::VecDeque;

/// Test that the network converges.
/// Run until every node in the network has a full ReplicatedData set.
/// Check that nodes stop sending updates after all the ReplicatedData has been shared.
Expand All @@ -366,12 +373,18 @@ mod test {
let exit = Arc::new(AtomicBool::new(false));
let listen: Vec<_> = (0..num)
.map(|_| {
let listener = UdpSocket::bind("0.0.0.0:0").unwrap();
let gossip = UdpSocket::bind("0.0.0.0:0").unwrap();
let replicate = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve = UdpSocket::bind("0.0.0.0:0").unwrap();
let pubkey = KeyPair::new().pubkey();
let d = ReplicatedData::new(pubkey, listener.local_addr().unwrap());
let d = ReplicatedData::new(pubkey,
gossip.local_addr().unwrap(),
replicate.local_addr().unwrap(),
serve.local_addr().unwrap(),
);
let crdt = Crdt::new(d);
let c = Arc::new(RwLock::new(crdt));
let l = Crdt::listen(c.clone(), listener, exit.clone());
let l = Crdt::listen(c.clone(), gossip, exit.clone());
(c, l)
})
.collect();
Expand Down Expand Up @@ -452,7 +465,11 @@ mod test {
/// Test that insert drops messages that are older
#[test]
fn insert_test() {
let mut d = ReplicatedData::new(KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap());
let mut d = ReplicatedData::new(KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(),
"127.0.0.1:1235".parse().unwrap(),
"127.0.0.1:1236".parse().unwrap(),
);
assert_eq!(d.version, 0);
let mut crdt = Crdt::new(d.clone());
assert_eq!(crdt.table[&d.id].version, 0);
Expand All @@ -464,4 +481,32 @@ mod test {
assert_eq!(crdt.table[&d.id].version, 2);
}

#[test]
pub fn test_crdt_retransmit() {
let s1 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let s2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap());
let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap());
let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]);
let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap());
s.insert(&[n3]);
let mut b = Blob::default();
b.meta.size = 10;
let s4 = UdpSocket::bind("127.0.0.1:0").expect("bind");
s.retransmit(&mut b, &s4).unwrap();
let res: Vec<_> = [s1, s2, s3]
.into_par_iter()
.map(|s| {
let mut b = Blob::default();
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
s.recv_from(&mut b.data).is_err()
})
.collect();
assert_eq!(res, [true, true, false]);
let mut n4 = Node::default();
n4.addr = "255.255.255.255:1".parse().unwrap();
s.insert(&[n4]);
assert!(s.retransmit(&mut b, &s4).is_err());
}
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod accountant;
//pub mod accountant_skel;
//pub mod accountant_stub;
pub mod crdt;
pub mod ecdsa;
//pub mod ecdsa;
pub mod entry;
#[cfg(feature = "erasure")]
pub mod erasure;
Expand All @@ -19,6 +19,7 @@ pub mod result;
pub mod signature;
pub mod streamer;
pub mod transaction;
pub mod subscribers;
extern crate bincode;
extern crate byteorder;
extern crate chrono;
Expand Down
3 changes: 1 addition & 2 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ impl Blob {
Ok(e)
}
pub fn set_id(&mut self, id: PublicKey) -> Result<()> {
let mut wtr = vec![];
let wrt = serialize(&id)?;
let wtr = serialize(&id)?;
self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr);
Ok(())
}
Expand Down
34 changes: 25 additions & 9 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,11 @@ mod test {
use std::time::Duration;
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
PacketReceiver};
use Crdt::Crdt;

use crdt::{Crdt, ReplicatedData};
use subscribers::Node;
use signature::KeyPair;
use signature::KeyPairUtil;

fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
Expand Down Expand Up @@ -504,15 +508,23 @@ mod test {

#[test]
pub fn window_send_test() {
let pubkey_me = KeyPair::new().pubkey();
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let serve = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Crdt::new(

let rep_data = ReplicatedData::new(pubkey_me,
read.local_addr().unwrap(),
send.local_addr().unwrap(),
serve.local_addr().unwrap());
let subs = Arc::new(RwLock::new(Crdt::new(rep_data)));
/*
Node::default(),
Node::new([0; 8], 0, send.local_addr().unwrap()),
&[],
)));
&[],*/

let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver =
Expand Down Expand Up @@ -558,14 +570,18 @@ mod test {

#[test]
pub fn retransmit() {
let pubkey_me = KeyPair::new().pubkey();
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let serve = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Crdt::new(
Node::default(),
Node::default(),
&[Node::new([0; 8], 1, read.local_addr().unwrap())],
)));

let rep_data = ReplicatedData::new(pubkey_me,
read.local_addr().unwrap(),
send.local_addr().unwrap(),
serve.local_addr().unwrap());
let subs = Arc::new(RwLock::new(Crdt::new(rep_data)));

let (s_retransmit, r_retransmit) = channel();
let blob_recycler = BlobRecycler::default();
let saddr = send.local_addr().unwrap();
Expand Down
53 changes: 14 additions & 39 deletions src/subscribers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct Subscribers {
data: Vec<Node>,
/// TODO derive this somehow from the historians counter
/// this is the window index
pub index: u64;
pub index: u64,
pub me: Node,
pub leader: Node,
}
Expand All @@ -59,6 +59,7 @@ impl Subscribers {
pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers {
let mut h = Subscribers {
data: vec![],
index: 0,
me: me.clone(),
leader: leader.clone(),
};
Expand All @@ -68,24 +69,26 @@ impl Subscribers {
}

/// broadcast messages from the leader to layer 1 nodes
pub fn broadcast(&mut self, blobs: &Vec<Blob>, s: &UdpSocket) -> Result<()> {
let errs: Vec<_> = self.subs
pub fn broadcast(&mut self, blobs: &mut Vec<Blob>, s: &UdpSocket) -> Result<()> {
let errs: Vec<_> = self.data
.iter()
.enumerate()
.cycle()
.zip(blobs.iter())
.par_iter()
.zip(blobs.iter_mut())
//.par_iter() TODO: restore this
.map(|((i,v),b)| {
if self.me == *i {
if self.me.id == v.id {
return Ok(0);
}
if self.leader == *i {
if self.leader.id == v.id {
return Ok(0);
}
let blob = b.write().unwrap();
blob.set_index(self.index + i);
s.send_to(&blob.data[..blob.meta.size], &i.addr)
//let blob = b.write().unwrap(); TODO: should it be in write-lock here?
let blob = b;
blob.set_index(self.index + i as u64);
s.send_to(&blob.data[..blob.meta.size], &v.addr)
})
.collect()
.collect();
for e in errs {
trace!("retransmit result {:?}", e);
match e {
Expand Down Expand Up @@ -151,32 +154,4 @@ mod test {
assert_eq!(s.data.len(), 3);
assert_eq!(s.data[0].weight, 12);
}
#[test]
pub fn retransmit() {
let s1 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let s2 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let s3 = UdpSocket::bind("127.0.0.1:0").expect("bind");
let n1 = Node::new([0; 8], 0, s1.local_addr().unwrap());
let n2 = Node::new([0; 8], 0, s2.local_addr().unwrap());
let mut s = Subscribers::new(n1.clone(), n2.clone(), &[]);
let n3 = Node::new([0; 8], 0, s3.local_addr().unwrap());
s.insert(&[n3]);
let mut b = Blob::default();
b.meta.size = 10;
let s4 = UdpSocket::bind("127.0.0.1:0").expect("bind");
s.retransmit(&mut b, &s4).unwrap();
let res: Vec<_> = [s1, s2, s3]
.into_par_iter()
.map(|s| {
let mut b = Blob::default();
s.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
s.recv_from(&mut b.data).is_err()
})
.collect();
assert_eq!(res, [true, true, false]);
let mut n4 = Node::default();
n4.addr = "255.255.255.255:1".parse().unwrap();
s.insert(&[n4]);
assert!(s.retransmit(&mut b, &s4).is_err());
}
}

0 comments on commit 1b2c62a

Please sign in to comment.