Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add sink::unfold #2268

Merged
merged 2 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 52 additions & 36 deletions futures-util/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
//! This module is only available when the `sink` feature of this
//! library is activated, and it is activated by default.

use crate::future::Either;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
use crate::future::Either;

#[cfg(feature = "compat")]
use crate::compat::CompatSink;
Expand Down Expand Up @@ -41,6 +41,9 @@ pub use self::send::Send;
mod send_all;
pub use self::send_all::SendAll;

mod unfold;
pub use self::unfold::{unfold, Unfold};

mod with;
pub use self::with::With;

Expand Down Expand Up @@ -69,10 +72,11 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Note that this function consumes the given sink, returning a wrapped
/// version, much like `Iterator::map`.
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
where F: FnMut(U) -> Fut,
Fut: Future<Output = Result<Item, E>>,
E: From<Self::Error>,
Self: Sized
where
F: FnMut(U) -> Fut,
Fut: Future<Output = Result<Item, E>>,
E: From<Self::Error>,
Self: Sized,
{
With::new(self, f)
}
Expand Down Expand Up @@ -110,9 +114,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// # });
/// ```
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
where F: FnMut(U) -> St,
St: Stream<Item = Result<Item, Self::Error>>,
Self: Sized
where
F: FnMut(U) -> St,
St: Stream<Item = Result<Item, Self::Error>>,
Self: Sized,
{
WithFlatMap::new(self, f)
}
Expand All @@ -133,8 +138,9 @@ pub trait SinkExt<Item>: Sink<Item> {

/// Transforms the error returned by the sink.
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
where F: FnOnce(Self::Error) -> E,
Self: Sized,
where
F: FnOnce(Self::Error) -> E,
Self: Sized,
{
SinkMapErr::new(self, f)
}
Expand All @@ -143,13 +149,13 @@ pub trait SinkExt<Item>: Sink<Item> {
///
/// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
where Self: Sized,
Self::Error: Into<E>,
where
Self: Sized,
Self::Error: Into<E>,
{
SinkErrInto::new(self)
}


/// Adds a fixed-size buffer to the current sink.
///
/// The resulting sink will buffer up to `capacity` items when the
Expand All @@ -164,14 +170,16 @@ pub trait SinkExt<Item>: Sink<Item> {
/// library is activated, and it is activated by default.
#[cfg(feature = "alloc")]
fn buffer(self, capacity: usize) -> Buffer<Self, Item>
where Self: Sized,
where
Self: Sized,
{
Buffer::new(self, capacity)
}

/// Close the sink.
fn close(&mut self) -> Close<'_, Self, Item>
where Self: Unpin,
where
Self: Unpin,
{
Close::new(self)
}
Expand All @@ -181,9 +189,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This adapter clones each incoming item and forwards it to both this as well as
/// the other sink at the same time.
fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
where Self: Sized,
Item: Clone,
Si: Sink<Item, Error=Self::Error>
where
Self: Sized,
Item: Clone,
Si: Sink<Item, Error = Self::Error>,
{
Fanout::new(self, other)
}
Expand All @@ -193,7 +202,8 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This adapter is intended to be used when you want to stop sending to the sink
/// until all current requests are processed.
fn flush(&mut self) -> Flush<'_, Self, Item>
where Self: Unpin,
where
Self: Unpin,
{
Flush::new(self)
}
Expand All @@ -205,7 +215,8 @@ pub trait SinkExt<Item>: Sink<Item> {
/// to batch together items to send via `send_all`, rather than flushing
/// between each item.**
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
where Self: Unpin,
where
Self: Unpin,
{
Send::new(self, item)
}
Expand All @@ -221,12 +232,10 @@ pub trait SinkExt<Item>: Sink<Item> {
/// Doing `sink.send_all(stream)` is roughly equivalent to
/// `stream.forward(sink)`. The returned future will exhaust all items from
/// `stream` and send them to `self`.
fn send_all<'a, St>(
&'a mut self,
stream: &'a mut St
) -> SendAll<'a, Self, St>
where St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
Self: Unpin,
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
where
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
Self: Unpin,
{
SendAll::new(self, stream)
}
Expand All @@ -237,8 +246,9 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This can be used in combination with the `right_sink` method to write `if`
/// statements that evaluate to different streams in different branches.
fn left_sink<Si2>(self) -> Either<Self, Si2>
where Si2: Sink<Item, Error = Self::Error>,
Self: Sized
where
Si2: Sink<Item, Error = Self::Error>,
Self: Sized,
{
Either::Left(self)
}
Expand All @@ -249,8 +259,9 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This can be used in combination with the `left_sink` method to write `if`
/// statements that evaluate to different streams in different branches.
fn right_sink<Si1>(self) -> Either<Si1, Self>
where Si1: Sink<Item, Error = Self::Error>,
Self: Sized
where
Si1: Sink<Item, Error = Self::Error>,
Self: Sized,
{
Either::Right(self)
}
Expand All @@ -260,39 +271,44 @@ pub trait SinkExt<Item>: Sink<Item> {
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> CompatSink<Self, Item>
where Self: Sized + Unpin,
where
Self: Sized + Unpin,
{
CompatSink::new(self)
}

/// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
/// sink types.
fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).poll_ready(cx)
}

/// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
/// sink types.
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).start_send(item)
}

/// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
/// sink types.
fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).poll_flush(cx)
}

/// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
/// sink types.
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where Self: Unpin
where
Self: Unpin,
{
Pin::new(self).poll_close(cx)
}
Expand Down
83 changes: 83 additions & 0 deletions futures-util/src/sink/unfold.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use core::{future::Future, pin::Pin};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;
use pin_project::pin_project;

/// Sink for the [`unfold`] function.
#[pin_project]
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct Unfold<T, F, R> {
state: Option<T>,
function: F,
#[pin]
future: Option<R>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state and future are never present at the same time, so maybe the use of an enum could reduce the size of the struct?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would accept a patch to reduce the size of unfold.

}

/// Create a sink from a function which processes one item at a time.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::sink::{self, SinkExt};
///
/// let unfold = sink::unfold(0, |mut sum, i: i32| {
/// async move {
/// sum += i;
/// eprintln!("{}", i);
/// Ok::<_, futures::never::Never>(sum)
/// }
/// });
/// futures::pin_mut!(unfold);
/// unfold.send(5).await?;
/// # Ok::<(), futures::never::Never>(()) }).unwrap();
/// ```
pub fn unfold<T, F, R>(init: T, function: F) -> Unfold<T, F, R> {
Unfold {
state: Some(init),
function,
future: None,
}
}

impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
where
F: FnMut(T, Item) -> R,
R: Future<Output = Result<T, E>>,
{
type Error = E;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}

fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
let mut this = self.project();
debug_assert!(this.future.is_none());
let future = (this.function)(this.state.take().unwrap(), item);
this.future.set(Some(future));
Ok(())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Poll::Ready(if let Some(future) = this.future.as_mut().as_pin_mut() {
let result = match ready!(future.poll(cx)) {
Ok(state) => {
*this.state = Some(state);
Ok(())
}
Err(err) => Err(err),
};
this.future.set(None);
result
} else {
Ok(())
})
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
}
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ pub mod sink {

pub use futures_util::sink::{
Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With,
SinkExt, Fanout, Drain, drain,
SinkExt, Fanout, Drain, drain, Unfold, unfold,
WithFlatMap,
};

Expand Down
38 changes: 38 additions & 0 deletions futures/tests/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,44 @@ fn sink_map_err() {
);
}

#[test]
fn sink_unfold() {
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::poll_fn;
use futures::sink::{self, Sink, SinkExt};
use futures::task::Poll;

block_on(poll_fn(|cx| {
let (tx, mut rx) = mpsc::channel(1);
let unfold = sink::unfold((), |(), i: i32| {
let mut tx = tx.clone();
async move {
tx.send(i).await.unwrap();
Ok::<_, String>(())
}
});
futures::pin_mut!(unfold);
assert_eq!(unfold.as_mut().start_send(1), Ok(()));
assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
assert_eq!(rx.try_next().unwrap(), Some(1));

assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(unfold.as_mut().start_send(2), Ok(()));
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(unfold.as_mut().start_send(3), Ok(()));
assert_eq!(rx.try_next().unwrap(), Some(2));
assert!(rx.try_next().is_err());
assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
assert_eq!(unfold.as_mut().start_send(4), Ok(()));
assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full
assert_eq!(rx.try_next().unwrap(), Some(3));
assert_eq!(rx.try_next().unwrap(), Some(4));

Poll::Ready(())
}))
}

#[test]
fn err_into() {
use futures::channel::mpsc;
Expand Down