Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add core::stream::Stream #79023

Merged
merged 2 commits into from
Jan 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions library/alloc/src/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ use core::ops::{
};
use core::pin::Pin;
use core::ptr::{self, Unique};
use core::stream::Stream;
use core::task::{Context, Poll};

use crate::alloc::{handle_alloc_error, AllocError, Allocator, Global, Layout, WriteCloneIntoRaw};
Expand Down Expand Up @@ -1618,3 +1619,16 @@ where
F::poll(Pin::new(&mut *self), cx)
}
}

#[unstable(feature = "async_stream", issue = "79024")]
impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
type Item = S::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut **self).poll_next(cx)
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}
1 change: 1 addition & 0 deletions library/alloc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
#![feature(array_windows)]
#![feature(allow_internal_unstable)]
#![feature(arbitrary_self_types)]
#![feature(async_stream)]
#![feature(box_patterns)]
#![feature(box_syntax)]
#![feature(cfg_sanitize)]
Expand Down
2 changes: 2 additions & 0 deletions library/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ pub mod panicking;
pub mod pin;
pub mod raw;
pub mod result;
#[unstable(feature = "async_stream", issue = "79024")]
pub mod stream;
pub mod sync;

pub mod fmt;
Expand Down
127 changes: 127 additions & 0 deletions library/core/src/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//! Composable asynchronous iteration.
//!
//! If futures are asynchronous values, then streams are asynchronous
//! iterators. If you've found yourself with an asynchronous collection of some kind,
//! and needed to perform an operation on the elements of said collection,
//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
//! asynchronous Rust code, so it's worth becoming familiar with them.
//!
//! Before explaining more, let's talk about how this module is structured:
//!
//! # Organization
//!
//! This module is largely organized by type:
//!
//! * [Traits] are the core portion: these traits define what kind of streams
//! exist and what you can do with them. The methods of these traits are worth
//! putting some extra study time into.
//! * Functions provide some helpful ways to create some basic streams.
//! * Structs are often the return types of the various methods on this
//! module's traits. You'll usually want to look at the method that creates
//! the `struct`, rather than the `struct` itself. For more detail about why,
//! see '[Implementing Stream](#implementing-stream)'.
//!
//! [Traits]: #traits
//!
//! That's it! Let's dig into streams.
//!
//! # Stream
//!
//! The heart and soul of this module is the [`Stream`] trait. The core of
//! [`Stream`] looks like this:
//!
//! ```
//! # use core::task::{Context, Poll};
//! # use core::pin::Pin;
//! trait Stream {
//! type Item;
//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
//! }
//! ```
//!
//! Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`]
//! method which is used when implementing a `Stream`, and a (to-be-implemented)
//! `next` method which is used when consuming a stream. Consumers of `Stream`
//! only need to consider `next`, which when called, returns a future which
//! yields `Option<Stream::Item>`.
//!
//! The future returned by `next` will yield `Some(Item)` as long as there are
//! elements, and once they've all been exhausted, will yield `None` to indicate
//! that iteration is finished. If we're waiting on something asynchronous to
//! resolve, the future will wait until the stream is ready to yield again.
//!
//! Individual streams may choose to resume iteration, and so calling `next`
//! again may or may not eventually yield `Some(Item)` again at some point.
//!
//! [`Stream`]'s full definition includes a number of other methods as well,
//! but they are default methods, built on top of [`poll_next`], and so you get
//! them for free.
//!
//! [`Poll`]: super::task::Poll
//! [`poll_next`]: Stream::poll_next
//!
//! # Implementing Stream
//!
//! Creating a stream of your own involves two steps: creating a `struct` to
//! hold the stream's state, and then implementing [`Stream`] for that
//! `struct`.
//!
//! Let's make a stream named `Counter` which counts from `1` to `5`:
//!
//! ```no_run
//! #![feature(async_stream)]
//! # use core::stream::Stream;
//! # use core::task::{Context, Poll};
//! # use core::pin::Pin;
//!
//! // First, the struct:
//!
//! /// A stream which counts from one to five
//! struct Counter {
//! count: usize,
//! }
//!
//! // we want our count to start at one, so let's add a new() method to help.
//! // This isn't strictly necessary, but is convenient. Note that we start
//! // `count` at zero, we'll see why in `poll_next()`'s implementation below.
//! impl Counter {
//! fn new() -> Counter {
//! Counter { count: 0 }
//! }
//! }
//!
//! // Then, we implement `Stream` for our `Counter`:
//!
//! impl Stream for Counter {
//! // we will be counting with usize
//! type Item = usize;
//!
//! // poll_next() is the only required method
//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
//! // Increment our count. This is why we started at zero.
//! self.count += 1;
//!
//! // Check to see if we've finished counting or not.
//! if self.count < 6 {
//! Poll::Ready(Some(self.count))
//! } else {
//! Poll::Ready(None)
//! }
//! }
//! }
//! ```
//!
//! # Laziness
//!
//! Streams are *lazy*. This means that just creating a stream doesn't _do_ a
//! whole lot. Nothing really happens until you call `next`. This is sometimes a
//! source of confusion when creating a stream solely for its side effects. The
//! compiler will warn us about this kind of behavior:
//!
//! ```text
//! warning: unused result that must be used: streams do nothing unless polled
//! ```

