Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
heap

update

update

wip

use a vec and sort

builds

update

tests

update

fmt

update

progress

fmt

passes needs retransmit test

tests

cleanup

update

update

update

update

fmt
  • Loading branch information
aeyakovenko committed Apr 17, 2018
1 parent 83c5b3b commit 4944c96
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod recorder;
pub mod result;
pub mod signature;
pub mod streamer;
pub mod subscribers;
pub mod transaction;
extern crate bincode;
extern crate byteorder;
Expand Down
192 changes: 172 additions & 20 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use subscribers;

pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
pub type PacketSender = mpsc::Sender<SharedPackets>;
Expand Down Expand Up @@ -75,14 +76,77 @@ pub fn responder(

//TODO, we would need to stick block authentication before we create the
//window.
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
let dq = Blob::recv_from(recycler, sock)?;
if !dq.is_empty() {
s.send(dq)?;
}
Ok(())
}

pub fn blob_receiver(
exit: Arc<AtomicBool>,
recycler: BlobRecycler,
sock: UdpSocket,
s: BlobSender,
) -> Result<JoinHandle<()>> {
//DOCUMENTED SIDE-EFFECT
//1 second timeout on socket read
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))?;
let t = spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_blobs(&recycler, &sock, &s);
});
Ok(t)
}

fn recv_window(
window: &mut Vec<Option<SharedBlob>>,
subs: &Arc<RwLock<subscribers::Subscribers>>,
recycler: &BlobRecycler,
consumed: &mut usize,
socket: &UdpSocket,
r: &BlobReceiver,
s: &BlobSender,
cast: &BlobSender,
) -> Result<()> {
let mut dq = Blob::recv_from(recycler, socket)?;
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
{
//retransmit all leader blocks
let mut castq = VecDeque::new();
let rsubs = subs.read().unwrap();
for b in &dq {
let p = b.read().unwrap();
if p.meta.addr() == rsubs.leader.addr {
//TODO
//need to copy the retransmited blob
//otherwise we get into races with which thread
//should do the recycling
//
//a better absraction would be to recycle when the blob
//is dropped via a weakref to the recycler
let nv = recycler.allocate();
{
let mut mnv = nv.write().unwrap();
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
}
castq.push_back(nv);
}
}
if !castq.is_empty() {
cast.send(castq)?;
}
}
//send a contiguous set of blocks
let mut contq = VecDeque::new();
while let Some(b) = dq.pop_front() {
let b_ = b.clone();
let mut p = b.write().unwrap();
Expand All @@ -97,43 +161,79 @@ fn recv_window(
} else {
debug!("duplicate blob at index {:}", w);
}
//send a contiguous set of blocks
let mut dq = VecDeque::new();
loop {
let k = *consumed % NUM_BLOBS;
if window[k].is_none() {
break;
}
dq.push_back(window[k].clone().unwrap());
contq.push_back(window[k].clone().unwrap());
window[k] = None;
*consumed += 1;
}
if !dq.is_empty() {
s.send(dq)?;
}
}
}
if !contq.is_empty() {
s.send(contq)?;
}
Ok(())
}

pub fn window(
sock: UdpSocket,
exit: Arc<AtomicBool>,
r: BlobRecycler,
subs: Arc<RwLock<subscribers::Subscribers>>,
recycler: BlobRecycler,
r: BlobReceiver,
s: BlobSender,
cast: BlobSender,
) -> JoinHandle<()> {
spawn(move || {
let mut window = vec![None; NUM_BLOBS];
let mut consumed = 0;
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer)).unwrap();
loop {
if recv_window(&mut window, &r, &mut consumed, &sock, &s).is_err()
|| exit.load(Ordering::Relaxed)
{
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast);
}
})
}

fn retransmit(
subs: &Arc<RwLock<subscribers::Subscribers>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
{
let wsubs = subs.read().unwrap();
for b in &dq {
let mut mb = b.write().unwrap();
wsubs.retransmit(&mut mb, sock)?;
}
}
while let Some(b) = dq.pop_front() {
recycler.recycle(b);
}
Ok(())
}

pub fn retransmitter(
sock: UdpSocket,
exit: Arc<AtomicBool>,
subs: Arc<RwLock<subscribers::Subscribers>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = retransmit(&subs, &recycler, &r, &sock);
})
}

Expand Down Expand Up @@ -248,9 +348,11 @@ mod test {
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::{receiver, responder, window, BlobReceiver, PacketReceiver};
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
PacketReceiver};
use subscribers::{Node, Subscribers};

fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
Expand Down Expand Up @@ -325,9 +427,24 @@ mod test {
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Subscribers::new(
Node::default(),
Node::default(),
)));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = window(read, exit.clone(), resp_recycler.clone(), s_reader);
let t_receiver =
blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap();
let (s_window, r_window) = channel();
let (s_cast, _r_cast) = channel();
let t_window = window(
exit.clone(),
subs,
resp_recycler.clone(),
r_reader,
s_window,
s_cast,
);
let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let mut msgs = VecDeque::new();
Expand All @@ -344,10 +461,45 @@ mod test {
}
s_responder.send(msgs).expect("send");
let mut num = 0;
get_blobs(r_reader, &mut num);
get_blobs(r_window, &mut num);
assert_eq!(num, 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
}

#[test]
pub fn retransmit() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Subscribers::new(
Node::default(),
Node::default(),
)));
let n3 = Node::new([0; 8], 1, read.local_addr().unwrap());
subs.write().unwrap().insert(&[n3]);
let (s_cast, r_cast) = channel();
let re = BlobRecycler::default();
let saddr = send.local_addr().unwrap();
let t_retransmit = retransmitter(send, exit.clone(), subs, re.clone(), r_cast);
let mut bq = VecDeque::new();
let b = re.allocate();
b.write().unwrap().meta.size = 10;
bq.push_back(b);
s_cast.send(bq).unwrap();
let (s_recv, r_recv) = channel();
let t_receiver = blob_receiver(exit.clone(), re.clone(), read, s_recv).unwrap();
let mut oq = r_recv.recv().unwrap();
assert_eq!(oq.len(), 1);
let o = oq.pop_front().unwrap();
let ro = o.read().unwrap();
assert_eq!(ro.meta.size, 10);
assert_eq!(ro.meta.addr(), saddr);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_retransmit.join().expect("join");
}

}
Loading

0 comments on commit 4944c96

Please sign in to comment.