This repository has been archived by the owner on Nov 5, 2018. It is now read-only.

Futures support #22

Closed
tekjar opened this issue Jan 3, 2018 · 12 comments

Comments

@tekjar

tekjar commented Jan 3, 2018

Hi. Thanks a lot for this crate. Are there any plans of adding futures support?

@ghost

ghost commented Jan 3, 2018

Yes, adding support for futures shouldn't be too difficult to do.

@tekjar
Author

tekjar commented Jan 3, 2018

Futures channels are not very ergonomic IMO and they are not mpmc. crossbeam-channel might add good value in those cases (:

@ghost

ghost commented Jan 6, 2018

The first step here would be to implement Sink for Sender and Stream for Receiver.
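As a rough illustration of what "Stream for Receiver" involves, here is a minimal sketch that uses only the standard library. Everything in it is an assumption made for illustration: `PollStream` is a hypothetical stand-in for the futures crate's `Stream` trait, written against `std::task` (which postdates this thread), and `Shared`, `unbounded` and `demo` are toy names, not crossbeam-channel internals. The point it shows is that a futures-aware receiver must store the task's waker when the queue is empty, and the sender must wake it.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `futures::Stream`, using `std::task` types.
trait PollStream {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Toy channel state: a queue plus the waker of a parked receiver task.
struct Shared<T> {
    queue: VecDeque<T>,
    closed: bool,
    rx_waker: Option<Waker>,
}

struct Sender<T>(Arc<Mutex<Shared<T>>>);
struct Receiver<T>(Arc<Mutex<Shared<T>>>);

fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(Shared {
        queue: VecDeque::new(),
        closed: false,
        rx_waker: None,
    }));
    (Sender(shared.clone()), Receiver(shared))
}

impl<T> Sender<T> {
    fn send(&self, value: T) {
        // Take the waker under the lock, but wake after releasing it.
        let waker = {
            let mut s = self.0.lock().unwrap();
            s.queue.push_back(value);
            s.rx_waker.take()
        };
        if let Some(w) = waker {
            w.wake();
        }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // Single-sender sketch: dropping the sender closes the channel.
        let waker = {
            let mut s = self.0.lock().unwrap();
            s.closed = true;
            s.rx_waker.take()
        };
        if let Some(w) = waker {
            w.wake();
        }
    }
}

impl<T> PollStream for Receiver<T> {
    type Item = T;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let mut s = self.0.lock().unwrap();
        if let Some(v) = s.queue.pop_front() {
            Poll::Ready(Some(v))
        } else if s.closed {
            Poll::Ready(None)
        } else {
            // Park the task (not the thread): remember who to wake on `send`.
            s.rx_waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Test waker that records whether it was woken.
struct Flag(AtomicBool);
impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

fn demo() -> (bool, bool, Option<i32>, bool) {
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    let (tx, mut rx) = unbounded::<i32>();
    let pending_when_empty = rx.poll_next(&mut cx).is_pending();
    tx.send(7);
    let woken = flag.0.load(Ordering::SeqCst);
    let item = match rx.poll_next(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => None,
    };
    drop(tx);
    let ended = matches!(rx.poll_next(&mut cx), Poll::Ready(None));
    (pending_when_empty, woken, item, ended)
}

fn main() {
    println!("{:?}", demo());
}
```

A `Sink` implementation for a bounded sender would need the mirror image: the sender parks when the queue is full and the receiver wakes it after popping.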

But what should we do about select_loop!, if anything at all? Perhaps we should have another macro variant geared at futures, which instead of calling std::thread::park() yields from the current future. Perhaps it should just do yield;, assuming that we're inside a generator (as in https://github.com/alexcrichton/futures-await). Any other ideas?

That said, futures are already pretty flexible on their own so it's not hard to write a loop yourself that simply polls Sinks and Streams, and then yield;s or returns Ok(Async::NotReady) after each iteration.
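The hand-written loop described above might look roughly like the following sketch. It is written against `std::future` and `std::task`, so `Poll::Pending` plays the role of `Ok(Async::NotReady)` from the futures crate of the time. `PollSource`, `Scripted`, `SelectLoop`, `Selected`, `run` and `demo` are all hypothetical names invented here; `Scripted` is only a test double that replays a script and asks to be polled again, where a real channel would store the waker and call it from `send`.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical minimal interface for something that can be polled for a value.
trait PollSource {
    fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<i32>;
}

/// Test double: replays a script where `None` means "nothing available yet".
struct Scripted(VecDeque<Option<i32>>);

impl PollSource for Scripted {
    fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<i32> {
        match self.0.pop_front().flatten() {
            Some(v) => Poll::Ready(v),
            None => {
                // Stand-in for waker registration: request another poll.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}

#[derive(Debug, PartialEq)]
enum Selected {
    First(i32),
    Second(i32),
}

/// Polls both sources on every wake-up and completes with the first value seen.
struct SelectLoop<A, B> {
    a: A,
    b: B,
}

impl<A: PollSource + Unpin, B: PollSource + Unpin> Future for SelectLoop<A, B> {
    type Output = Selected;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Selected> {
        let this = &mut *self;
        if let Poll::Ready(v) = this.a.poll_recv(cx) {
            return Poll::Ready(Selected::First(v));
        }
        if let Poll::Ready(v) = this.b.poll_recv(cx) {
            return Poll::Ready(Selected::Second(v));
        }
        // Neither source is ready: yield to the executor instead of parking the thread.
        Poll::Pending
    }
}

struct Noop;
impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

/// Toy driver that simply re-polls; a real executor would sleep until woken.
fn run<F: Future + Unpin>(mut fut: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (out, polls);
        }
    }
}

fn demo() -> (Selected, usize) {
    let a = Scripted(VecDeque::from(vec![None, None]));
    let b = Scripted(VecDeque::from(vec![None, Some(5)]));
    run(SelectLoop { a, b })
}

fn main() {
    println!("{:?}", demo());
}
```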

@tekjar What are the current pain points of futures channels? I don't have much experience with them, so I'm curious.

@tekjar
Author

tekjar commented Jan 6, 2018

  1. They are not mpmc. If I want to run multiple reactors in multiple threads and share the load, mpmc makes my job very easy
  2. I need to clone the tx very frequently as combinators consume the future. And cloning will increase the size of the queue
let receiver = receiver.for_each(move |packet| {
    let tx = tx.clone();
    tx.send(reply).map(|_| ()).map_err(|_| io::Error::new(ErrorKind::Other, "Error sending reply"));
    future::ok(())
});

And using references is difficult:

let fut1 = s.filter(move |v| *v == 3).for_each(move |v| { 
    let mut tx = &mut tx1; 
    tx.send(1000).map(move |tx| ()).map_err(|e| ()) 
});
let mut tx = &mut tx1;
   |                      ^^^^^^^^
note: first, the lifetime cannot outlive the lifetime  as defined on the body

Not sure if crossbeam-channel can solve this

  3. Reusing rx is difficult.
    If I want to reuse the rx after the reactor ends, I need to get it out of the reactor result (which isn't simple) or use a reference.
let commands_rx = commands_rx.by_ref();
let network_send = commands_rx.map(move |msg| {
    // do something
}).forward(sender)

This works in a loop, but creating this future in a method is not trivial because you need to pass a reference to rx.

  4. Rust has too many channel interfaces: standard library, futures, chan and crossbeam channels. Since crossbeam channels are the most convenient in the list, it would be nice to be able to use them everywhere.
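To make the first point concrete, here is a small sketch of sharing load across worker threads with only the standard library. Because `std::sync::mpsc` has a single consumer, the receiver has to be wrapped in `Arc<Mutex<..>>`; with an mpmc channel each worker could hold its own clone of the receiver instead. The function name `share_load` and the "double each job" work are made up for the example.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Spreads `jobs` over `workers` threads and returns the sum of the results.
fn share_load(jobs: Vec<u32>, workers: usize) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    // std's receiver is single-consumer, so it must be shared behind a lock.
    let rx = Arc::new(Mutex::new(rx));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut sum = 0;
                loop {
                    // The lock guard is dropped at the end of this statement.
                    let job = rx.lock().unwrap().recv();
                    match job {
                        Ok(n) => sum += n * 2, // placeholder "work"
                        Err(_) => break,       // all senders gone, queue drained
                    }
                }
                sum
            })
        })
        .collect();

    for job in jobs {
        tx.send(job).unwrap();
    }
    drop(tx); // close the channel so the workers exit

    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    println!("{}", share_load(vec![1, 2, 3, 4], 3));
}
```

The lock here serializes the workers while one of them waits in `recv`, which is part of why a native mpmc receiver is more convenient for this pattern.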

@ghost ghost added the help wanted label Jan 11, 2018
@jeehoonkang
Contributor

FYI, the futures 0.2 RFC names futures-channel as one of its subcrates: https://github.com/cramertj/futures-rfcs/blob/futures-0.2/futures-02.md . Though I'm not sure if it's relevant here.

@ghost

ghost commented Feb 20, 2018

@cramertj I'm planning to add futures support to crossbeam-channel soon.

Rather than simply implementing Sink and Stream on Sender and Receiver, I'm thinking of creating entirely separate types for working with futures. The reason is that using the select macro when writing code inside a future would block the current thread rather than the current task, which is a potential footgun.
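The thread-versus-task distinction can be sketched with the standard library channel. In the hypothetical `poll_recv` below, a blocking `rx.recv()` in the empty case would park the whole executor thread and stall every other task on it; returning `Poll::Pending` only suspends the current task. Since `std::sync::mpsc` cannot store a waker, this sketch assumes busy re-polling via `wake_by_ref`, which is exactly the inefficiency a futures-aware channel would remove. `CountWakes` and `demo` are illustration-only names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Task-friendly receive: never parks the calling thread.
fn poll_recv<T>(rx: &Receiver<T>, cx: &mut Context<'_>) -> Poll<Option<T>> {
    match rx.try_recv() {
        Ok(v) => Poll::Ready(Some(v)),
        Err(TryRecvError::Disconnected) => Poll::Ready(None),
        Err(TryRecvError::Empty) => {
            // Calling `rx.recv()` here instead would block the thread.
            // Assumption: no waker registration is possible, so ask for a re-poll.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Test waker that counts how often it was woken.
struct CountWakes(AtomicUsize);
impl Wake for CountWakes {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn demo() -> (bool, usize, Option<&'static str>) {
    let wakes = Arc::new(CountWakes(AtomicUsize::new(0)));
    let waker = Waker::from(wakes.clone());
    let mut cx = Context::from_waker(&waker);

    let (tx, rx) = mpsc::channel::<&'static str>();
    let pending = poll_recv(&rx, &mut cx).is_pending();
    let woken = wakes.0.load(Ordering::SeqCst);

    tx.send("job").unwrap();
    let item = match poll_recv(&rx, &mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => None,
    };
    (pending, woken, item)
}

fn main() {
    println!("{:?}", demo());
}
```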

Therefore, the idea is to create entirely separate Sender and Receiver types that only work with futures, together with a special select macro for those types:

// Traditional channels without futures support.
struct Sender<T> { ... }
struct Receiver<T> { ... }

struct Iter<T> { ... }
struct IntoIter<T> { ... }
struct TryIter<T> { ... }

fn unbounded<T>() -> (Sender<T>, Receiver<T>) { ... }
fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { ... }

// This macro works only with `Sender` and `Receiver` in the root
// module (the ones without futures support).
macro_rules! select { ... }

// Error types live here.
mod errors {
    ...
}

// Channels that work with futures.
mod async {
    // Implements `Sink`.
    struct Sender<T> { ... }
    // Implements `Stream`.
    struct Receiver<T> { ... }

    fn unbounded<T>() -> (Sender<T>, Receiver<T>) { ... }
    fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { ... }
}

// Like `select!`, but it uses `yield` instead of `thread::park()`.
//
// This macro works only with `Sender` and `Receiver` in the `async` module.
macro_rules! async_select { ... }

Do you have any thoughts on this split between traditional and asynchronous channels?

Also, since you proposed creating a new futures-channel crate, should we somehow consolidate our efforts? Perhaps my async module should actually live inside futures-channel? I'm not sure what's the right thing to do here. :)

@cramertj

@stjepang I'd love to see improvements to the futures channels! Currently, the latest implementations live in the futures-channel crate in the futures-rs repo, on the 0.2 branch. I'd love to chat w/ you about how to improve on those APIs-- can you create an issue on futures-rs?

@danburkert
Contributor

I've put together a proof-of-concept futures-aware bounded mpsc channel which is living in this branch: https://github.com/danburkert/crossbeam-channel/tree/futures-channel. There's related discussion happening in rust-lang/futures-rs#800. I probably won't have a ton of time to dedicate to getting this over the finish line, either in terms of merging into futures-rs or into crossbeam-channel (I don't yet know which is more appropriate), so if anyone is interested in pushing on it, please go ahead.

@seunlanlege

@danburkert I would love to extend your PoC to support mpmc, I'm just not sure how to go about it. If you could point me to the relevant resources to read up on, I should be able to have a working PoC by tomorrow.

@danburkert
Contributor

Hey @seunlanlege, that sounds great! I've pushed a commit which expands the implementation comments/documentation so hopefully it is easier to see how the channel mechanisms work. I'd suggest looking over that with a special focus on Channel::sender_tasks and SenderTask.

As far as extending to an mpmc channel, I think you'll have to replace the receiver_task: AtomicTask with a channel, just like the sender equivalent is right now.
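One way to picture that change, without claiming anything about the actual branch: a single-slot task holder can remember only one parked receiver, so an mpmc channel needs a queue of parked receivers that senders pop from. The sketch below uses `std::task::Waker` in place of the futures crate's task handle and a mutex-protected `VecDeque` in place of a lock-free channel. `ReceiverTasks`, `park`, `notify_one`, `Counter` and `demo` are hypothetical names.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical replacement for a single receiver-task slot:
/// a FIFO of parked receiver tasks.
struct ReceiverTasks {
    parked: Mutex<VecDeque<Waker>>,
}

impl ReceiverTasks {
    fn new() -> Self {
        ReceiverTasks { parked: Mutex::new(VecDeque::new()) }
    }

    /// Called by a receiver that found the channel empty.
    fn park(&self, waker: &Waker) {
        self.parked.lock().unwrap().push_back(waker.clone());
    }

    /// Called by a sender after pushing a message: wake one waiting receiver.
    /// Returns `false` if nobody was parked.
    fn notify_one(&self) -> bool {
        // The lock is released before waking.
        let next = self.parked.lock().unwrap().pop_front();
        match next {
            Some(w) => {
                w.wake();
                true
            }
            None => false,
        }
    }
}

/// Test waker that counts wake-ups.
struct Counter(AtomicUsize);
impl Wake for Counter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn demo() -> (usize, usize, usize, usize, bool) {
    let a = Arc::new(Counter(AtomicUsize::new(0)));
    let b = Arc::new(Counter(AtomicUsize::new(0)));
    let tasks = ReceiverTasks::new();

    // Two receivers park, in order.
    tasks.park(&Waker::from(a.clone()));
    tasks.park(&Waker::from(b.clone()));

    tasks.notify_one(); // first send wakes receiver `a` only
    let first = (a.0.load(Ordering::SeqCst), b.0.load(Ordering::SeqCst));
    tasks.notify_one(); // second send wakes receiver `b`
    let second = (a.0.load(Ordering::SeqCst), b.0.load(Ordering::SeqCst));
    let third = tasks.notify_one(); // nobody left to wake

    (first.0, first.1, second.0, second.1, third)
}

fn main() {
    println!("{:?}", demo());
}
```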

@seunlanlege

WIP PR here: #38

jonhoo added a commit to jonhoo/trawler that referenced this issue Mar 29, 2018
Issuers are less efficient than we'd like since we now need to
alternate between turning the core and trying to receive new jobs. This
will be fixed once the crossbeam-channel crate is futures-aware:

crossbeam-rs/crossbeam-channel#22
@ghost

ghost commented Sep 16, 2018

As mentioned in this comment, I think it'd probably make more sense to simply build a new crate with futures support from scratch. There are probably no real blockers, I just need to dedicate time and effort to it.

This issue was closed.