Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.retry with .onErrorResumeNext #1023

Closed
AlexeyRaga opened this issue Apr 8, 2014 · 2 comments
Closed

.retry with .onErrorResumeNext #1023

AlexeyRaga opened this issue Apr 8, 2014 · 2 comments

Comments

@AlexeyRaga
Copy link

Hi Guys,

found another bug that is critical from my point of view:

when using both .retry and .onErrorResumeNext combinators the "original" sequence is being consumed after error.

Example:

val brokenStream = Observable.interval(500 millis).map(randomBreak(_))
brokenStream
    .retry(retryCount = 3)
    .onErrorResumeNext(Observable.empty)
    .subscribe(println(_))

In this example, after 3 errors the subscriber switches to the alternative (empty) sequence correctly, but the original brokenStream sequence is continued being consumed non stop. It can easily be seen by writing a log inside the randomBreak function.

This bug is not reproducible when only retry or only onErrorResumeNext combinators are used. Using both causes the problem.

@benjchristensen
Copy link
Member

I believe this is solved as of 0.18.3 when the operators were migrated to use lift.

I tried confirming with this code:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class TestRetryOnErrorResumeNext {

    public static void main(String[] args) {
        Observable<Long> o = Observable.create((Subscriber<? super Long> s) -> {
            long l = 0;
            while (!s.isUnsubscribed()) {
                System.out.println("Emitting l: " + l);
                s.onNext(l++);
                if (l > 10) {
                    s.onError(new RuntimeException("forced failure"));
                    break;
                }
            }
        }).subscribeOn(Schedulers.computation());

        o.retry(3).onErrorResumeNext(Observable.empty()).subscribe(System.out::println);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

If the issue still exists in 0.18.3+ please re-open this with a unit test.

@benjchristensen
Copy link
Member

Ah, fix was in #1027

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants