Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit futures dependency to Stream via feature flag #1774

Merged
merged 15 commits into from
Nov 16, 2019
5 changes: 2 additions & 3 deletions tests-integration/tests/process_stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use tokio::process::{Child, Command};
use tokio_test::assert_ok;

use futures::future::{self, FutureExt};
use futures::stream::StreamExt;
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
Expand Down Expand Up @@ -47,9 +46,9 @@ async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
// (i.e. EOF is reached after `n` lines.
loop {
let data = reader
.next()
.next_line()
.await
.unwrap_or_else(|| Ok(String::new()))
.unwrap_or_else(|_| Some(String::new()))
.expect("failed to read line");

let num_read = data.len();
Expand Down
4 changes: 2 additions & 2 deletions tokio-test/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl AsyncRead for Mock {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Some(rem) = self.inner.remaining_wait() {
let until = Instant::now() + rem;
self.inner.sleep = Some(time::delay(until));
self.inner.sleep = Some(time::delay_until(until));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delay was renamed delay_until to further disambiguate from delay_for.

} else {
self.inner.read_wait = Some(cx.waker().clone());
return Poll::Pending;
Expand Down Expand Up @@ -340,7 +340,7 @@ impl AsyncWrite for Mock {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Some(rem) = self.inner.remaining_wait() {
let until = Instant::now() + rem;
self.inner.sleep = Some(time::delay(until));
self.inner.sleep = Some(time::delay_until(until));
} else {
panic!("unexpected WouldBlock");
}
Expand Down
4 changes: 2 additions & 2 deletions tokio-test/tests/block_on.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![warn(rust_2018_idioms)]

use tokio::time::{delay, Duration, Instant};
use tokio::time::{delay_until, Duration, Instant};
use tokio_test::block_on;

#[test]
Expand All @@ -20,5 +20,5 @@ fn async_fn() {
#[test]
fn test_delay() {
let deadline = Instant::now() + Duration::from_millis(100);
assert_eq!((), block_on(delay(deadline)));
assert_eq!((), block_on(delay_until(deadline)));
}
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ log = "0.4"
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }

futures-util = "0.3.0"
futures = "0.3.0"

[package.metadata.docs.rs]
all-features = true
1 change: 1 addition & 0 deletions tokio-util/src/fs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod read_dir;
19 changes: 19 additions & 0 deletions tokio-util/src/fs/read_dir.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use crate::stream::Stream;
use tokio::fs::{DirEntry, ReadDir};

use futures_core::ready;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

impl Stream for ReadDir {
type Item = io::Result<DirEntry>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(match ready!(self.poll_next_entry(cx)) {
Ok(Some(entry)) => Some(Ok(entry)),
Ok(None) => None,
Err(err) => Some(Err(err)),
})
}
}
22 changes: 22 additions & 0 deletions tokio-util/src/io/lines.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use crate::stream::Stream;
use tokio::io::{AsyncBufRead, Lines};

use futures_core::ready;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

impl<R> Stream for Lines<R>
where
R: AsyncBufRead + Unpin,
{
type Item = io::Result<String>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(self.poll_next_line(cx))? {
Some(line) => Poll::Ready(Some(Ok(line))),
None => Poll::Ready(None),
}
}
}

1 change: 1 addition & 0 deletions tokio-util/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod split;
21 changes: 21 additions & 0 deletions tokio-util/src/io/split.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::stream::Stream;
use tokio::io::{AsyncBufRead, Split};

use futures_core::ready;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

impl<R> Stream for Split<R>
where
R: AsyncBufRead + Unpin,
{
type Item = io::Result<Vec<u8>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(self.poll_next_segment(cx))? {
Some(segment) => Poll::Ready(Some(Ok(segment))),
None => Poll::Ready(None),
}
}
}
5 changes: 5 additions & 0 deletions tokio-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@
//! Utilities for working with Tokio.

pub mod codec;
mod fs;
mod io;
pub mod stream;
mod sync;
mod time;
pub mod udp;
Empty file added tokio-util/src/stream/as_std.rs
Empty file.
19 changes: 19 additions & 0 deletions tokio-util/src/stream/into_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use crate::stream::Stream;

use std::pin::Pin;
use std::task::{Context, Poll};

/// Stream for the [`into_std`](super::Stream::into_std) method.
#[derive(Debug)]
pub struct IntoStd<T> {
pub(super) stream: T,
}

impl<T: Stream> futures_core::Stream for IntoStd<T> {
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let stream = unsafe { self.map_unchecked_mut(|me| &mut me.stream) };
stream.poll_next(cx)
}
}
carllerche marked this conversation as resolved.
Show resolved Hide resolved
117 changes: 117 additions & 0 deletions tokio-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//! Asynchronous value iteration.

mod into_std;
pub use into_std::IntoStd;

use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};

/// A stream of values produced asynchronously.
///
/// This trait is used to convert Tokio's "stream-like" types into
/// `futures::Stream`. This trait is not intended to be implemented by third
/// parties (use `futures::Stream` instead) and only exists to satisfy Rust's
carllerche marked this conversation as resolved.
Show resolved Hide resolved
/// coherence requirements. When `Stream` is stabilized in `std`, this trait
/// will be removed and `tokio` will directly implement the `std` trait.
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
/// Values yielded by the stream.
type Item;

