OperatorDoOnEach breaks later subscriptions after error-wrapping lift #1689

Closed
MikolajKakol opened this issue Sep 16, 2014 · 8 comments

@MikolajKakol

I have a failing test:
final AtomicInteger count = new AtomicInteger(3);

    Observable.range(0, count.get())
            .map(THROW_ON_ODD)
            .map(i -> i) // #1
            .flatMap(Observable::just) // #2
            //.doOnNext(System.out::println) // #3
            .doOnEach(System.out::println) // #4
            .lift(OPTION_WRAP())
            .subscribe(
                    op -> {
                        System.out.println(op.toString());
                        count.decrementAndGet();
                    },
                    e -> System.out.println("It never will be printed" + e.getClass().getSimpleName()),
                    () -> System.out.println("end")
            );

    assertEquals(0, count.get());

All the magic is in lift, where I wanted error wrapping using something similar to com.google.common.base.Optional. When onError is emitted I wrap it in an error result, and when onNext is emitted I wrap it in a valid result. The whole test source is here: https://gist.github.com/novemberox/e2b1b4e289ac45162847
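For readers without access to the gist, here is a minimal sketch of what such a value-or-error holder could look like. The class name `Option` comes from the discussion, but the factory methods `of` and `failed`, the accessors, and the `toString` format are assumptions made for illustration; the gist's actual implementation may differ.

```java
import java.util.NoSuchElementException;

// Hypothetical value-or-error holder, loosely modeled on Guava's Optional.
// The Option class in the linked gist may differ; this is only an illustration.
final class Option<T> {
    private final T value;          // set when the upstream produced a value
    private final Throwable error;  // set when the upstream signalled onError

    private Option(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    // Wraps a value received through onNext.
    static <T> Option<T> of(T value) {
        return new Option<>(value, null);
    }

    // Wraps a Throwable received through onError.
    static <T> Option<T> failed(Throwable error) {
        return new Option<>(null, error);
    }

    boolean isError() {
        return error != null;
    }

    T get() {
        if (isError()) {
            throw new NoSuchElementException("Option holds an error: " + error);
        }
        return value;
    }

    Throwable getError() {
        return error;
    }

    @Override
    public String toString() {
        return isError()
                ? "Option(" + error.getClass().getSimpleName() + ")"
                : "Option(" + value + ")";
    }
}
```

With a holder like this, the subscriber only ever sees onNext calls and decides per item whether it carries a result or a failure.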

The problem is with the lines marked #3 and #4: if either of them is uncommented, the test fails. The order of lines #1-4 doesn't matter; the result is always the same. Since doOnXXX is a side effect that shouldn't affect the core observable, I think this is a bug.


You might ask why I'm trying to treat an error as valid data. A common pattern is for a UI action to trigger some IO and then display the result back in the UI.

I wanted my UI to be expressed more in FRP style: when the UI is created, I want a few Subjects that emit events such as text changes or button clicks, so that I can wire up all my business logic at that very moment.

However, that does not work at all with the Rx paradigm that nothing should be emitted after onError. This paradigm doesn't fit UI well, because I want to be able to do IO on every button click even if a previous one failed; my UI can handle both errors and results nicely.

@MikolajKakol
Author

The same issue occurs if we replace doOnEach(System.out::println) with startWith(Observable.empty()).

@zsxwing
Member

zsxwing commented Sep 19, 2014

Since THROW_ON_ODD will throw IllegalArgumentException when i is 1, onNext should be called only twice: Option(0) and Option(IllegalArgumentException). So assertEquals(0, count.get()) will fail.

Actually you don't need to implement a new operator. You can use onErrorReturn, like:

o.map(i -> new Option<Integer>(i)).onErrorReturn(e -> new Option<Integer>(e))
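To make the count concrete, here is a plain-Java simulation of that sequence. It does not use RxJava; it only models map followed by onErrorReturn over range(0, 3). It assumes that THROW_ON_ODD passes even values through unchanged and throws IllegalArgumentException for odd ones; the class name `ErrorWrapSimulation` and the string form of the wrapped items are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java simulation, not RxJava: models map(...) followed by
// onErrorReturn(...) over range(0, 3). All names here are hypothetical.
final class ErrorWrapSimulation {

    // Assumed stand-in for THROW_ON_ODD from the issue: throws for odd inputs.
    static final Function<Integer, Integer> THROW_ON_ODD = i -> {
        if (i % 2 != 0) {
            throw new IllegalArgumentException("odd value: " + i);
        }
        return i;
    };

    // Returns what a subscriber would receive through onNext.
    static List<String> run(int start, int count) {
        List<String> received = new ArrayList<>();
        for (int i = start; i < start + count; i++) {
            try {
                received.add("Option(" + THROW_ON_ODD.apply(i) + ")");
            } catch (RuntimeException e) {
                // The error is terminal: emit one wrapped item, then stop.
                received.add("Option(" + e.getClass().getSimpleName() + ")");
                break;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // prints [Option(0), Option(IllegalArgumentException)]
        System.out.println(run(0, 3));
    }
}
```

Only two items reach the subscriber, so a counter that starts at 3 ends at 1; the value 2 is never processed because the error terminated the sequence.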

@MikolajKakol
Author

I wouldn't use a custom operator if .map(i -> new Option<Integer>(i)).onErrorReturn(e -> new Option<Integer>(e)) worked.

I have since redesigned my application architecture, so I no longer have this issue. However, I still don't know whether this behavior is a bug or not.

@benjchristensen
Member

There is no guarantee how this will behave as you are attempting something which breaks the Rx contract by trying to send onNext after a terminal event onError. Any operator along the way has the right to unsubscribe up and filter out further elements once a terminal event is received. See here: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorDoOnEach.java#L76

In other words, onNext -> onError -> onNext is invalid, so there is no guarantee you'll ever see the onNext after an onError. Even materialize() which makes the onError into a Notification will terminate once it receives the onError since that is a terminal event: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorMaterialize.java#L47
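The effect of such a guard can be sketched in plain Java. This is a model of the idea only, not the RxJava source: the class `TerminalGuard` and its `log` are hypothetical, and the real operators are more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer that mimics a "done" guard: once a terminal event
// has been seen, every later event is dropped. Not the RxJava source.
final class TerminalGuard<T> {
    private final List<String> log = new ArrayList<>();
    private boolean done; // becomes true after onError or onCompleted

    void onNext(T value) {
        if (done) {
            return; // late values are silently dropped
        }
        log.add("onNext(" + value + ")");
    }

    void onError(Throwable e) {
        if (done) {
            return;
        }
        done = true;
        log.add("onError(" + e.getClass().getSimpleName() + ")");
    }

    void onCompleted() {
        if (done) {
            return;
        }
        done = true;
        log.add("onCompleted");
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        TerminalGuard<Integer> guard = new TerminalGuard<>();
        guard.onNext(0);
        guard.onError(new IllegalArgumentException("odd"));
        guard.onNext(2); // violates the contract, so it is ignored
        // prints [onNext(0), onError(IllegalArgumentException)]
        System.out.println(guard.log());
    }
}
```

An operator placed after a contract-breaking source may behave like this guard, which is why the later onNext never reaches the subscriber in the failing test.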

@MikolajKakol
Author

Thanks for the clarification. As I wrote before, I found another design for building UI with Rx, and I should share it. Our community lacks a few chapters of best practices for real-life development.

Correct me if I'm wrong, but the Rx contract doesn't say anything about signal propagation once the subscriber has unsubscribed. Will all the flatMaps/maps/youNameIt still be executed on the way to the subscriber? Currently OperatorMap doesn't check Subscriber.isUnsubscribed() on each onNext call. Can that change, or should I assume that it might change?

@benjchristensen
Member

OperatorMap is synchronous and just passes the subscription through, so it is not involved in producing new values, it's just part of the chain. This is why it doesn't need to check isUnsubscribed. The only places where that is needed are the places where values are produced. This is either the original source from Observable.OnSubscribe or inside an operator that acts asynchronously such as observeOn.

Note here how the child o is passed through to the parent via the constructor: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorMap.java#L39

This means when the child unsubscribes it is affecting the parent directly, there is no decoupling.

Any operator can check isUnsubscribed if it wants, but it is not necessary on most because of how the Subscriber IS the Subscription and is chained through.
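A simplified model of this chaining, in plain Java, may help. It is not RxJava's Subscriber; `SimpleSubscriber`, `ChainDemo`, and the `map` and `range` helpers are hypothetical names. The assumption illustrated is only that a parent built around a child shares the child's unsubscribe state, so the source loop is the one place that needs to check it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model (not RxJava's classes) of a Subscriber that is also
// the Subscription and shares its state with the child it wraps.
abstract class SimpleSubscriber<T> {
    private final SimpleSubscriber<?> child; // non-null when chained to a child
    private boolean unsubscribed;

    SimpleSubscriber() {
        this(null);
    }

    SimpleSubscriber(SimpleSubscriber<?> child) {
        this.child = child;
    }

    void unsubscribe() {
        if (child != null) {
            child.unsubscribe();
        } else {
            unsubscribed = true;
        }
    }

    boolean isUnsubscribed() {
        return child != null ? child.isUnsubscribed() : unsubscribed;
    }

    abstract void onNext(T value);
}

final class ChainDemo {
    // map-like operator: passes the child through and never checks
    // isUnsubscribed itself.
    static <T, R> SimpleSubscriber<T> map(SimpleSubscriber<R> child, Function<T, R> f) {
        return new SimpleSubscriber<T>(child) {
            @Override
            void onNext(T value) {
                child.onNext(f.apply(value));
            }
        };
    }

    // Source loop: the place where values are produced, so it checks
    // isUnsubscribed before each emission.
    static void range(int start, int count, SimpleSubscriber<Integer> s) {
        for (int i = start; i < start + count && !s.isUnsubscribed(); i++) {
            s.onNext(i);
        }
    }

    static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();
        SimpleSubscriber<Integer> end = new SimpleSubscriber<Integer>() {
            @Override
            void onNext(Integer v) {
                seen.add(v);
                if (seen.size() == 2) {
                    unsubscribe(); // the child unsubscribes after two items
                }
            }
        };
        range(0, 100, map(end, (Integer i) -> i * 10));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [0, 10]
    }
}
```

Because the mapping subscriber reports the child's state, the source stops after the second item even though the map step contains no check of its own.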

The Rx contract for unsubscribe states the following:

4.4. Assume a best effort to stop all outstanding work on Unsubscribe

When unsubscribe is called on an observable subscription, the observable sequence will make a best effort attempt to stop all outstanding work. This means that any queued work that has not been started will not start.

Any work that is already in progress might still complete as it is not always safe to abort work that is in progress. Results from this work will not be signaled to any previously subscribed observer instances.

Some operators, however, do make guarantees. For example, take guarantees that it will not deliver any further values after n have been taken. This means that after it unsubscribes, it will discard any further events it receives. That is the contract of the take operator, NOT of unsubscribe.
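As a rough illustration of that distinction, the following plain-Java sketch models a take-like gate fed by a source that ignores unsubscription. `TakeGate` is a hypothetical name and this is not RxJava's OperatorTake; it only shows an operator-level guarantee that holds even when upstream keeps emitting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical take-like gate; a model of the guarantee, not RxJava's OperatorTake.
final class TakeGate<T> implements Consumer<T> {
    private final int limit;
    private final Consumer<T> downstream;
    private int delivered;

    TakeGate(int limit, Consumer<T> downstream) {
        this.limit = limit;
        this.downstream = downstream;
    }

    @Override
    public void accept(T value) {
        if (delivered >= limit) {
            return; // discard anything that arrives after the limit
        }
        delivered++;
        downstream.accept(value);
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        TakeGate<Integer> gate = new TakeGate<>(2, seen::add);
        // A source that ignores unsubscription and keeps pushing values.
        for (int i = 0; i < 5; i++) {
            gate.accept(i);
        }
        System.out.println(seen); // prints [0, 1]
    }
}
```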

@benjchristensen
Member

Anything further on this?

@MikolajKakol
Author

Thanks for that great explanation. I think we're done with it.
