Skip to content

Commit

Permalink
Deserialize the Entry structs
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed Apr 26, 2018
1 parent 58d1ddd commit 6680710
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 19 deletions.
90 changes: 72 additions & 18 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
recvr: &streamer::PacketReceiver,
sendr: &Sender<Vec<(SharedPackets, Vec<u8>)>>,
) -> Result<()> {
println!("verifier!");
let batch = Self::recv_batch(recvr)?;
let verified_batches = Self::verify_batch(batch);
for xs in verified_batches {
Expand Down Expand Up @@ -256,15 +257,18 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
blob_sender: &streamer::BlobSender,
blob_recycler: &packet::BlobRecycler,
) -> Result<()> {
println!("replicate_state start");
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
println!("received msg");
for msgs in &blobs {
println!("msg:");
let blob = msgs.read().unwrap();
let mut entries:Vec<Entry> = Vec::new();
for i in 0..blob.meta.size/size_of::<Entry>() {
entries.push(deserialize(&blob.data[i..i+size_of::<Entry>()]).unwrap());
}
println!("blob size: {:?}", blob.meta.size);
let entries:Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
for e in entries {
println!("processing entry: {:?}", e);
obj.lock().unwrap().acc.process_verified_events(e.events)?;
}
//TODO respond back to leader with hash of the state
Expand Down Expand Up @@ -346,6 +350,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
rsubs: subscribers::Subscribers,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
trace!("replicate stuff!");
let read = UdpSocket::bind(rsubs.me.addr)?;
// make sure we are on the same interface
let mut local = read.local_addr()?;
Expand Down Expand Up @@ -384,6 +389,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let e = Self::replicate_state(&skel, &window_receiver,
&blob_sender, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
println!("exiting replicate thread!");
break;
}
});
Expand Down Expand Up @@ -440,7 +446,11 @@ mod tests {
use streamer;
use std::sync::mpsc::channel;
use std::collections::VecDeque;
use packet::{PACKET_DATA_SIZE};
use std::mem::size_of;
use hash::{hash, Hash};
use event::Event;
use entry;
use chrono::prelude::*;

#[test]
fn test_layout() {
Expand Down Expand Up @@ -546,27 +556,44 @@ mod tests {
exit.store(true, Ordering::Relaxed);
}

use std::sync::{Once, ONCE_INIT};
extern crate env_logger;

static INIT: Once = ONCE_INIT;

/// Setup function that is only run once, even if called multiple times.
fn setup() {
INIT.call_once(|| {
env_logger::init().unwrap();
});
}

#[test]
fn test_replicate() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
setup();
let leader_sock = UdpSocket::bind("127.0.0.1:0").expect("bind");
let leader_addr = leader_sock.local_addr().unwrap();
let me_addr = "127.0.0.1:9000".parse().unwrap();
let target_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind");
let target_peer_addr = target_peer_sock.local_addr().unwrap();
let source_peer_sock = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));

let node_me = Node::default();
let node_leader = Node::new([0; 8], 0, send.local_addr().unwrap());
let subs = Subscribers::new(node_me, node_leader, &[]);
let node_me = Node::new([0,0,0,0,0,0,0,1], 10, me_addr);
let node_subs = vec![Node::new([0,0,0,0,0,0,0,2], 8, target_peer_addr); 1];
let node_leader = Node::new([0,0,0,0,0,0,0,3], 20, leader_addr);
println!("leader: {:?} subs: {:?}", node_leader, node_subs);
let subs = Subscribers::new(node_me, node_leader, &node_subs);

let recv_recycler = PacketRecycler::default();
let recv_recycler = BlobRecycler::default();
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = streamer::receiver(read, exit.clone(), recv_recycler.clone(), s_reader).unwrap();
let t_receiver = streamer::blob_receiver(exit.clone(), recv_recycler.clone(), target_peer_sock, s_reader).unwrap();
let (s_responder, r_responder) = channel();
let t_responder = streamer::responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let t_responder = streamer::responder(source_peer_sock, exit.clone(), resp_recycler.clone(), r_responder);

let alice = Mint::new(10_000);
let acc = Accountant::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let historian = Historian::new(&alice.last_id(), Some(30));
let acc = Arc::new(Mutex::new(AccountantSkel::new(
acc,
Expand All @@ -578,16 +605,43 @@ mod tests {
let _threads = AccountantSkel::replicate(&acc, subs, exit.clone()).unwrap();

let mut msgs = VecDeque::new();
for i in 0..10 {
let zero_hash = Hash::default();
let num_blobs = 10;
let bob_keypair = KeyPair::new();
for i in 0..num_blobs {
let b = resp_recycler.allocate();
let b_ = b.clone();
let mut w = b.write().unwrap();
w.data[0] = i as u8;
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&addr);
w.set_index(i).unwrap();

let tr0 = Event::new_timestamp(&bob_keypair, Utc::now());
let entry0 = entry::create_entry(&zero_hash, i, vec![tr0]);

let tr1 = Transaction::new(&alice.keypair(), bob_keypair.pubkey(), 501, zero_hash);
let entry1 = entry::create_entry(&zero_hash, i + num_blobs, vec![Event::Transaction(tr1)]);

let serialized_entry = serialize(&vec![entry0, entry1]).unwrap();

w.data_mut()[..serialized_entry.len()].copy_from_slice(&serialized_entry);
w.meta.size = serialized_entry.len();
w.meta.set_addr(&me_addr);
drop(w);
println!("index: {}", b_.read().unwrap().get_index().unwrap());
msgs.push_back(b_);
}
s_responder.send(msgs).expect("send");
println!("foo");
let timer = Duration::new(1, 0);
let mut msgs: Vec<_> = Vec::new();
while let Ok(msg) = r_reader.recv_timeout(timer) {
println!("msg: {:?}", msg);
msgs.push(msg);
}

let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap();
let acc_client = AccountantStub::new(&target_peer_addr.to_string(), client_socket);
let last_id = acc_client.get_balance(&alice.keypair().pubkey());
//println!("last id: {:?}", last_id);

exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
Expand Down
12 changes: 11 additions & 1 deletion src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ pub fn responder(
//window.
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
let dq = Blob::recv_from(recycler, sock)?;
println!("got some blobs: {}", dq.len());
if !dq.is_empty() {
println!("recv_blobs: sending some blobs");
s.send(dq)?;
}
Ok(())
Expand All @@ -99,7 +101,8 @@ pub fn blob_receiver(
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_blobs(&recycler, &sock, &s);
let ret = recv_blobs(&recycler, &sock, &s);
println!("recv_blobs ret: {:?}", ret);
});
Ok(t)
}
Expand All @@ -113,8 +116,10 @@ fn recv_window(
s: &BlobSender,
retransmit: &BlobSender,
) -> Result<()> {
println!("recv_window start");
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
println!("got some window blobs: {}", dq.len());
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
Expand All @@ -126,6 +131,7 @@ fn recv_window(
let p = b.read().unwrap();
//TODO this check isn't safe against adverserial packets
//we need to maintain a sequence window
trace!("idx: {} addr: {:?} leader: {:?}", p.get_index().unwrap(), p.meta.addr(), rsubs.leader.addr);
if p.meta.addr() == rsubs.leader.addr {
//TODO
//need to copy the retransmited blob
Expand Down Expand Up @@ -158,6 +164,7 @@ fn recv_window(
//TODO, after the block are authenticated
//if we get different blocks at the same index
//that is a network failure/attack
trace!("window w: {} size: {}", w, p.meta.size);
{
if window[w].is_none() {
window[w] = Some(b_);
Expand All @@ -166,6 +173,7 @@ fn recv_window(
}
loop {
let k = *consumed % NUM_BLOBS;
trace!("k: {} consumed: {}", k, *consumed);
if window[k].is_none() {
break;
}
Expand All @@ -175,6 +183,7 @@ fn recv_window(
}
}
}
trace!("sending contq.len: {}", contq.len());
if !contq.is_empty() {
s.send(contq)?;
}
Expand All @@ -193,6 +202,7 @@ pub fn window(
let mut window = vec![None; NUM_BLOBS];
let mut consumed = 0;
loop {
println!("window loop");
if exit.load(Ordering::Relaxed) {
break;
}
Expand Down
8 changes: 8 additions & 0 deletions src/subscribers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use rayon::prelude::*;
use result::{Error, Result};
use std::net::{SocketAddr, UdpSocket};

use std::fmt;

#[derive(Clone, PartialEq)]
pub struct Node {
pub id: [u64; 8],
Expand Down Expand Up @@ -38,6 +40,12 @@ impl Node {
}
}

impl fmt::Debug for Node {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Node {{ weight: {} addr: {} }}", self.weight, self.addr)
}
}

pub struct Subscribers {
data: Vec<Node>,
pub me: Node,
Expand Down

0 comments on commit 6680710

Please sign in to comment.