/// Attempt to pull out the next value of this stream, registering the
/// current task for wakeup if the value is not yet available, and returning
/// `None` if the stream is exhausted.
///
/// # Return value
///
/// There are several possible return values, each indicating a distinct
/// stream state:
///
/// - `Poll::Pending` means that this stream's next value is not ready
/// yet. Implementations will ensure that the current task will be notified
/// when the next value may be ready.
///
/// - `Poll::Ready(Some(val))` means that the stream has successfully
/// produced a value, `val`, and may produce further values on subsequent
/// `poll_next` calls.
///
/// - `Poll::Ready(None)` means that the stream has terminated, and
/// `poll_next` should not be invoked again.
///
/// # Panics
///
/// Once a stream is finished, i.e. `Ready(None)` has been returned, further
/// calls to `poll_next` may result in a panic or other "bad behavior". If
/// this is difficult to guard against then the `fuse` adapter can be used
/// to ensure that `poll_next` always returns `Ready(None)` in subsequent
/// calls.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

/// Returns the bounds on the remaining length of the stream.
///
/// Specifically, `size_hint()` returns a tuple where the first element
/// is the lower bound, and the second element is the upper bound.
///
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
/// A [`None`] here means that either there is no known upper bound, or the
/// upper bound is larger than [`usize`].
///
/// # Implementation notes
///
/// It is not enforced that a stream implementation yields the declared
/// number of elements. A buggy stream may yield less than the lower bound
/// or more than the upper bound of elements.
///
/// `size_hint()` is primarily intended to be used for optimizations such as
/// reserving space for the elements of the stream, but must not be
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect
/// implementation of `size_hint()` should not lead to memory safety
/// violations.
///
/// That said, the implementation should provide a correct estimation,
/// because otherwise it would be a violation of the trait's protocol.
///
/// The default implementation returns `(0, `[`None`]`)` which is correct for any
/// stream.
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}

/// Convert the stream into a `futures::Stream` type.
fn into_std(self) -> IntoStd<Self>
where
Self: Sized,
{
IntoStd { stream: self }
}
}

impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
type Item = S::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
S::poll_next(Pin::new(&mut **self), cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}

impl<P> Stream for Pin<P>
where
P: DerefMut + Unpin,
P::Target: Stream,
{
type Item = <P::Target as Stream>::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().as_mut().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}
2 changes: 2 additions & 0 deletions tokio-util/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod mpsc;
mod watch;
21 changes: 21 additions & 0 deletions tokio-util/src/sync/mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::stream::Stream;
use tokio::sync::mpsc::{Receiver, UnboundedReceiver};

use std::pin::Pin;
use std::task::{Context, Poll};

impl<T> Stream for Receiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.get_mut().poll_recv(cx)
}
}

impl<T> Stream for UnboundedReceiver<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.get_mut().poll_recv(cx)
}
}
17 changes: 17 additions & 0 deletions tokio-util/src/sync/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::stream::Stream;
use tokio::sync::watch::Receiver;

use futures_core::ready;
use std::pin::Pin;
use std::task::{Context, Poll};

impl<T: Clone> Stream for Receiver<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let ret = ready!(self.poll_recv_ref(cx));

#[allow(clippy::map_clone)]
Poll::Ready(ret.map(|v_ref| v_ref.clone()))
}
}
15 changes: 15 additions & 0 deletions tokio-util/src/time/interval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use crate::stream::Stream;
use tokio::time::{Instant, Interval};

use futures_core::ready;
use std::pin::Pin;
use std::task::{Context, Poll};

impl Stream for Interval {
type Item = Instant;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
let instant = ready!(self.get_mut().poll_tick(cx));
Poll::Ready(Some(instant))
}
}
1 change: 1 addition & 0 deletions tokio-util/src/time/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod interval;
44 changes: 44 additions & 0 deletions tokio-util/tests/sync_watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#![warn(rust_2018_idioms)]

use tokio::sync::watch;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
use tokio_util::stream::Stream;

use futures::prelude::*;

#[test]
fn stream_impl() {
let (tx, rx) = watch::channel("one");
let mut rx = rx.into_std();

{
let mut t = spawn(rx.next());
let v = assert_ready!(t.poll()).unwrap();
assert_eq!(v, "one");
}

{
let mut t = spawn(rx.next());

assert_pending!(t.poll());

tx.broadcast("two").unwrap();

assert!(t.is_woken());

let v = assert_ready!(t.poll()).unwrap();
assert_eq!(v, "two");
}

{
let mut t = spawn(rx.next());

assert_pending!(t.poll());

drop(tx);

let res = assert_ready!(t.poll());
assert!(res.is_none());
}
}
3 changes: 0 additions & 3 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ uds = ["io-driver", "mio-uds", "libc"]
tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }

bytes = "0.4"
futures-core = "0.3.0"
futures-sink = "0.3.0"
futures-util = { version = "0.3.0", features = ["sink", "channel"] }
iovec = "0.1"

# Everything else is optional...
Expand Down
Loading