mod stream;

pub use stream::Stream;
110 changes: 110 additions & 0 deletions library/core/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::ops::DerefMut;
use crate::pin::Pin;
use crate::task::{Context, Poll};

/// An interface for dealing with asynchronous iterators.
///
/// This is the main stream trait. For more about the concept of streams
/// generally, please see the [module-level documentation]. In particular, you
/// may want to know how to [implement `Stream`][impl].
///
/// [module-level documentation]: index.html
/// [impl]: index.html#implementing-stream
#[unstable(feature = "async_stream", issue = "79024")]
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
/// The type of items yielded by the stream.
type Item;

/// Attempt to pull out the next value of this stream, registering the
/// current task for wakeup if the value is not yet available, and returning
/// `None` if the stream is exhausted.
///
/// # Return value
///
/// There are several possible return values, each indicating a distinct
/// stream state:
///
/// - `Poll::Pending` means that this stream's next value is not ready
/// yet. Implementations will ensure that the current task will be notified
/// when the next value may be ready.
///
/// - `Poll::Ready(Some(val))` means that the stream has successfully
/// produced a value, `val`, and may produce further values on subsequent
/// `poll_next` calls.
///
/// - `Poll::Ready(None)` means that the stream has terminated, and
/// `poll_next` should not be invoked again.
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Panics
///
/// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
/// `poll_next` method again may panic, block forever, or cause other kinds of
/// problems; the `Stream` trait places no requirements on the effects of
/// such a call. However, as the `poll_next` method is not marked `unsafe`,
/// Rust's usual rules apply: calls must never cause undefined behavior
/// (memory corruption, incorrect use of `unsafe` functions, or the like),
/// regardless of the stream's state.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

/// Returns the bounds on the remaining length of the stream.
///
/// Specifically, `size_hint()` returns a tuple where the first element
/// is the lower bound, and the second element is the upper bound.
///
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
/// A [`None`] here means that either there is no known upper bound, or the
/// upper bound is larger than [`usize`].
///
/// # Implementation notes
///
/// It is not enforced that a stream implementation yields the declared
/// number of elements. A buggy stream may yield less than the lower bound
/// or more than the upper bound of elements.
///
/// `size_hint()` is primarily intended to be used for optimizations such as
/// reserving space for the elements of the stream, but must not be
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect
/// implementation of `size_hint()` should not lead to memory safety
/// violations.
///
/// That said, the implementation should provide a correct estimation,
/// because otherwise it would be a violation of the trait's protocol.
///
/// The default implementation returns `(0, `[`None`]`)` which is correct for any
/// stream.
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved

#[unstable(feature = "async_stream", issue = "79024")]
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
type Item = S::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
S::poll_next(Pin::new(&mut **self), cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}

#[unstable(feature = "async_stream", issue = "79024")]
impl<P> Stream for Pin<P>
where
P: DerefMut + Unpin,
P::Target: Stream,
yoshuawuyts marked this conversation as resolved.
Show resolved Hide resolved
{
type Item = <P::Target as Stream>::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.get_mut().as_mut().poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}
3 changes: 3 additions & 0 deletions library/std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@
#![feature(allocator_internals)]
#![feature(allow_internal_unsafe)]
#![feature(allow_internal_unstable)]
#![feature(async_stream)]
#![feature(arbitrary_self_types)]
#![feature(array_error_internals)]
#![feature(asm)]
Expand Down Expand Up @@ -448,6 +449,8 @@ pub use core::ptr;
pub use core::raw;
#[stable(feature = "rust1", since = "1.0.0")]
pub use core::result;
#[unstable(feature = "async_stream", issue = "79024")]
pub use core::stream;
#[stable(feature = "i128", since = "1.26.0")]
#[allow(deprecated, deprecated_in_future)]
pub use core::u128;
Expand Down
14 changes: 14 additions & 0 deletions library/std/src/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::panicking;
use crate::pin::Pin;
use crate::ptr::{NonNull, Unique};
use crate::rc::Rc;
use crate::stream::Stream;
use crate::sync::atomic;
use crate::sync::{Arc, Mutex, RwLock};
use crate::task::{Context, Poll};
Expand Down Expand Up @@ -340,6 +341,19 @@ impl<F: Future> Future for AssertUnwindSafe<F> {
}
}

#[unstable(feature = "async_stream", issue = "79024")]
impl<S: Stream> Stream for AssertUnwindSafe<S> {
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

/// Invokes a closure, capturing the cause of an unwinding panic if one occurs.
///
/// This function will return `Ok` with the closure's result if the closure
Expand Down