
Bug: Sync channel returns RecvError #31

Open
zonyitoo opened this issue Jan 31, 2016 · 5 comments

@zonyitoo (Owner)

Minimal test case:

extern crate coio;

use coio::Scheduler;
use coio::sync::mpsc;

fn main() {
    Scheduler::new().run(|| {
        let (tx, rx) = mpsc::sync_channel(1);
        let h = Scheduler::spawn(move|| {
            tx.send(1).unwrap();
        });

        assert_eq!(rx.recv().unwrap(), 1);

        h.join().unwrap();
    }).unwrap();
}

The program panics with the following message:

thread 'Processor #0' panicked at 'called `Result::unwrap()` on an `Err` value: RecvError', ../src/libcore/result.rs:746
note: Run with `RUST_BACKTRACE=1` for a backtrace.
thread '<main>' panicked at 'called `Result::unwrap()` on an `Err` value: Any', ../src/libcore/result.rs:746
@zonyitoo added the bug label on Jan 31, 2016
@zonyitoo (Owner, Author)

This could be fixed like this:

extern crate coio;
extern crate env_logger;

use coio::Scheduler;
use coio::sync::mpsc;

fn main() {
    env_logger::init().unwrap();

    Scheduler::new().run(|| {
        let (tx, rx) = mpsc::sync_channel(1);
        let h = Scheduler::spawn(move|| {
            // 2. Push 1 into the queue and push <main> into the work queue
            tx.send(1).unwrap();

            // 3. Force yield the current coroutine,
            //    which gives the receiver a chance to be woken up and read the data
            Scheduler::sched();
        });

        // 1. The <main> coroutine blocks itself on the wait list
        assert_eq!(rx.recv().unwrap(), 1);

        h.join().unwrap();
    }).unwrap();
}

In the official implementation of the sync channel, the try_recv method returns Disconnected once the last sender is gone.

So when the last SyncSender is dropped, the receiver will always return Disconnected, even if there are items remaining in the queue.

@lhecker (Collaborator) commented Jan 31, 2016

Ah good idea! I didn't think of this...
Maybe we should add a method to the Scheduler that works just like Scheduler::ready(coro) but yields to the coro if it's on the same Processor? Or would it still break if they run on 2 different Processors?

I mean: I think it should still break if 2 different Processors are used. So... Should we really adopt the standard Rust channel semantics? Why are they even using this weird Disconnected state? Shouldn't a channel be empty before a Disconnected is returned? IMHO that would make a lot more sense...

@zonyitoo (Owner, Author)

I spent some time reading the source code of the official mpsc channel implementation, and I think it is too complicated to reinvent this wheel. The behavior should have been discussed in an RFC, so I think it is OK to keep it. Also, you cannot reproduce the problem with threads.

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::sync_channel(1);
    let t = thread::spawn(move|| {
        tx.send(1).unwrap();
    });

    t.join().unwrap();

    rx.recv().unwrap();
}

It always works because the official implementation actually parks the receiving thread.
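
For comparison, here is a minimal sketch using the same ordering as the coio test case, recv() before join(); with OS threads it also never sees RecvError, because the blocked receiver is simply parked until the value is sent:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::sync_channel(1);
    let t = thread::spawn(move|| {
        tx.send(1).unwrap();
    });

    // recv() parks this thread until the value is available,
    // so it never observes RecvError here.
    assert_eq!(rx.recv().unwrap(), 1);

    t.join().unwrap();
}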

@lhecker (Collaborator) commented Jan 31, 2016

I've read it too now...
So we either have to use the same thing as std (ACKs), or we have to wait until the queue is empty before we trigger a Disconnected result.
The former case will be a lot slower (btw: the std implementation uses a Mutex?! Wow. 😞 ) and the latter case is dangerous, because we don't want to delay the Disconnected signal until an overwhelmed receiver has finished draining, even though all senders are already gone.
I think we should invent something ourselves here, because it can't be that complicated if even the std implementation is using a Mutex for this - we can't really make it any worse, right? 😐
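
As a rough sketch of the "drain the queue before Disconnected" option (the names and structure here are made up for illustration; this is not coio's actual channel code), the receive path would keep returning buffered items and only report Disconnected once the buffer is empty and no senders remain:

use std::collections::VecDeque;

enum TryRecvError {
    Empty,
    Disconnected,
}

struct ChannelState<T> {
    queue: VecDeque<T>,
    senders_alive: usize,
}

impl<T> ChannelState<T> {
    // Hand out buffered items first; only report Disconnected once the
    // queue is empty AND every sender has been dropped.
    fn try_recv(&mut self) -> Result<T, TryRecvError> {
        if let Some(value) = self.queue.pop_front() {
            Ok(value)
        } else if self.senders_alive == 0 {
            Err(TryRecvError::Disconnected)
        } else {
            Err(TryRecvError::Empty)
        }
    }
}

fn main() {
    // All senders are gone, but two items are still buffered.
    let mut ch = ChannelState {
        queue: VecDeque::from(vec![1, 2]),
        senders_alive: 0,
    };

    assert_eq!(ch.try_recv().ok(), Some(1));
    assert_eq!(ch.try_recv().ok(), Some(2));
    // Only now does the channel report disconnection.
    assert!(ch.try_recv().is_err());
}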

@zonyitoo (Owner, Author)

Yep. I agree.
