diff --git a/futures-util/src/sink/mod.rs b/futures-util/src/sink/mod.rs index b0e2c83f00..7fe5b055ed 100644 --- a/futures-util/src/sink/mod.rs +++ b/futures-util/src/sink/mod.rs @@ -6,11 +6,11 @@ //! This module is only available when the `sink` feature of this //! library is activated, and it is activated by default. +use crate::future::Either; use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{Stream, TryStream}; use futures_core::task::{Context, Poll}; -use crate::future::Either; #[cfg(feature = "compat")] use crate::compat::CompatSink; @@ -41,6 +41,9 @@ pub use self::send::Send; mod send_all; pub use self::send_all::SendAll; +mod unfold; +pub use self::unfold::{unfold, Unfold}; + mod with; pub use self::with::With; @@ -69,10 +72,11 @@ pub trait SinkExt: Sink { /// Note that this function consumes the given sink, returning a wrapped /// version, much like `Iterator::map`. fn with(self, f: F) -> With - where F: FnMut(U) -> Fut, - Fut: Future>, - E: From, - Self: Sized + where + F: FnMut(U) -> Fut, + Fut: Future>, + E: From, + Self: Sized, { With::new(self, f) } @@ -110,9 +114,10 @@ pub trait SinkExt: Sink { /// # }); /// ``` fn with_flat_map(self, f: F) -> WithFlatMap - where F: FnMut(U) -> St, - St: Stream>, - Self: Sized + where + F: FnMut(U) -> St, + St: Stream>, + Self: Sized, { WithFlatMap::new(self, f) } @@ -133,8 +138,9 @@ pub trait SinkExt: Sink { /// Transforms the error returned by the sink. fn sink_map_err(self, f: F) -> SinkMapErr - where F: FnOnce(Self::Error) -> E, - Self: Sized, + where + F: FnOnce(Self::Error) -> E, + Self: Sized, { SinkMapErr::new(self, f) } @@ -143,13 +149,13 @@ pub trait SinkExt: Sink { /// /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`. fn sink_err_into(self) -> err_into::SinkErrInto - where Self: Sized, - Self::Error: Into, + where + Self: Sized, + Self::Error: Into, { SinkErrInto::new(self) } - /// Adds a fixed-size buffer to the current sink. /// /// The resulting sink will buffer up to `capacity` items when the @@ -164,14 +170,16 @@ pub trait SinkExt: Sink { /// library is activated, and it is activated by default. #[cfg(feature = "alloc")] fn buffer(self, capacity: usize) -> Buffer - where Self: Sized, + where + Self: Sized, { Buffer::new(self, capacity) } /// Close the sink. fn close(&mut self) -> Close<'_, Self, Item> - where Self: Unpin, + where + Self: Unpin, { Close::new(self) } @@ -181,9 +189,10 @@ pub trait SinkExt: Sink { /// This adapter clones each incoming item and forwards it to both this as well as /// the other sink at the same time. fn fanout(self, other: Si) -> Fanout - where Self: Sized, - Item: Clone, - Si: Sink + where + Self: Sized, + Item: Clone, + Si: Sink, { Fanout::new(self, other) } @@ -193,7 +202,8 @@ pub trait SinkExt: Sink { /// This adapter is intended to be used when you want to stop sending to the sink /// until all current requests are processed. fn flush(&mut self) -> Flush<'_, Self, Item> - where Self: Unpin, + where + Self: Unpin, { Flush::new(self) } @@ -205,7 +215,8 @@ pub trait SinkExt: Sink { /// to batch together items to send via `send_all`, rather than flushing /// between each item.** fn send(&mut self, item: Item) -> Send<'_, Self, Item> - where Self: Unpin, + where + Self: Unpin, { Send::new(self, item) } @@ -221,12 +232,10 @@ pub trait SinkExt: Sink { /// Doing `sink.send_all(stream)` is roughly equivalent to /// `stream.forward(sink)`. The returned future will exhaust all items from /// `stream` and send them to `self`. - fn send_all<'a, St>( - &'a mut self, - stream: &'a mut St - ) -> SendAll<'a, Self, St> - where St: TryStream + Stream + Unpin + ?Sized, - Self: Unpin, + fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> + where + St: TryStream + Stream + Unpin + ?Sized, + Self: Unpin, { SendAll::new(self, stream) } @@ -237,8 +246,9 @@ pub trait SinkExt: Sink { /// This can be used in combination with the `right_sink` method to write `if` /// statements that evaluate to different streams in different branches. fn left_sink(self) -> Either - where Si2: Sink, - Self: Sized + where + Si2: Sink, + Self: Sized, { Either::Left(self) } @@ -249,8 +259,9 @@ pub trait SinkExt: Sink { /// This can be used in combination with the `left_sink` method to write `if` /// statements that evaluate to different streams in different branches. fn right_sink(self) -> Either - where Si1: Sink, - Self: Sized + where + Si1: Sink, + Self: Sized, { Either::Right(self) } @@ -260,15 +271,17 @@ pub trait SinkExt: Sink { #[cfg(feature = "compat")] #[cfg_attr(docsrs, doc(cfg(feature = "compat")))] fn compat(self) -> CompatSink - where Self: Sized + Unpin, + where + Self: Sized + Unpin, { CompatSink::new(self) } - + /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`] /// sink types. fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll> - where Self: Unpin + where + Self: Unpin, { Pin::new(self).poll_ready(cx) } @@ -276,7 +289,8 @@ pub trait SinkExt: Sink { /// A convenience method for calling [`Sink::start_send`] on [`Unpin`] /// sink types. fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> - where Self: Unpin + where + Self: Unpin, { Pin::new(self).start_send(item) } @@ -284,7 +298,8 @@ pub trait SinkExt: Sink { /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`] /// sink types. fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll> - where Self: Unpin + where + Self: Unpin, { Pin::new(self).poll_flush(cx) } @@ -292,7 +307,8 @@ pub trait SinkExt: Sink { /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`] /// sink types. fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll> - where Self: Unpin + where + Self: Unpin, { Pin::new(self).poll_close(cx) } diff --git a/futures-util/src/sink/unfold.rs b/futures-util/src/sink/unfold.rs new file mode 100644 index 0000000000..e8eda14a45 --- /dev/null +++ b/futures-util/src/sink/unfold.rs @@ -0,0 +1,83 @@ +use core::{future::Future, pin::Pin}; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; +use pin_project::pin_project; + +/// Sink for the [`unfold`] function. +#[pin_project] +#[derive(Debug)] +#[must_use = "sinks do nothing unless polled"] +pub struct Unfold { + state: Option, + function: F, + #[pin] + future: Option, +} + +/// Create a sink from a function which processes one item at a time. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::sink::{self, SinkExt}; +/// +/// let unfold = sink::unfold(0, |mut sum, i: i32| { +/// async move { +/// sum += i; +/// eprintln!("{}", i); +/// Ok::<_, futures::never::Never>(sum) +/// } +/// }); +/// futures::pin_mut!(unfold); +/// unfold.send(5).await?; +/// # Ok::<(), futures::never::Never>(()) }).unwrap(); +/// ``` +pub fn unfold(init: T, function: F) -> Unfold { + Unfold { + state: Some(init), + function, + future: None, + } +} + +impl Sink for Unfold +where + F: FnMut(T, Item) -> R, + R: Future>, +{ + type Error = E; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + let mut this = self.project(); + debug_assert!(this.future.is_none()); + let future = (this.function)(this.state.take().unwrap(), item); + this.future.set(Some(future)); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + Poll::Ready(if let Some(future) = this.future.as_mut().as_pin_mut() { + let result = match ready!(future.poll(cx)) { + Ok(state) => { + *this.state = Some(state); + Ok(()) + } + Err(err) => Err(err), + }; + this.future.set(None); + result + } else { + Ok(()) + }) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 34f93998df..4a91f7ae35 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -420,7 +420,7 @@ pub mod sink { pub use futures_util::sink::{ Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With, - SinkExt, Fanout, Drain, drain, + SinkExt, Fanout, Drain, drain, Unfold, unfold, WithFlatMap, }; diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs index 8ed201e394..83a6a78532 100644 --- a/futures/tests/sink.rs +++ b/futures/tests/sink.rs @@ -610,6 +610,44 @@ fn sink_map_err() { ); } +#[test] +fn sink_unfold() { + use futures::channel::mpsc; + use futures::executor::block_on; + use futures::future::poll_fn; + use futures::sink::{self, Sink, SinkExt}; + use futures::task::Poll; + + block_on(poll_fn(|cx| { + let (tx, mut rx) = mpsc::channel(1); + let unfold = sink::unfold((), |(), i: i32| { + let mut tx = tx.clone(); + async move { + tx.send(i).await.unwrap(); + Ok::<_, String>(()) + } + }); + futures::pin_mut!(unfold); + assert_eq!(unfold.as_mut().start_send(1), Ok(())); + assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(()))); + assert_eq!(rx.try_next().unwrap(), Some(1)); + + assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(unfold.as_mut().start_send(2), Ok(())); + assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(unfold.as_mut().start_send(3), Ok(())); + assert_eq!(rx.try_next().unwrap(), Some(2)); + assert!(rx.try_next().is_err()); + assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(unfold.as_mut().start_send(4), Ok(())); + assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full + assert_eq!(rx.try_next().unwrap(), Some(3)); + assert_eq!(rx.try_next().unwrap(), Some(4)); + + Poll::Ready(()) + })) +} + #[test] fn err_into() { use futures::channel::mpsc;