Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retransmit #129

Merged
merged 10 commits into from
Apr 18, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod recorder;
pub mod result;
pub mod signature;
pub mod streamer;
pub mod subscribers;
pub mod transaction;
extern crate bincode;
extern crate byteorder;
Expand Down
197 changes: 177 additions & 20 deletions src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use subscribers;

pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
pub type PacketSender = mpsc::Sender<SharedPackets>;
Expand Down Expand Up @@ -75,14 +76,77 @@ pub fn responder(

//TODO, we would need to stick block authentication before we create the
//window.
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
let dq = Blob::recv_from(recycler, sock)?;
if !dq.is_empty() {
s.send(dq)?;
}
Ok(())
}

pub fn blob_receiver(
exit: Arc<AtomicBool>,
recycler: BlobRecycler,
sock: UdpSocket,
s: BlobSender,
) -> Result<JoinHandle<()>> {
//DOCUMENTED SIDE-EFFECT
//1 second timeout on socket read
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))?;
let t = spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_blobs(&recycler, &sock, &s);
});
Ok(t)
}

fn recv_window(
window: &mut Vec<Option<SharedBlob>>,
subs: &Arc<RwLock<subscribers::Subscribers>>,
recycler: &BlobRecycler,
consumed: &mut usize,
socket: &UdpSocket,
r: &BlobReceiver,
s: &BlobSender,
cast: &BlobSender,
) -> Result<()> {
let mut dq = Blob::recv_from(recycler, socket)?;
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
{
//retransmit all leader blocks
let mut castq = VecDeque::new();
let rsubs = subs.read().unwrap();
for b in &dq {
let p = b.read().unwrap();
if p.meta.addr() == rsubs.leader.addr {
//TODO
//need to copy the retransmited blob
//otherwise we get into races with which thread
//should do the recycling
//
//a better absraction would be to recycle when the blob
//is dropped via a weakref to the recycler
let nv = recycler.allocate();
{
let mut mnv = nv.write().unwrap();
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
}
castq.push_back(nv);
}
}
if !castq.is_empty() {
cast.send(castq)?;
}
}
//send a contiguous set of blocks
let mut contq = VecDeque::new();
while let Some(b) = dq.pop_front() {
let b_ = b.clone();
let mut p = b.write().unwrap();
Expand All @@ -97,46 +161,82 @@ fn recv_window(
} else {
debug!("duplicate blob at index {:}", w);
}
//send a contiguous set of blocks
let mut dq = VecDeque::new();
loop {
let k = *consumed % NUM_BLOBS;
if window[k].is_none() {
break;
}
dq.push_back(window[k].clone().unwrap());
contq.push_back(window[k].clone().unwrap());
window[k] = None;
*consumed += 1;
}
if !dq.is_empty() {
s.send(dq)?;
}
}
}
if !contq.is_empty() {
s.send(contq)?;
}
Ok(())
}

pub fn window(
sock: UdpSocket,
exit: Arc<AtomicBool>,
r: BlobRecycler,
subs: Arc<RwLock<subscribers::Subscribers>>,
recycler: BlobRecycler,
r: BlobReceiver,
s: BlobSender,
cast: BlobSender,
) -> JoinHandle<()> {
spawn(move || {
let mut window = vec![None; NUM_BLOBS];
let mut consumed = 0;
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer)).unwrap();
loop {
if recv_window(&mut window, &r, &mut consumed, &sock, &s).is_err()
|| exit.load(Ordering::Relaxed)
{
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast);
}
})
}

fn retransmit(
subs: &Arc<RwLock<subscribers::Subscribers>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
{
let wsubs = subs.read().unwrap();
for b in &dq {
let mut mb = b.write().unwrap();
wsubs.retransmit(&mut mb, sock)?;
}
}
while let Some(b) = dq.pop_front() {
recycler.recycle(b);
}
Ok(())
}

pub fn retransmitter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add docs to this function?

sock: UdpSocket,
exit: Arc<AtomicBool>,
subs: Arc<RwLock<subscribers::Subscribers>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = retransmit(&subs, &recycler, &r, &sock);
})
}

#[cfg(all(feature = "unstable", test))]
mod bench {
extern crate test;
Expand Down Expand Up @@ -248,9 +348,11 @@ mod test {
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::{receiver, responder, window, BlobReceiver, PacketReceiver};
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
PacketReceiver};
use subscribers::{Node, Subscribers};

fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
Expand Down Expand Up @@ -325,9 +427,24 @@ mod test {
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Subscribers::new(
Node::default(),
Node::new([0; 8], 0, send.local_addr().unwrap()),
)));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = window(read, exit.clone(), resp_recycler.clone(), s_reader);
let t_receiver =
blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap();
let (s_window, r_window) = channel();
let (s_cast, r_cast) = channel();
let t_window = window(
exit.clone(),
subs,
resp_recycler.clone(),
r_reader,
s_window,
s_cast,
);
let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let mut msgs = VecDeque::new();
Expand All @@ -344,10 +461,50 @@ mod test {
}
s_responder.send(msgs).expect("send");
let mut num = 0;
get_blobs(r_reader, &mut num);
get_blobs(r_window, &mut num);
assert_eq!(num, 10);
let mut q = r_cast.recv().unwrap();
while let Ok(mut nq) = r_cast.try_recv() {
q.append(&mut nq);
}
assert_eq!(q.len(), 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
}

#[test]
pub fn retransmit() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Subscribers::new(
Node::default(),
Node::default(),
)));
let n3 = Node::new([0; 8], 1, read.local_addr().unwrap());
subs.write().unwrap().insert(&[n3]);
let (s_cast, r_cast) = channel();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is a cast?

let re = BlobRecycler::default();
let saddr = send.local_addr().unwrap();
let t_retransmit = retransmitter(send, exit.clone(), subs, re.clone(), r_cast);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's doing retransmitting exactly? Seems more like a proxy.

let mut bq = VecDeque::new();
let b = re.allocate();
b.write().unwrap().meta.size = 10;
bq.push_back(b);
s_cast.send(bq).unwrap();
let (s_recv, r_recv) = channel();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these variable names supposed to mean?

let t_receiver = blob_receiver(exit.clone(), re.clone(), read, s_recv).unwrap();
let mut oq = r_recv.recv().unwrap();
assert_eq!(oq.len(), 1);
let o = oq.pop_front().unwrap();
let ro = o.read().unwrap();
assert_eq!(ro.meta.size, 10);
assert_eq!(ro.meta.addr(), saddr);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_retransmit.join().expect("join");
}

}
Loading