Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operation take leaks errors #212

Merged
merged 5 commits into from
Mar 31, 2013
Merged

Operation take leaks errors #212

merged 5 commits into from
Mar 31, 2013

Conversation

johngmyers
Copy link
Contributor

The take() operator improperly propagates an error from its source after the count has been exhausted.

Test cases exhibiting the bug attached.

@cloudbees-pull-request-builder

RxJava-pull-requests #55 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member

        @Test
        public void testTakeZeroDoesntLeakError() {
            Observable<String> source = Observable.<String> error(new Exception("test failed"));
            Observable.create(take(source, 0)).lastOrDefault("ok");
        }

In that test Observable.error will immediately throw when subscribed to so the count logic never even fires since the Observer never even gets to perform it's logic.

When 0 is used it's basically saying "don't execute this sequence".

Now ... whether that means onError should be ignored is an interesting question. onError and onCompleted are terminal states, not part of the "take count" which applies to onNext.

So it could be argued that for this use case it's doing exactly what it should - it doesn't emit any onNext values but terminates with either onError or onCompleted.

So, should "take 0" be special cased to always just fire onComplete immediately and not even subscribe to the sequence?

I'm interested in what Rx.Net and RxJs do in this case.

@benjchristensen
Copy link
Member

On the other unit test:

        @Test
        public void testTakeDoesntLeakErrors() {
            Observable<String> source = Observable.concat(Observable.from("one"), Observable.<String> error(new Exception("test failed")));
            Observable.create(take(source, 1)).last();
        }

This is an interesting one because concat will throw the error, not take. This is because these are all synchronous (and thus eager) observables so unsubscribe will not take effect and concat will process the second Observable which will cause onError to be called.

  1. Can concat be made lazy even when it is synchronous so an unsubscribe can happen and prevent the second sequence from being subscribed to?

  2. The take operator is performing all of it's logic inside onNext which means even if concat was lazy or asynchronous it would still receive the onError because it doesn't know to ignore an onNext until it is called.

Thus, it seems the logic of ItemObserver needs to be changed to perform the counter logic after EACH onNext, not before. In other words, after each onNext it decides if it is done, not when it receives the next one which opens up the door for an onError.

I'll explore that change.

Good finds @johngmyers

@benjchristensen
Copy link
Member

My quick attempt fixed this test case but broke others. The challenge is that the take overload with item count converts into a predicate function that is used by the implementation.

The predicate actually uses the onNext argument to allow the predicate to determine if it should be emitted or not.

This works for that case, but not the count case.

The predicate can't be applied in the onError case either for the count since there isn't an arg to pass in.

Perhaps some special case logic needs to exist for the count use case?

I need to punt on this for now as I don't have time to work on it so if you can work on the fix ... or someone else such as @mairbek who worked on this code once already that would be great.

@johngmyers
Copy link
Contributor Author

I believe the concat implementation currently is asynchronous/lazy. In any case, I believe take (as with other operators) should produce the same output sequence regardless of whether its source sequence is synchronous or asynchronous.

That means the implementations of take, takeWhile, etc. cannot depend on unsubscribing their source to prevent pass-through of subsequent events. The onError callback needs to check some isCompleted state for filtering pass-through. As take is trusted, the onCompleted callback needs to do the same.

I believe that take cannot be implemented on top of the takeWhile implementation, as is currently done. This is what my test cases demonstrate.

take(0) should be special-cased to fire onCompleted immediately. Per the contract, it needs to emit the first zero items emitted by the source Observable, which happens immediately. The existing contract does not state that take is required to subscribe to its source, so I didn't write a test for that. I do think it should be required to subscribe to its source, as subscription can have external side-effects.

@cloudbees-pull-request-builder

RxJava-pull-requests #57 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

What I meant about concat was not that the implementation of concat was synchronous but that the observable sequences being concatenated were synchronous and that does change behavior because unsubscribe generally doesn't work on synchronous sequences.

However, as you said the output result should be the same, but on a synchronous origin observable it may end up firehosing all of the onNext values even if an unsubscribe happened early in the sequence and all after the unsubscribe will still be sent but end up ignored which is different behavior than an async sequence.

Taking a look at your code now ...

@benjchristensen
Copy link
Member

There's a lot of code there and my brain is done for the night so I'll have to catch up on this later.

By the way, I agree with your points in your last comment.

@johngmyers
Copy link
Contributor Author

You can't unsubscribe from a synchronous Observable because you don't have a Subscription until after all the synchronous notifications are done. An unwrapped AtomicObservableSubscription is not a subscription to the Observable.

You agree that take(0) should subscribe to its source? I'll generate another commit.

Is the semantic of AtomicObserver<T>.onNext() of catching exceptions from its target's onNext() and relaying them to onError() in the contract? Is it a bug that none of the trusted observables* I've looked at implement that semantic?

* except empty(), error(), and never(), which implement it vacuously

@johngmyers
Copy link
Contributor Author

Github's diff doesn't show this well, but I renamed OperatorTake to OperatorTakeWhile (hopefully preserving commit history), changed the class javadoc, and then deleted the take() code from it.

@benjchristensen
Copy link
Member

You can't unsubscribe from a synchronous Observable because you don't have a Subscription until after all the synchronous notifications are done. An unwrapped AtomicObservableSubscription is not a subscription to the Observable.

Yes I know. That was my point.

You agree that take(0) should subscribe to its source? I'll generate another commit.

I think so ... though if the subscribe emits an error (such as on Observable.throw) then take(0) will result in onError and not onCompleted.

Is the semantic of AtomicObserver.onNext() of catching exceptions from its target's onNext() and relaying them to onError() in the contract?

Rx Design Guidelines 6.5 dictates that subscribe should not throw.

The try/catch around an onNext is ensuring that this contract is met even if the implementation of an onNext by a provided Observer does not do proper error handling (most user provided Observers).

Guideline 6.6 discusses onError abort semantics so the AtomicObserver also makes sure unsubscribe happens when the terminal state occurs (onError/onCompleted) even if the Observer doesn't handle it correctly in their error handling (which most don't have).

It counts on unsubscribe being idempotent (guideline 6.17) which AtomicObservableSubscription also ensures.

@cloudbees-pull-request-builder

RxJava-pull-requests #61 SUCCESS
This pull request looks good

@johngmyers
Copy link
Contributor Author

Guideline 6.4 explicitly states:

Note: do not protect calls to Subscribe, Dispose, OnNext, OnError and OnCompleted methods. These calls are on the edge of the monad. Calling the OnError method from these places will lead to unexpected behavior.

This seems to expressly contraindicate the AtomicObserver.onNext() protection code.

benjchristensen added a commit that referenced this pull request Mar 31, 2013
@benjchristensen benjchristensen merged commit 6c1a1ab into ReactiveX:master Mar 31, 2013
@benjchristensen
Copy link
Member

I merged these changes via #215

Thank you for finding, demonstrating via unit test and fixing these issues @johngmyers

@benjchristensen
Copy link
Member

I have created a separate issue to discuss the other possible issue you brought up: #216

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
…eption is… (ReactiveX#213)

* ReactiveX#212 reactor.core.Exceptions$BubblingException thrown if exception is thrown in subscribe

* ReactiveX#35 Fix Codacity issues
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
…hat a CircuitBreakerException is thrown when you subscribe to Flux.error.
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
…hat an error is propagated when you subscribe to Flux.error, but the error is not thrown onSubscribe.
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
…ctiveX#214)

Check that we are getting the resilience exception even when the source
will throw an exception (not on subscribe)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants