A sink dual for stream::unfold #2274

Closed
piegamesde opened this issue Nov 28, 2020 · 2 comments
Labels
A-sink Area: futures::sink C-feature-request

Comments

@piegamesde

See my Stack Overflow question for more details on my problem. I hacked together a prototype and I'm willing to contribute it upstream if there's interest in it (and with some guidance; I'm a novice):

fn fold<T, F, Fut, Item, E>(init: T, f: F) -> FoldSink<T, F, Fut>
where
    F: FnMut(T, Item) -> Fut,
    Fut: Future<Output = Result<T, E>>
{
    FoldSink {
        f,
        state: Some(init),
        fut: None,
    }
}


use std::future::Future;
use std::task::Poll;

use pin_project::pin_project;

#[pin_project]
struct FoldSink<T, F, Fut> {
    f: F,
    state: Option<T>,
    #[pin]
    fut: Option<Fut>,
}

impl <T, Item, F, Fut, E> futures::sink::Sink<Item> for FoldSink<T, F, Fut>
where
    F: FnMut(T, Item) -> Fut,
    Fut: Future<Output = Result<T, E>>
{
    type Error = E;

    fn poll_ready(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        let mut this = self.project();

        match this.fut.as_mut().as_pin_mut() {
            Some(fut) => {
                match fut.poll(ctx) {
                    Poll::Ready(Ok(new_state)) => {
                        this.fut.set(None);
                        *this.state = Some(new_state);
                        Poll::Ready(Ok(()))
                    },
                    Poll::Ready(Err(e)) => {
                        this.fut.set(None);
                        Poll::Ready(Err(e))
                    },
                    Poll::Pending => Poll::Pending,
                }
            },
            None => {
                Poll::Ready(Ok(()))
            }
        }
    }

    fn start_send(self: std::pin::Pin<&mut Self>, item: Item) -> Result<(), E> {
        let mut this = self.project();
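        // The state is only present once the previous future has completed,
        // i.e. after poll_ready has returned Poll::Ready(Ok(())).
        let state = this.state.take().expect("start_send called without a successful poll_ready");
        this.fut.set(Some((this.f)(state, item)));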
        Ok(())
    }

    fn poll_flush(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        self.poll_ready(ctx)
    }

    fn poll_close(mut self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        futures::ready!(self.as_mut().poll_ready(ctx))?;
        let this = self.project();
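        // Drop the state without panicking: it is already gone if an earlier
        // future returned an error or if poll_close is called a second time.
        drop(this.state.take());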
        Poll::Ready(Ok(()))
    }
}

Of course, the name is completely open to bikeshedding. I picked fold as the dual of unfold for lack of a better idea.
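To make the state-threading contract of the prototype concrete, here is a minimal synchronous sketch. `SyncFold`, `send`, and `finish` are hypothetical names used only for illustration; they are not part of the prototype or of the futures crate. The closure takes the state by value along with each item and hands the state back, and an error leaves the sink without a state, which mirrors what `FoldSink` does asynchronously.

```rust
/// Synchronous stand-in for the proposed sink (hypothetical type).
/// It owns the state between items, just like `FoldSink::state`.
struct SyncFold<T, F> {
    state: Option<T>,
    f: F,
}

impl<T, F> SyncFold<T, F> {
    fn new(init: T, f: F) -> Self {
        SyncFold { state: Some(init), f }
    }

    /// Feed one item: move the state into the closure and store what it returns.
    /// If the closure fails, the state has been consumed and stays `None`.
    fn send<Item, E>(&mut self, item: Item) -> Result<(), E>
    where
        F: FnMut(T, Item) -> Result<T, E>,
    {
        let state = self.state.take().expect("sink used after an error");
        let next = (self.f)(state, item)?;
        self.state = Some(next);
        Ok(())
    }

    /// Close the sink and return the final state, if one is still present.
    fn finish(self) -> Option<T> {
        self.state
    }
}
```

Sending 2 and then 3 into a summing closure that starts at 0 leaves a final state of 5. The async version replaces the direct call with a stored future that `poll_ready` drives to completion.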

I think the size of the FoldSink struct could be reduced with an enum, as state and fut are mutually exclusive.
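A rough way to check the size claim is to compare `std::mem::size_of` for the two layouts. In this sketch, `TwoOptions` and `OneOf` are hypothetical names, and the arrays are arbitrary stand-ins for the real state and future types. Rust does not guarantee exact layouts for these types, so the sketch only compares sizes and does not rely on specific numbers.

```rust
use std::mem::size_of;

// Arbitrary stand-ins for the real types (assumptions for illustration only).
type State = [u64; 4];
type Fut = [u64; 8];

/// Layout used by the prototype: both slots exist at all times.
#[allow(dead_code)]
struct TwoOptions {
    state: Option<State>,
    fut: Option<Fut>,
}

/// Alternative layout: only one of the two is stored at any moment.
/// `Empty` covers the window while the state is moved into the closure,
/// as well as the closed or failed sink.
#[allow(dead_code)]
enum OneOf {
    Idle(State),
    Busy(Fut),
    Empty,
}

fn two_options_size() -> usize {
    size_of::<TwoOptions>()
}

fn one_of_size() -> usize {
    size_of::<OneOf>()
}
```

The struct has to reserve room for both payloads, while the enum only needs room for the larger one plus a discriminant, so `one_of_size()` comes out smaller than `two_options_size()` here. A pinned enum would still need a projection (for example via pin_project's enum support) to poll the `Busy` variant.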

Maybe we want to add a try_ variant of this as well?

@taiki-e
Member

taiki-e commented Nov 28, 2020

#2268 probably resolves this, right?

@taiki-e taiki-e added C-feature-request S-waiting-on-author Status: This is awaiting some action (such as code changes or more information) from the author labels Nov 28, 2020
@piegamesde
Author

Waaaaa

I could have saved so many hours if I'd searched for PRs as well 😅

@taiki-e taiki-e removed the S-waiting-on-author Status: This is awaiting some action (such as code changes or more information) from the author label Nov 28, 2020
@taiki-e taiki-e added the A-sink Area: futures::sink label Jan 1, 2021