From 0c8db16a67d02127cb6b4a1f399db054517f6aee Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 13 Nov 2020 18:24:26 +0100 Subject: [PATCH 1/2] Add `core::stream::Stream` This patch adds the `core::stream` submodule and implements `core::stream::Stream` in accordance with RFC2996. Add feedback from @camelid --- library/alloc/src/boxed.rs | 14 +++ library/alloc/src/lib.rs | 1 + library/core/src/lib.rs | 2 + library/core/src/stream/mod.rs | 154 +++++++++++++++++++++++++ library/core/src/stream/stream/mod.rs | 129 +++++++++++++++++++++ library/core/src/stream/stream/next.rs | 30 +++++ library/std/src/lib.rs | 3 + library/std/src/panic.rs | 14 +++ 8 files changed, 347 insertions(+) create mode 100644 library/core/src/stream/mod.rs create mode 100644 library/core/src/stream/stream/mod.rs create mode 100644 library/core/src/stream/stream/next.rs diff --git a/library/alloc/src/boxed.rs b/library/alloc/src/boxed.rs index 0aa52b35ced45..e586ff8990215 100644 --- a/library/alloc/src/boxed.rs +++ b/library/alloc/src/boxed.rs @@ -149,6 +149,7 @@ use core::ops::{ }; use core::pin::Pin; use core::ptr::{self, Unique}; +use core::stream::Stream; use core::task::{Context, Poll}; use crate::alloc::{handle_alloc_error, AllocError, Allocator, Global, Layout, WriteCloneIntoRaw}; @@ -1618,3 +1619,16 @@ where F::poll(Pin::new(&mut *self), cx) } } + +#[unstable(feature = "async_stream", issue = "79024")] +impl Stream for Box { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + (**self).size_hint() + } +} diff --git a/library/alloc/src/lib.rs b/library/alloc/src/lib.rs index 8d721ed7487ae..e524eb05fcdd7 100644 --- a/library/alloc/src/lib.rs +++ b/library/alloc/src/lib.rs @@ -82,6 +82,7 @@ #![feature(array_windows)] #![feature(allow_internal_unstable)] #![feature(arbitrary_self_types)] +#![feature(async_stream)] #![feature(box_patterns)] #![feature(box_syntax)] #![feature(cfg_sanitize)] diff --git a/library/core/src/lib.rs b/library/core/src/lib.rs index 263c6c9cf0f26..a4395ab57e8a1 100644 --- a/library/core/src/lib.rs +++ b/library/core/src/lib.rs @@ -254,6 +254,8 @@ pub mod panicking; pub mod pin; pub mod raw; pub mod result; +#[unstable(feature = "async_stream", issue = "79024")] +pub mod stream; pub mod sync; pub mod fmt; diff --git a/library/core/src/stream/mod.rs b/library/core/src/stream/mod.rs new file mode 100644 index 0000000000000..48cca4972929a --- /dev/null +++ b/library/core/src/stream/mod.rs @@ -0,0 +1,154 @@ +//! Composable asynchronous iteration. +//! +//! If futures are asynchronous values, then streams are asynchronous +//! iterators. If you've found yourself with an asynchronous collection of some kind, +//! and needed to perform an operation on the elements of said collection, +//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic +//! asynchronous Rust code, so it's worth becoming familiar with them. +//! +//! Before explaining more, let's talk about how this module is structured: +//! +//! # Organization +//! +//! This module is largely organized by type: +//! +//! * [Traits] are the core portion: these traits define what kind of streams +//! exist and what you can do with them. The methods of these traits are worth +//! putting some extra study time into. +//! * Functions provide some helpful ways to create some basic streams. +//! * [Structs] are often the return types of the various methods on this +//! module's traits. You'll usually want to look at the method that creates +//! the `struct`, rather than the `struct` itself. For more detail about why, +//! see '[Implementing Stream](#implementing-stream)'. +//! +//! [Traits]: #traits +//! [Structs]: #structs +//! +//! That's it! Let's dig into streams. +//! +//! # Stream +//! +//! The heart and soul of this module is the [`Stream`] trait. The core of +//! [`Stream`] looks like this: +//! +//! ``` +//! # use core::task::{Context, Poll}; +//! # use core::pin::Pin; +//! trait Stream { +//! type Item; +//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; +//! } +//! ``` +//! +//! Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`] +//! method which is used when implementing a `Stream`, and the [`next`] method +//! which is used when consuming a stream. Consumers of `Stream` only need to +//! consider [`next`], which when called, returns a future which yields +//! yields [`Option`][``]. +//! +//! The future returned by [`next`] will yield `Some(Item)` as long as there are +//! elements, and once they've all been exhausted, will yield `None` to indicate +//! that iteration is finished. If we're waiting on something asynchronous to +//! resolve, the future will wait until the stream is ready to yield again. +//! +//! Individual streams may choose to resume iteration, and so calling [`next`] +//! again may or may not eventually yield `Some(Item)` again at some point. +//! +//! [`Stream`]'s full definition includes a number of other methods as well, +//! but they are default methods, built on top of [`poll_next`], and so you get +//! them for free. +//! +//! [`Poll`]: super::task::Poll +//! [`poll_next`]: Stream::poll_next +//! [`next`]: Stream::next +//! [``]: Stream::Item +//! +//! # Implementing Stream +//! +//! Creating a stream of your own involves two steps: creating a `struct` to +//! hold the stream's state, and then implementing [`Stream`] for that +//! `struct`. +//! +//! Let's make a stream named `Counter` which counts from `1` to `5`: +//! +//! ```no_run +//! #![feature(async_stream)] +//! # use core::stream::Stream; +//! # use core::task::{Context, Poll}; +//! # use core::pin::Pin; +//! +//! // First, the struct: +//! +//! /// A stream which counts from one to five +//! struct Counter { +//! count: usize, +//! } +//! +//! // we want our count to start at one, so let's add a new() method to help. +//! // This isn't strictly necessary, but is convenient. Note that we start +//! // `count` at zero, we'll see why in `poll_next()`'s implementation below. +//! impl Counter { +//! fn new() -> Counter { +//! Counter { count: 0 } +//! } +//! } +//! +//! // Then, we implement `Stream` for our `Counter`: +//! +//! impl Stream for Counter { +//! // we will be counting with usize +//! type Item = usize; +//! +//! // poll_next() is the only required method +//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +//! // Increment our count. This is why we started at zero. +//! self.count += 1; +//! +//! // Check to see if we've finished counting or not. +//! if self.count < 6 { +//! Poll::Ready(Some(self.count)) +//! } else { +//! Poll::Ready(None) +//! } +//! } +//! } +//! +//! // And now we can use it! +//! # async fn run() { +//! # +//! let mut counter = Counter::new(); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! # +//! } +//! ``` +//! +//! This will print `1` through `5`, each on their own line. +//! +//! # Laziness +//! +//! Streams are *lazy*. This means that just creating a stream doesn't _do_ a +//! whole lot. Nothing really happens until you call [`next`]. This is sometimes a +//! source of confusion when creating a stream solely for its side effects. The +//! compiler will warn us about this kind of behavior: +//! +//! ```text +//! warning: unused result that must be used: streams do nothing unless polled +//! ``` + +mod stream; + +pub use stream::{Next, Stream}; diff --git a/library/core/src/stream/stream/mod.rs b/library/core/src/stream/stream/mod.rs new file mode 100644 index 0000000000000..3f92c2e8c1c02 --- /dev/null +++ b/library/core/src/stream/stream/mod.rs @@ -0,0 +1,129 @@ +mod next; + +pub use next::Next; + +use crate::ops::DerefMut; +use crate::pin::Pin; +use crate::task::{Context, Poll}; + +/// An interface for dealing with asynchronous iterators. +/// +/// This is the main stream trait. For more about the concept of streams +/// generally, please see the [module-level documentation]. In particular, you +/// may want to know how to [implement `Stream`][impl]. +/// +/// [module-level documentation]: index.html +/// [impl]: index.html#implementing-stream +#[unstable(feature = "async_stream", issue = "79024")] +#[must_use = "streams do nothing unless polled"] +pub trait Stream { + /// The type of items yielded by the stream. + type Item; + + /// Attempt to pull out the next value of this stream, registering the + /// current task for wakeup if the value is not yet available, and returning + /// `None` if the stream is exhausted. + /// + /// # Return value + /// + /// There are several possible return values, each indicating a distinct + /// stream state: + /// + /// - `Poll::Pending` means that this stream's next value is not ready + /// yet. Implementations will ensure that the current task will be notified + /// when the next value may be ready. + /// + /// - `Poll::Ready(Some(val))` means that the stream has successfully + /// produced a value, `val`, and may produce further values on subsequent + /// `poll_next` calls. + /// + /// - `Poll::Ready(None)` means that the stream has terminated, and + /// `poll_next` should not be invoked again. + /// + /// # Panics + /// + /// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its + /// `poll_next` method again may panic, block forever, or cause other kinds of + /// problems; the `Stream` trait places no requirements on the effects of + /// such a call. However, as the `poll_next` method is not marked `unsafe`, + /// Rust's usual rules apply: calls must never cause undefined behavior + /// (memory corruption, incorrect use of `unsafe` functions, or the like), + /// regardless of the stream's state. + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + /// Returns the bounds on the remaining length of the stream. + /// + /// Specifically, `size_hint()` returns a tuple where the first element + /// is the lower bound, and the second element is the upper bound. + /// + /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`. + /// A [`None`] here means that either there is no known upper bound, or the + /// upper bound is larger than [`usize`]. + /// + /// # Implementation notes + /// + /// It is not enforced that a stream implementation yields the declared + /// number of elements. A buggy stream may yield less than the lower bound + /// or more than the upper bound of elements. + /// + /// `size_hint()` is primarily intended to be used for optimizations such as + /// reserving space for the elements of the stream, but must not be + /// trusted to e.g., omit bounds checks in unsafe code. An incorrect + /// implementation of `size_hint()` should not lead to memory safety + /// violations. + /// + /// That said, the implementation should provide a correct estimation, + /// because otherwise it would be a violation of the trait's protocol. + /// + /// The default implementation returns `(0, `[`None`]`)` which is correct for any + /// stream. + #[inline] + fn size_hint(&self) -> (usize, Option) { + (0, None) + } + + /// Advances the stream and returns a future which yields the next value. + /// + /// The returned future yields [`None`] when iteration is finished. + /// Individual stream implementations may choose to resume iteration, and so + /// calling `next()` again may or may not eventually start yielding + /// [`Some(Item)`] again at some point. + /// + /// [`Some(Item)`]: Some + fn next(&mut self) -> Next<'_, Self> + where + Self: Unpin, + { + Next::new(self) + } +} + +#[unstable(feature = "async_stream", issue = "79024")] +impl Stream for &mut S { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + S::poll_next(Pin::new(&mut **self), cx) + } + + fn size_hint(&self) -> (usize, Option) { + (**self).size_hint() + } +} + +#[unstable(feature = "async_stream", issue = "79024")] +impl

Stream for Pin

+where + P: DerefMut + Unpin, + P::Target: Stream, +{ + type Item = ::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().as_mut().poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + (**self).size_hint() + } +} diff --git a/library/core/src/stream/stream/next.rs b/library/core/src/stream/stream/next.rs new file mode 100644 index 0000000000000..e25d44228e781 --- /dev/null +++ b/library/core/src/stream/stream/next.rs @@ -0,0 +1,30 @@ +use crate::future::Future; +use crate::pin::Pin; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A future which advances the stream and returns the next value. +/// +/// This `struct` is created by [`Stream::next`]. See its documentation for more. +#[unstable(feature = "async_stream", issue = "79024")] +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Next<'a, S: ?Sized> { + stream: &'a mut S, +} + +impl<'a, S: ?Sized> Next<'a, S> { + /// Create a new instance of `Next`. + pub(crate) fn new(stream: &'a mut S) -> Self { + Self { stream } + } +} + +#[unstable(feature = "async_stream", issue = "79024")] +impl Future for Next<'_, S> { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut *self.stream).poll_next(cx) + } +} diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs index 5ba13c2f91334..f739fffd1c04c 100644 --- a/library/std/src/lib.rs +++ b/library/std/src/lib.rs @@ -224,6 +224,7 @@ #![feature(allocator_internals)] #![feature(allow_internal_unsafe)] #![feature(allow_internal_unstable)] +#![feature(async_stream)] #![feature(arbitrary_self_types)] #![feature(array_error_internals)] #![feature(asm)] @@ -448,6 +449,8 @@ pub use core::ptr; pub use core::raw; #[stable(feature = "rust1", since = "1.0.0")] pub use core::result; +#[unstable(feature = "async_stream", issue = "79024")] +pub use core::stream; #[stable(feature = "i128", since = "1.26.0")] #[allow(deprecated, deprecated_in_future)] pub use core::u128; diff --git a/library/std/src/panic.rs b/library/std/src/panic.rs index d18b94b6c1aef..66e363bf67b8b 100644 --- a/library/std/src/panic.rs +++ b/library/std/src/panic.rs @@ -12,6 +12,7 @@ use crate::panicking; use crate::pin::Pin; use crate::ptr::{NonNull, Unique}; use crate::rc::Rc; +use crate::stream::Stream; use crate::sync::atomic; use crate::sync::{Arc, Mutex, RwLock}; use crate::task::{Context, Poll}; @@ -340,6 +341,19 @@ impl Future for AssertUnwindSafe { } } +#[unstable(feature = "async_stream", issue = "79024")] +impl Stream for AssertUnwindSafe { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + /// Invokes a closure, capturing the cause of an unwinding panic if one occurs. /// /// This function will return `Ok` with the closure's result if the closure From a1b11321fb2d6ce00af9c8957c98df76432b1b78 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Jan 2021 15:54:09 +0100 Subject: [PATCH 2/2] Remove `Stream::next` This is a temporary change only, as we wait to resolve dynamic dispatch issues. The `Stream::next` method and corresponding documentation are expected to be fully restored once we have a path to proceed. Ref: https://github.com/rust-lang/rfcs/pull/2996#issuecomment-757386206 update docs --- library/core/src/stream/mod.rs | 45 ++++++-------------------- library/core/src/stream/stream/mod.rs | 19 ----------- library/core/src/stream/stream/next.rs | 30 ----------------- 3 files changed, 9 insertions(+), 85 deletions(-) delete mode 100644 library/core/src/stream/stream/next.rs diff --git a/library/core/src/stream/mod.rs b/library/core/src/stream/mod.rs index 48cca4972929a..0df18af65ebf0 100644 --- a/library/core/src/stream/mod.rs +++ b/library/core/src/stream/mod.rs @@ -16,13 +16,12 @@ //! exist and what you can do with them. The methods of these traits are worth //! putting some extra study time into. //! * Functions provide some helpful ways to create some basic streams. -//! * [Structs] are often the return types of the various methods on this +//! * Structs are often the return types of the various methods on this //! module's traits. You'll usually want to look at the method that creates //! the `struct`, rather than the `struct` itself. For more detail about why, //! see '[Implementing Stream](#implementing-stream)'. //! //! [Traits]: #traits -//! [Structs]: #structs //! //! That's it! Let's dig into streams. //! @@ -41,17 +40,17 @@ //! ``` //! //! Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`] -//! method which is used when implementing a `Stream`, and the [`next`] method -//! which is used when consuming a stream. Consumers of `Stream` only need to -//! consider [`next`], which when called, returns a future which yields -//! yields [`Option`][``]. +//! method which is used when implementing a `Stream`, and a (to-be-implemented) +//! `next` method which is used when consuming a stream. Consumers of `Stream` +//! only need to consider `next`, which when called, returns a future which +//! yields `Option`. //! -//! The future returned by [`next`] will yield `Some(Item)` as long as there are +//! The future returned by `next` will yield `Some(Item)` as long as there are //! elements, and once they've all been exhausted, will yield `None` to indicate //! that iteration is finished. If we're waiting on something asynchronous to //! resolve, the future will wait until the stream is ready to yield again. //! -//! Individual streams may choose to resume iteration, and so calling [`next`] +//! Individual streams may choose to resume iteration, and so calling `next` //! again may or may not eventually yield `Some(Item)` again at some point. //! //! [`Stream`]'s full definition includes a number of other methods as well, @@ -60,8 +59,6 @@ //! //! [`Poll`]: super::task::Poll //! [`poll_next`]: Stream::poll_next -//! [`next`]: Stream::next -//! [``]: Stream::Item //! //! # Implementing Stream //! @@ -112,36 +109,12 @@ //! } //! } //! } -//! -//! // And now we can use it! -//! # async fn run() { -//! # -//! let mut counter = Counter::new(); -//! -//! let x = counter.next().await.unwrap(); -//! println!("{}", x); -//! -//! let x = counter.next().await.unwrap(); -//! println!("{}", x); -//! -//! let x = counter.next().await.unwrap(); -//! println!("{}", x); -//! -//! let x = counter.next().await.unwrap(); -//! println!("{}", x); -//! -//! let x = counter.next().await.unwrap(); -//! println!("{}", x); -//! # -//! } //! ``` //! -//! This will print `1` through `5`, each on their own line. -//! //! # Laziness //! //! Streams are *lazy*. This means that just creating a stream doesn't _do_ a -//! whole lot. Nothing really happens until you call [`next`]. This is sometimes a +//! whole lot. Nothing really happens until you call `next`. This is sometimes a //! source of confusion when creating a stream solely for its side effects. The //! compiler will warn us about this kind of behavior: //! @@ -151,4 +124,4 @@ mod stream; -pub use stream::{Next, Stream}; +pub use stream::Stream; diff --git a/library/core/src/stream/stream/mod.rs b/library/core/src/stream/stream/mod.rs index 3f92c2e8c1c02..e37902dae1f2d 100644 --- a/library/core/src/stream/stream/mod.rs +++ b/library/core/src/stream/stream/mod.rs @@ -1,7 +1,3 @@ -mod next; - -pub use next::Next; - use crate::ops::DerefMut; use crate::pin::Pin; use crate::task::{Context, Poll}; @@ -81,21 +77,6 @@ pub trait Stream { fn size_hint(&self) -> (usize, Option) { (0, None) } - - /// Advances the stream and returns a future which yields the next value. - /// - /// The returned future yields [`None`] when iteration is finished. - /// Individual stream implementations may choose to resume iteration, and so - /// calling `next()` again may or may not eventually start yielding - /// [`Some(Item)`] again at some point. - /// - /// [`Some(Item)`]: Some - fn next(&mut self) -> Next<'_, Self> - where - Self: Unpin, - { - Next::new(self) - } } #[unstable(feature = "async_stream", issue = "79024")] diff --git a/library/core/src/stream/stream/next.rs b/library/core/src/stream/stream/next.rs deleted file mode 100644 index e25d44228e781..0000000000000 --- a/library/core/src/stream/stream/next.rs +++ /dev/null @@ -1,30 +0,0 @@ -use crate::future::Future; -use crate::pin::Pin; -use crate::stream::Stream; -use crate::task::{Context, Poll}; - -/// A future which advances the stream and returns the next value. -/// -/// This `struct` is created by [`Stream::next`]. See its documentation for more. -#[unstable(feature = "async_stream", issue = "79024")] -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Next<'a, S: ?Sized> { - stream: &'a mut S, -} - -impl<'a, S: ?Sized> Next<'a, S> { - /// Create a new instance of `Next`. - pub(crate) fn new(stream: &'a mut S) -> Self { - Self { stream } - } -} - -#[unstable(feature = "async_stream", issue = "79024")] -impl Future for Next<'_, S> { - type Output = Option; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut *self.stream).poll_next(cx) - } -}