Subscriber.isUnsubscribed always returns false for synchronous methods #813

Closed
nsk-mironov opened this issue Feb 5, 2014 · 10 comments


@nsk-mironov
Contributor

The following program produces unexpected (at least for me) output:

public static void main(String[] args) throws Exception {
    final Subscription subscription = Observable
            .create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(final Subscriber<? super Integer> subscriber) {
                    try {
                        for (int i = 0; !subscriber.isUnsubscribed(); i++) {
                            System.out.println("Observable : " + i);
                            subscriber.onNext(i);
                            Thread.sleep(2000);
                        }
                    } catch (Exception exception) {
                        subscriber.onError(exception);
                    }
                }
            })
            .subscribeOn(NewThreadScheduler.getInstance())
            .observeOn(NewThreadScheduler.getInstance())
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(final Integer value) {
                    System.out.println("Observer : " + value);
                }
            });

    Thread.sleep(3000);
    subscription.unsubscribe();
}

The output is:

Observable : 0
Observer : 0
Observable : 1
Observer : 1
Observable : 2
Observable : 3
Observable : 4
Observable : 5
Observable : 6
Observable : 7
...

And I'm expecting to get the following output:

Observable : 0
Observer : 0
Observable : 1
Observer : 1

Am I wrong, or is this a bug?

@akarnokd
Member

akarnokd commented Feb 5, 2014

Hi. Generally, unsubscribe() is best-effort, so some values might slip through. The question is, does Observable: X stop at all? If not, that is a bug.
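
To make "best-effort" concrete, here is a minimal plain-Java sketch of cooperative cancellation, with no RxJava involved. The class and method names are hypothetical, and the AtomicBoolean only stands in for the unsubscribed flag; it illustrates the idea and is not how RxJava implements it.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CooperativeCancelSketch {

    /** Runs a producer loop on its own thread, cancels it, and returns how many values it emitted. */
    static int runAndCancel(final long stepMillis, long cancelAfterMillis) throws InterruptedException {
        final AtomicBoolean cancelled = new AtomicBoolean(false); // stand-in for isUnsubscribed()
        final AtomicInteger emitted = new AtomicInteger();

        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                // The flag is only consulted at the top of each iteration, so an
                // iteration that has already started still emits its value.
                while (!cancelled.get()) {
                    emitted.incrementAndGet();
                    try {
                        Thread.sleep(stepMillis);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        producer.start();

        Thread.sleep(cancelAfterMillis);
        cancelled.set(true); // stand-in for unsubscribe()
        producer.join();     // returns only if the loop actually observed the flag
        return emitted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("values emitted before the loop stopped: " + runAndCancel(20, 200));
    }
}
```

The exact count varies from run to run, which is the "some values might slip through" part. What must hold is that join() returns: if the loop never sees the flag, the producer never stops, and that is the behavior reported in this issue.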

@nsk-mironov
Contributor Author

@akarnokd, no, it never stops.

@zsxwing
Member

zsxwing commented Feb 5, 2014

OperationSubscribeOn has not been rewritten for the new model yet, so subscriber.isUnsubscribed() does not work through subscribeOn.

@benjchristensen
Member

@mironov-nsk You are correct in your impression that it should work; that is the purpose of 0.17.

However, as @zsxwing said, we don't yet have subscribeOn and observeOn updated to the new model (which I want before we release 0.17 as they are important).

Right now, if you avoid those and use a Scheduler directly with the synchronous code inside it, you can see how it is meant to work while still unsubscribing from the outside:

        final Subscription subscription = Observable
                .create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(final Subscriber<? super Integer> subscriber) {
                        subscriber.add(NewThreadScheduler.getInstance().schedule(new Action1<Inner>() {

                            @Override
                            public void call(Inner inner) {
                                try {
                                    for (int i = 0; !inner.isUnsubscribed(); i++) {
                                        System.out.println("Observable : " + i);
                                        subscriber.onNext(i);
                                        Thread.sleep(2000);
                                    }
                                } catch (Exception exception) {
                                    subscriber.onError(exception);
                                }
                            }
                        }));

                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(final Integer value) {
                        System.out.println("Observer : " + value);
                    }
                });

        Thread.sleep(3000);
        subscription.unsubscribe();

Note how the Inner lets the loop check for unsubscription, and how I register the scheduler's Subscription with the Subscriber.

That said, we definitely want to have your code functional before releasing.

@nsk-mironov
Contributor Author

@benjchristensen, thank you for clarifying that. The provided workaround works in this particular case, but it doesn't work in the real case that I have. My Observable is a bit more complicated:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(final Subscriber<? super String> subscriber) {
        try {
            final Socket socket = new Socket("127.0.0.1", 12345);
            final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

            subscriber.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    IOUtils.closeQuietly(socket);
                }
            }));

            while (!subscriber.isUnsubscribed()) {
                subscriber.onNext(reader.readLine());
            }
        } catch (Exception exception) {
            subscriber.onError(exception);
        }
    }
});

The server sends a new string very rarely, so even if I unsubscribe from the Subscription, the Observable won't notice for a very long time because it is blocked in reader.readLine(). The only way to cancel the Observable "properly" is to register an unsubscribe callback via subscriber.add and close the socket there (this causes reader.readLine to fail with an exception).
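
The close-to-unblock behavior described here can be checked without RxJava. The sketch below is a plain-Java illustration with hypothetical class and method names: it starts a local server that never writes, blocks a reader thread in readLine(), and then closes the socket from another thread. According to the java.net.Socket.close() documentation, a thread blocked in an I/O operation on that socket gets a SocketException.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicReference;

public class CloseUnblocksReadSketch {

    /** Returns the exception that ended the blocked read, or null if the read never ended. */
    static IOException closeWhileReading() throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port; the server accepts but never writes.
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket socket = new Socket(loopback, server.getLocalPort());
             Socket accepted = server.accept()) {

            final AtomicReference<IOException> failure = new AtomicReference<IOException>();
            Thread reader = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        BufferedReader in = new BufferedReader(
                                new InputStreamReader(socket.getInputStream()));
                        in.readLine(); // blocks: the peer never sends a line
                    } catch (IOException e) {
                        failure.set(e); // expected once the socket is closed
                    }
                }
            });
            reader.setDaemon(true);
            reader.start();

            Thread.sleep(200);  // give the reader time to block
            socket.close();     // plays the role of the unsubscribe callback
            reader.join(2000);  // the read should end promptly after the close
            return failure.get();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("read ended with: " + closeWhileReading());
    }
}
```

Checking a flag between reads would not help in this situation, because the thread never gets back to the check while readLine() is blocked. Closing the underlying resource is what ends the call.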

Unfortunately, I couldn't find a way to add an unsubscribe callback using Scheduler.Inner.

@zsxwing
Member

zsxwing commented Feb 6, 2014

You can add multiple Subscriptions to subscriber. For example,

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(final Subscriber<? super String> subscriber) {
                try {
                    final Socket socket = new Socket("127.0.0.1", 12345);
                    subscriber.add(Subscriptions.create(new Action0() {
                        @Override
                        public void call() {
                            IOUtils.closeQuietly(socket);
                        }
                    }));
                    subscriber.add(NewThreadScheduler.getInstance().schedule(
                            new Action1<Inner>() {

                                @Override
                                public void call(Inner inner) {
                                    try {
                                        final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                                        while (!inner.isUnsubscribed()) {
                                            subscriber.onNext(reader.readLine());
                                        }
                                    } catch (Exception exception) {
                                        subscriber.onError(exception);
                                    }
                                }
                            }));
                } catch (Exception exception) {
                    subscriber.onError(exception);
                }
            }
        });
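
The example above relies on one subscriber holding several teardown actions that all run on unsubscribe. As a rough mental model only, here is a plain-Java sketch of such a container. The Teardowns class and its methods are hypothetical and are not RxJava's implementation; running a late-registered action immediately is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class CompositeTeardownSketch {

    /** Hypothetical stand-in for a subscriber's set of registered subscriptions. */
    static final class Teardowns {
        private final List<Runnable> actions = new ArrayList<Runnable>();
        private boolean unsubscribed;

        void add(Runnable action) {
            synchronized (this) {
                if (!unsubscribed) {
                    actions.add(action);
                    return;
                }
            }
            action.run(); // registered too late: clean up right away (sketch assumption)
        }

        void unsubscribe() {
            List<Runnable> toRun;
            synchronized (this) {
                if (unsubscribed) {
                    return; // repeated calls are no-ops
                }
                unsubscribed = true;
                toRun = new ArrayList<Runnable>(actions);
                actions.clear();
            }
            for (Runnable action : toRun) {
                action.run(); // run outside the lock, in registration order
            }
        }

        synchronized boolean isUnsubscribed() {
            return unsubscribed;
        }
    }

    /** Builds an action that only records its name, so the order of teardown is visible. */
    private static Runnable record(final List<String> log, final String name) {
        return new Runnable() {
            @Override
            public void run() {
                log.add(name);
            }
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<String>();
        Teardowns teardowns = new Teardowns();
        teardowns.add(record(log, "close socket"));  // like the Subscriptions.create(...) callback
        teardowns.add(record(log, "cancel worker")); // like the scheduler's Subscription
        teardowns.unsubscribe();
        teardowns.unsubscribe(); // second call does nothing
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [close socket, cancel worker]
    }
}
```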

@nsk-mironov
Contributor Author

@benjchristensen, @zsxwing, thank you for your help. With your hints I managed to solve my problem, but the code became a bit more complicated than it was before. Looking forward to the subscribeOn fix.

@benjchristensen
Member

Agreed. I hope to get subscribeOn and observeOn in soon. @zsxwing has submitted a PR for subscribeOn, and I'll be reviewing it shortly.

@nsk-mironov
Contributor Author

This issue seems to be fixed in 0.17.0-RC2.

@benjchristensen
Member

Excellent, thank you for confirming.
