Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Merge pull request #129 from aeyakovenko/retransmit
Browse files Browse the repository at this point in the history
Retransmit
  • Loading branch information
garious authored Apr 18, 2018
2 parents 83c5b3b + 29f3230 commit d7670cd
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod recorder;
pub mod result;
pub mod signature;
pub mod streamer;
pub mod subscribers;
pub mod transaction;
extern crate bincode;
extern crate byteorder;
Expand Down
1 change: 1 addition & 0 deletions src/packet.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! The `packet` module defines data structures and methods to pull data from the network.
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use result::{Error, Result};
use std::collections::VecDeque;
Expand Down
216 changes: 196 additions & 20 deletions src/streamer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
//! The `streamer` module defines a set of services for effecently pulling data from udp sockets.
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, NUM_BLOBS};
use result::Result;
use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use subscribers;

pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
pub type PacketSender = mpsc::Sender<SharedPackets>;
Expand Down Expand Up @@ -75,14 +77,79 @@ pub fn responder(

//TODO, we would need to stick block authentication before we create the
//window.
fn recv_blobs(recycler: &BlobRecycler, sock: &UdpSocket, s: &BlobSender) -> Result<()> {
let dq = Blob::recv_from(recycler, sock)?;
if !dq.is_empty() {
s.send(dq)?;
}
Ok(())
}

pub fn blob_receiver(
exit: Arc<AtomicBool>,
recycler: BlobRecycler,
sock: UdpSocket,
s: BlobSender,
) -> Result<JoinHandle<()>> {
//DOCUMENTED SIDE-EFFECT
//1 second timeout on socket read
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))?;
let t = spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_blobs(&recycler, &sock, &s);
});
Ok(t)
}

fn recv_window(
window: &mut Vec<Option<SharedBlob>>,
subs: &Arc<RwLock<subscribers::Subscribers>>,
recycler: &BlobRecycler,
consumed: &mut usize,
socket: &UdpSocket,
r: &BlobReceiver,
s: &BlobSender,
cast: &BlobSender,
) -> Result<()> {
let mut dq = Blob::recv_from(recycler, socket)?;
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
{
//retransmit all leader blocks
let mut castq = VecDeque::new();
let rsubs = subs.read().unwrap();
for b in &dq {
let p = b.read().unwrap();
//TODO this check isn't safe against adverserial packets
//we need to maintain a sequence window
if p.meta.addr() == rsubs.leader.addr {
//TODO
//need to copy the retransmited blob
//otherwise we get into races with which thread
//should do the recycling
//
//a better absraction would be to recycle when the blob
//is dropped via a weakref to the recycler
let nv = recycler.allocate();
{
let mut mnv = nv.write().unwrap();
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
}
castq.push_back(nv);
}
}
if !castq.is_empty() {
cast.send(castq)?;
}
}
//send a contiguous set of blocks
let mut contq = VecDeque::new();
while let Some(b) = dq.pop_front() {
let b_ = b.clone();
let mut p = b.write().unwrap();
Expand All @@ -97,46 +164,91 @@ fn recv_window(
} else {
debug!("duplicate blob at index {:}", w);
}
//send a contiguous set of blocks
let mut dq = VecDeque::new();
loop {
let k = *consumed % NUM_BLOBS;
if window[k].is_none() {
break;
}
dq.push_back(window[k].clone().unwrap());
contq.push_back(window[k].clone().unwrap());
window[k] = None;
*consumed += 1;
}
if !dq.is_empty() {
s.send(dq)?;
}
}
}
if !contq.is_empty() {
s.send(contq)?;
}
Ok(())
}

