Skip to content

Commit

Permalink
Test for Server::process_receive_packet_channel (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
markmandel authored Feb 3, 2020
1 parent d598ea1 commit 7316371
Showing 1 changed file with 45 additions and 8 deletions.
53 changes: 45 additions & 8 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ impl Server {
return Ok(());
}

// TODO: write tests for this
/// process_receive_packet_channel blocks on receive_packets.recv() channel
/// and sends each packet on to the Packet.dest
async fn process_receive_packet_channel(
Expand All @@ -128,16 +127,16 @@ impl Server {
while let Some(packet) = receive_packets.recv().await {
debug!(
log,
"Sending packet back to origin {}, {}",
packet.dest,
String::from_utf8(packet.contents.clone()).unwrap()
"Sending packet back to origin";
"origin" => packet.dest,
"contents" => String::from_utf8(packet.contents.clone()).unwrap(),
);

if let Err(err) = send_socket
.send_to(packet.contents.as_slice(), &packet.dest)
.await
{
error!(log, "Error sending packet to {}, {}", packet.dest, err);
error!(log, "Error sending packet"; "dest" => packet.dest.to_string(), "err" => err.to_string());
}
}
}
Expand Down Expand Up @@ -367,6 +366,39 @@ mod tests {
recv.close();
}

#[tokio::test]
async fn server_process_receive_packet_channel() {
let log = logger();
let socket = ephemeral_socket().await;
let local_addr = socket.local_addr().unwrap();

let (mut recv_socket, send_socket) = socket.split();
let (mut send_packet, recv_packet) = channel::<Packet>(5);
let (done, wait) = oneshot::channel::<()>();

tokio::spawn(async move {
let mut buf = vec![0; 1024];
let size = recv_socket.recv(&mut buf).await.unwrap();
assert_eq!("hello", from_utf8(&buf[..size]).unwrap());
done.send(()).unwrap();
});

if let Err(err) = send_packet
.send(Packet {
dest: local_addr,
contents: String::from("hello").into_bytes(),
})
.await
{
assert!(false, err)
}

tokio::spawn(async move {
Server::process_receive_packet_channel(&log, send_socket, recv_packet).await;
});
wait.await.unwrap();
}

#[tokio::test]
async fn session_send_to() {
let log = logger();
Expand All @@ -384,8 +416,7 @@ mod tests {
/// with the contents of "hello"
/// call wait.await.unwrap() to see if the message was received
async fn assert_recv_udp() -> (SocketAddr, oneshot::Receiver<()>) {
let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0);
let socket = UdpSocket::bind(addr).await.unwrap();
let socket = ephemeral_socket().await;
let local_addr = socket.local_addr().unwrap();
let (mut recv, _) = socket.split();
let (done, wait) = oneshot::channel::<()>();
Expand All @@ -395,6 +426,12 @@ mod tests {
assert_eq!("hello", from_utf8(&buf[..size]).unwrap());
done.send(()).unwrap();
});
return (local_addr, wait);
(local_addr, wait)
}

/// ephemeral_socket provides a socket bound to an ephemeral port
async fn ephemeral_socket() -> UdpSocket {
let addr = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0);
UdpSocket::bind(addr).await.unwrap()
}
}

0 comments on commit 7316371

Please sign in to comment.