Skip to content

Commit

Permalink
stream: add StreamExt::timeout() (#2149)
Browse files Browse the repository at this point in the history
  • Loading branch information
alce authored and carllerche committed Jan 24, 2020
1 parent 0d49e11 commit 12be90e
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 1 deletion.
69 changes: 69 additions & 0 deletions tokio/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ use take::Take;
mod take_while;
use take_while::TakeWhile;

cfg_time! {
mod timeout;
use timeout::Timeout;
use std::time::Duration;
}

pub use futures_core::Stream;

/// An extension trait for `Stream`s that provides a variety of convenient
Expand Down Expand Up @@ -680,6 +686,69 @@ pub trait StreamExt: Stream {
{
Collect::new(self)
}

/// Applies a per-item timeout to the passed stream.
///
/// `timeout()` takes a `Duration` that represents the maximum amount of
/// time each element of the stream has to complete before timing out.
///
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available.
///
/// # Notes
///
/// This function consumes the stream passed into it and returns a
/// wrapped version of it.
///
/// Polling the returned stream will continue to poll the inner stream even
/// if one or more items time out.
///
/// # Examples
///
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::stream::{self, StreamExt};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
///
/// let mut int_stream = int_stream.timeout(Duration::from_secs(1));
///
/// // When no items time out, we get the 3 elements in succession:
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If the second item times out, we get an error and continue polling the stream:
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert!(int_stream.try_next().await.is_err());
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If we want to stop consuming the source stream the first time an
/// // element times out, we can use the `take_while` operator:
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// let mut int_stream = int_stream.take_while(Result::is_ok);
///
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// # }
/// ```
#[cfg(all(feature = "time"))]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
fn timeout(self, duration: Duration) -> Timeout<Self>
where
Self: Sized,
{
Timeout::new(self, duration)
}
}

impl<St: ?Sized> StreamExt for St where St: Stream {}
65 changes: 65 additions & 0 deletions tokio/src/stream/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::stream::{Fuse, Stream};
use crate::time::{Delay, Elapsed, Instant};

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
use std::time::Duration;

pin_project! {
/// Stream returned by the [`timeout`](super::StreamExt::timeout) method.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Timeout<S> {
#[pin]
stream: Fuse<S>,
deadline: Delay,
duration: Duration,
poll_deadline: bool,
}
}

impl<S: Stream> Timeout<S> {
pub(super) fn new(stream: S, duration: Duration) -> Self {
let next = Instant::now() + duration;
let deadline = Delay::new_timeout(next, duration);

Timeout {
stream: Fuse::new(stream),
deadline,
duration,
poll_deadline: true,
}
}
}

impl<S: Stream> Stream for Timeout<S> {
type Item = Result<S::Item, Elapsed>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().project().stream.poll_next(cx) {
Poll::Ready(v) => {
if v.is_some() {
let next = Instant::now() + self.duration;
self.as_mut().project().deadline.reset(next);
*self.as_mut().project().poll_deadline = true;
}
return Poll::Ready(v.map(Ok));
}
Poll::Pending => {}
};

if self.poll_deadline {
ready!(Pin::new(self.as_mut().project().deadline).poll(cx));
*self.as_mut().project().poll_deadline = false;
return Poll::Ready(Some(Err(Elapsed::new())));
}

Poll::Pending
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
10 changes: 9 additions & 1 deletion tokio/src/time/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,17 @@ pub struct Timeout<T> {
}

/// Error returned by `Timeout`.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub struct Elapsed(());

impl Elapsed {
// Used on StreamExt::timeout
#[allow(unused)]
pub(crate) fn new() -> Self {
Elapsed(())
}
}

impl<T> Timeout<T> {
pub(crate) fn new_with_delay(value: T, delay: Delay) -> Timeout<T> {
Timeout { value, delay }
Expand Down
109 changes: 109 additions & 0 deletions tokio/tests/stream_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#![cfg(feature = "full")]

use tokio::stream::{self, StreamExt};
use tokio::time::{self, delay_for, Duration};
use tokio_test::*;

use futures::StreamExt as _;

async fn maybe_delay(idx: i32) -> i32 {
if idx % 2 == 0 {
delay_for(ms(200)).await;
}
idx
}

fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

#[tokio::test]
async fn basic_usage() {
time::pause();

// Items 2 and 4 time out. If we run the stream until it completes,
// we end up with the following items:
//
// [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)]

let stream = stream::iter(1..=4).then(maybe_delay).timeout(ms(100));
let mut stream = task::spawn(stream);

// First item completes immediately
assert_ready_eq!(stream.poll_next(), Some(Ok(1)));

// Second item is delayed 200ms, times out after 100ms
assert_pending!(stream.poll_next());

time::advance(ms(150)).await;
let v = assert_ready!(stream.poll_next());
assert!(v.unwrap().is_err());

assert_pending!(stream.poll_next());

time::advance(ms(100)).await;
assert_ready_eq!(stream.poll_next(), Some(Ok(2)));

// Third item is ready immediately
assert_ready_eq!(stream.poll_next(), Some(Ok(3)));

// Fourth item is delayed 200ms, times out after 100ms
assert_pending!(stream.poll_next());

time::advance(ms(60)).await;
assert_pending!(stream.poll_next()); // nothing ready yet

time::advance(ms(60)).await;
let v = assert_ready!(stream.poll_next());
assert!(v.unwrap().is_err()); // timeout!

time::advance(ms(120)).await;
assert_ready_eq!(stream.poll_next(), Some(Ok(4)));

// Done.
assert_ready_eq!(stream.poll_next(), None);
}

#[tokio::test]
async fn return_elapsed_errors_only_once() {
time::pause();

let stream = stream::iter(1..=3).then(maybe_delay).timeout(ms(50));
let mut stream = task::spawn(stream);

// First item completes immediately
assert_ready_eq!(stream.poll_next(), Some(Ok(1)));

// Second item is delayed 200ms, times out after 50ms. Only one `Elapsed`
// error is returned.
assert_pending!(stream.poll_next());
//
time::advance(ms(50)).await;
let v = assert_ready!(stream.poll_next());
assert!(v.unwrap().is_err()); // timeout!

// deadline elapses again, but no error is returned
time::advance(ms(50)).await;
assert_pending!(stream.poll_next());

time::advance(ms(100)).await;
assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
assert_ready_eq!(stream.poll_next(), Some(Ok(3)));

// Done
assert_ready_eq!(stream.poll_next(), None);
}

#[tokio::test]
async fn no_timeouts() {
let stream = stream::iter(vec![1, 3, 5])
.then(maybe_delay)
.timeout(ms(100));

let mut stream = task::spawn(stream);

assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
assert_ready_eq!(stream.poll_next(), Some(Ok(5)));
assert_ready_eq!(stream.poll_next(), None);
}

0 comments on commit 12be90e

Please sign in to comment.