Skip to content

Commit

Permalink
Add a unit test for the reactor
Browse files Browse the repository at this point in the history
Also, change a few APIs that were expecting Sockets, but only actually
depended on IoHandle, to take IoHandle. In general, only APIs that rely
on setsockopts or connection should require Sockets.
  • Loading branch information
wycats committed Sep 5, 2014
1 parent 971641a commit 497e73a
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 6 deletions.
32 changes: 30 additions & 2 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ pub trait IoHandle {
fn desc(&self) -> os::IoDesc;
}

impl IoHandle for os::IoDesc {
fn desc(&self) -> os::IoDesc {
*self
}
}

// TODO: Should read / write return bool to indicate whether or not there is more?
pub trait IoReader {
fn read(&mut self, buf: &mut MutBuf) -> MioResult<()>;
Expand Down Expand Up @@ -46,7 +52,7 @@ pub trait Socket : IoHandle {
}
}

impl<S: Socket> IoReader for S {
impl<H: IoHandle> IoReader for H {
fn read(&mut self, buf: &mut MutBuf) -> MioResult<()> {
while !buf.is_full() {
match os::read(self.desc(), buf.mut_bytes()) {
Expand All @@ -61,7 +67,7 @@ impl<S: Socket> IoReader for S {
}
}

impl<S: Socket> IoWriter for S {
impl<H: IoHandle> IoWriter for H {
fn write(&mut self, buf: &mut Buf) -> MioResult<()> {
while !buf.is_full() {
match os::write(self.desc(), buf.bytes()) {
Expand Down Expand Up @@ -145,6 +151,28 @@ impl IoHandle for UnixSocket {
impl Socket for UnixSocket {
}

#[deriving(Show)]
pub struct PipeSender {
desc: os::IoDesc
}

#[deriving(Show)]
pub struct PipeReceiver {
desc: os::IoDesc
}

impl IoHandle for PipeSender {
fn desc(&self) -> os::IoDesc {
self.desc
}
}

impl IoHandle for PipeReceiver {
fn desc(&self) -> os::IoDesc {
self.desc
}
}

// Types of sockets
pub enum AddressFamily {
Inet,
Expand Down
106 changes: 102 additions & 4 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ impl<T: Token> Reactor<T> {
unimplemented!()
}

/// Registers an IO descriptor with the reactor.
pub fn register<S: Socket>(&mut self, io: S, token: T) -> MioResult<()> {
/// Registers an IO handle with the reactor.
pub fn register<H: IoHandle>(&mut self, io: H, token: T) -> MioResult<()> {
debug!("registering IO with reactor");

// Register interets for this socket
Expand All @@ -48,6 +48,11 @@ impl<T: Token> Reactor<T> {

/// Connects the socket to the specified address. When the operation
/// completes, the handler will be notified with the supplied token.
///
/// The goal of this method is to ensure that the reactor will always
/// notify about the connection, even if the connection happens
/// immediately. Otherwise, every consumer of the reactor would have
/// to worry about possibly-immediate connection.
pub fn connect<S: Socket>(&mut self, io: S,
addr: &SockAddr, token: T) -> MioResult<()> {

Expand All @@ -67,7 +72,7 @@ impl<T: Token> Reactor<T> {
Ok(())
}

pub fn listen<S, A: Socket + IoAcceptor<S>>(&mut self, io: A, backlog: uint,
pub fn listen<S, A: IoHandle + IoAcceptor<S>>(&mut self, io: A, backlog: uint,
token: T) -> MioResult<()> {

debug!("socket listen");
Expand All @@ -81,6 +86,8 @@ impl<T: Token> Reactor<T> {
Ok(())
}

/// Keep spinning the reactor indefinitely, and notify the handler whenever
/// any of the registered handles are ready.
pub fn run<H: Handler<T>>(&mut self, mut handler: H) {
self.run = true;

Expand All @@ -90,16 +97,37 @@ impl<T: Token> Reactor<T> {
while self.run {
debug!("reactor tick");

// Check the registered IO handles for any new events
// Check the registered IO handles for any new events. Each poll
// is for one second, so a shutdown request can last as long as
// one second before it takes effect.
self.io_poll(&mut events, &mut handler);
}
}

/// Spin the reactor once, with a timeout of one second, and notify the
/// handler if any of the registered handles become ready during that
/// time.
pub fn run_once<H: Handler<T>>(&mut self, mut handler: H) {
// Created here for stack allocation
let mut events = os::Events::new();

// Check the registered IO handles for any new events. Each poll
// is for one second, so a shutdown request can last as long as
// one second before it takes effect.
self.io_poll(&mut events, &mut handler);
}

/// Poll the reactor for one second, calling the handler if any
/// of the registered handles are ready.
fn io_poll<H: Handler<T>>(&mut self, events: &mut os::Events, handler: &mut H) {
self.selector.select(events, 1000).unwrap();

let mut i = 0u;

// Iterate over the notifications. Each event provides the token
// it was registered with (which usually represents, at least, the
// handle that the event is about) as well as information about
// what kind of event occurred (readable, writable, signal, etc.)
while i < events.len() {
let evt = events.get(i);
let tok = Token::from_u64(evt.token);
Expand Down Expand Up @@ -138,23 +166,93 @@ pub struct IoEvent {
token: u64
}

/// IoEvent represents the raw event that the OS-specific selector
/// returned. An event can represent more than one kind (such as
/// readable or writable) at a time.
///
/// These IoEvent objects are created by the OS-specific concrete
/// Selector when they have events to report.
impl IoEvent {
/// Create a new IoEvent.
pub fn new(kind: IoEventKind, token: u64) -> IoEvent {
IoEvent {
kind: kind,
token: token
}
}

/// This event indicated that the IO handle is now readable
pub fn is_readable(&self) -> bool {
self.kind.contains(IoReadable)
}

/// This event indicated that the IO handle is now writable
pub fn is_writable(&self) -> bool {
self.kind.contains(IoWritable)
}

/// This event indicated that the IO handle had an error
pub fn is_error(&self) -> bool {
self.kind.contains(IoError)
}
}

#[cfg(test)]
mod tests {
use std;
use std::sync::Arc;
use std::sync::atomics::{AtomicInt, SeqCst};

use super::Reactor;
use buf::{SliceBuf, MutSliceBuf};
use io::{IoWriter, IoReader};
use Handler;
use os;

struct Funtimes {
readable: Arc<AtomicInt>,
writable: Arc<AtomicInt>
}

impl Funtimes {
fn new(readable: Arc<AtomicInt>, writable: Arc<AtomicInt>) -> Funtimes {
Funtimes { readable: readable, writable: writable }
}
}

impl Handler<u64> for Funtimes {
fn readable(&mut self, reactor: &mut Reactor<u64>, token: u64) {
(*self.readable).fetch_add(1, SeqCst);
assert_eq!(token, 10u64);
}
}

#[test]
fn test_readable() {
let mut reactor = Reactor::<u64>::new().ok().expect("Couldn't make reactor");
let pipe = unsafe { std::os::pipe() }.ok().expect("Couldn't create pipe");
let mut reader = os::IoDesc { fd: pipe.reader };
let mut writer = os::IoDesc { fd: pipe.writer };

let mut buf = SliceBuf::wrap("hello".as_bytes());

let read_count = Arc::new(AtomicInt::new(0));
let write_count = Arc::new(AtomicInt::new(0));

writer.write(&mut buf);

reactor.register(reader, 10u64);
reactor.run_once(Funtimes::new(read_count.clone(), write_count.clone()));

assert_eq!((*read_count).load(SeqCst), 1);

let mut write_vec = vec![0u8, 0u8, 0u8, 0u8, 0u8];

{
let mut write_into = MutSliceBuf::wrap(write_vec.as_mut_slice());
reader.read(&mut write_into).ok().expect("Couldn't read");
}

assert_eq!(String::from_utf8(write_vec).ok().expect("Invalid UTF-8").as_slice(), "hello");
}
}

0 comments on commit 497e73a

Please sign in to comment.