pub fn window(
sock: UdpSocket,
exit: Arc<AtomicBool>,
r: BlobRecycler,
subs: Arc<RwLock<subscribers::Subscribers>>,
recycler: BlobRecycler,
r: BlobReceiver,
s: BlobSender,
cast: BlobSender,
) -> JoinHandle<()> {
spawn(move || {
let mut window = vec![None; NUM_BLOBS];
let mut consumed = 0;
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer)).unwrap();
loop {
if recv_window(&mut window, &r, &mut consumed, &sock, &s).is_err()
|| exit.load(Ordering::Relaxed)
{
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_window(&mut window, &subs, &recycler, &mut consumed, &r, &s, &cast);
}
})
}

fn retransmit(
subs: &Arc<RwLock<subscribers::Subscribers>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
{
let wsubs = subs.read().unwrap();
for b in &dq {
let mut mb = b.write().unwrap();
wsubs.retransmit(&mut mb, sock)?;
}
}
while let Some(b) = dq.pop_front() {
recycler.recycle(b);
}
Ok(())
}

/// Service to retransmit messages from the leader to layer 1 nodes.
/// See `subscribers` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit.
/// * `subs` - Shared Subscriber structure. This structure needs to be updated and popualted by
/// the accountant.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
sock: UdpSocket,
exit: Arc<AtomicBool>,
subs: Arc<RwLock<subscribers::Subscribers>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = retransmit(&subs, &recycler, &r, &sock);
})
}

#[cfg(all(feature = "unstable", test))]
mod bench {
extern crate test;
Expand Down Expand Up @@ -248,9 +360,11 @@ mod test {
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::{receiver, responder, window, BlobReceiver, PacketReceiver};
use streamer::{blob_receiver, receiver, responder, retransmitter, window, BlobReceiver,
PacketReceiver};
use subscribers::{Node, Subscribers};

fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 {
Expand Down Expand Up @@ -325,9 +439,24 @@ mod test {
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Subscribers::new(
Node::default(),
Node::new([0; 8], 0, send.local_addr().unwrap()),
)));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = window(read, exit.clone(), resp_recycler.clone(), s_reader);
let t_receiver =
blob_receiver(exit.clone(), resp_recycler.clone(), read, s_reader).unwrap();
let (s_window, r_window) = channel();
let (s_cast, r_cast) = channel();
let t_window = window(
exit.clone(),
subs,
resp_recycler.clone(),
r_reader,
s_window,
s_cast,
);
let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let mut msgs = VecDeque::new();
Expand All @@ -344,10 +473,57 @@ mod test {
}
s_responder.send(msgs).expect("send");
let mut num = 0;
get_blobs(r_reader, &mut num);
get_blobs(r_window, &mut num);
assert_eq!(num, 10);
let mut q = r_cast.recv().unwrap();
while let Ok(mut nq) = r_cast.try_recv() {
q.append(&mut nq);
}
assert_eq!(q.len(), 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
}

#[test]
pub fn retransmit() {
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let subs = Arc::new(RwLock::new(Subscribers::new(
Node::default(),
Node::default(),
)));
let n3 = Node::new([0; 8], 1, read.local_addr().unwrap());
subs.write().unwrap().insert(&[n3]);
let (s_retransmit, r_retransmit) = channel();
let blob_recycler = BlobRecycler::default();
let saddr = send.local_addr().unwrap();
let t_retransmit = retransmitter(
send,
exit.clone(),
subs,
blob_recycler.clone(),
r_retransmit,
);
let mut bq = VecDeque::new();
let b = blob_recycler.allocate();
b.write().unwrap().meta.size = 10;
bq.push_back(b);
s_retransmit.send(bq).unwrap();
let (s_blob_receiver, r_blob_receiver) = channel();
let t_receiver =
blob_receiver(exit.clone(), blob_recycler.clone(), read, s_blob_receiver).unwrap();
let mut oq = r_blob_receiver.recv().unwrap();
assert_eq!(oq.len(), 1);
let o = oq.pop_front().unwrap();
let ro = o.read().unwrap();
assert_eq!(ro.meta.size, 10);
assert_eq!(ro.meta.addr(), saddr);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_retransmit.join().expect("join");
}

}
Loading

0 comments on commit d7670cd

Please sign in to comment.