Skip to content
This repository has been archived by the owner on Apr 2, 2018. It is now read-only.

Commit

Permalink
Respect max_timeout setting
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche committed Sep 3, 2016
1 parent 1fd1a91 commit c944a24
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 10 deletions.
40 changes: 34 additions & 6 deletions src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use worker::Worker;
use wheel::{Token, Wheel};
use futures::{Future, Async, Poll};
use futures::task::{self, Task};
use std::fmt;
use std::time::Instant;

/// A facility for scheduling timeouts
Expand All @@ -11,6 +12,15 @@ pub struct Timer {
worker: Worker,
}

/// The error type for timeout operations.
#[derive(Debug, Clone)]
pub enum Error {
/// The requested timeout exceeds the timer's `max_timeout` setting.
TooLong,
/// The timer has reached capacity and cannot support new timeouts.
NoCapacity,
}

/// A `Future` that completes at the requested instance
pub struct Timeout {
worker: Worker,
Expand All @@ -19,10 +29,8 @@ pub struct Timeout {
}

pub fn build(builder: Builder) -> Timer {
let tick = builder.get_tick_duration();

let wheel = Wheel::new(&builder);
let worker = Worker::spawn(wheel, tick, builder.get_channel_capacity());
let worker = Worker::spawn(wheel, &builder);

Timer { worker: worker }
}
Expand Down Expand Up @@ -66,9 +74,9 @@ impl Timeout {

impl Future for Timeout {
type Item = ();
type Error = ();
type Error = Error;

fn poll(&mut self) -> Poll<(), ()> {
fn poll(&mut self) -> Poll<(), Error> {
trace!("Timeout::poll; when={:?}", self.when);

if self.is_expired() {
Expand All @@ -82,7 +90,12 @@ impl Future for Timeout {

let handle = match self.handle {
None => {
// An wakeup request has not yet been sent to the timer.
// An wakeup request has not yet been sent to the timer. Before
// doing so, check to ensure that the requested timeout does
// not exceed the `max_timeout` duration
if (self.when - Instant::now()) > *self.worker.max_timeout() {
return Err(Error::TooLong);
}

trace!(" --> no handle; parking");

Expand Down Expand Up @@ -143,3 +156,18 @@ impl Drop for Timeout {
}
}
}

impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", ::std::error::Error::description(self))
}
}

impl ::std::error::Error for Error {
fn description(&self) -> &str {
match *self {
Error::TooLong => "requested timeout too long",
Error::NoCapacity => "timer out of capacity",
}
}
}
13 changes: 12 additions & 1 deletion src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This code is needed in order to support a channel that can receive with a
//! timeout.
use Builder;
use mpmc::Queue;
use wheel::{Token, Wheel};
use futures::task::Task;
Expand All @@ -20,6 +21,7 @@ struct Tx {
chan: Arc<Chan>,
worker: Thread,
tolerance: Duration,
max_timeout: Duration,
}

struct Chan {
Expand All @@ -42,7 +44,11 @@ type ModQueue = Queue<ModTimeout, ()>;

impl Worker {
/// Spawn a worker, returning a handle to allow communication
pub fn spawn(mut wheel: Wheel, tolerance: Duration, capacity: usize) -> Worker {
pub fn spawn(mut wheel: Wheel, builder: &Builder) -> Worker {
let tolerance = builder.get_tick_duration();
let max_timeout = builder.get_max_timeout();
let capacity = builder.get_channel_capacity();

// Assert that the wheel has at least capacity available timeouts
assert!(wheel.available() >= capacity);

Expand All @@ -62,6 +68,7 @@ impl Worker {
chan: chan,
worker: t.thread().clone(),
tolerance: tolerance,
max_timeout: max_timeout,
}),
}
}
Expand All @@ -71,6 +78,10 @@ impl Worker {
&self.tx.tolerance
}

pub fn max_timeout(&self) -> &Duration {
&self.tx.max_timeout
}

/// Set a timeout
pub fn set_timeout(&self, when: Instant, task: Task) -> Result<Token, Task> {
self.tx.chan.set_timeouts.push(SetTimeout(when, task))
Expand Down
18 changes: 15 additions & 3 deletions tests/test_timer_wheel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn test_immediate_timeout() {
let timer = Timer::default();

let mut t = timer.set_timeout(Instant::now());
assert_eq!(Ok(Async::Ready(())), t.poll());
assert_eq!(Async::Ready(()), t.poll().unwrap());
}

#[test]
Expand Down Expand Up @@ -57,10 +57,9 @@ fn test_setting_later_timeout_then_earlier_one() {

#[test]
fn test_timer_with_looping_wheel() {
let _ = ::env_logger::init();

let timer = timer::wheel()
.num_slots(8)
.max_timeout(Duration::from_millis(10_000))
.build();

let dur1 = Duration::from_millis(200);
Expand All @@ -75,3 +74,16 @@ fn test_timer_with_looping_wheel() {
e1.assert_is_about(dur1);
e2.assert_is_about(Duration::from_millis(800));
}

#[test]
fn test_request_timeout_greater_than_max() {
let timer = timer::wheel()
.max_timeout(Duration::from_millis(500))
.build();

let to = timer.set_timeout(Instant::now() + Duration::from_millis(600));
assert!(to.wait().is_err());

let to = timer.set_timeout(Instant::now() + Duration::from_millis(500));
assert!(to.wait().is_ok());
}

0 comments on commit c944a24

Please sign in to comment.