From 53a46bd5d84114e67832b9dfe30c41e62fbd8ded Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 7 Nov 2016 19:22:21 +0100 Subject: [PATCH] 2.x: Observable.repeatWhen fix for onError --- src/main/java/io/reactivex/Observable.java | 4 +-- .../operators/observable/ObservableRedo.java | 25 +++++++++++++++---- .../flowable/FlowableRepeatTest.java | 14 +++++++++++ .../observable/ObservableRepeatTest.java | 14 +++++++++++ 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index a9f71a64a7..5757b0a999 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -8566,7 +8566,7 @@ public final Observable repeatUntil(BooleanSupplier stop) { @SchedulerSupport(SchedulerSupport.NONE) public final Observable repeatWhen(final Function, ? extends ObservableSource> handler) { ObjectHelper.requireNonNull(handler, "handler is null"); - return RxJavaPlugins.onAssembly(new ObservableRedo(this, ObservableInternalHelper.repeatWhenHandler(handler))); + return RxJavaPlugins.onAssembly(new ObservableRedo(this, ObservableInternalHelper.repeatWhenHandler(handler), false)); } /** @@ -9219,7 +9219,7 @@ public final Observable retryUntil(final BooleanSupplier stop) { public final Observable retryWhen( final Function, ? extends ObservableSource> handler) { ObjectHelper.requireNonNull(handler, "handler is null"); - return RxJavaPlugins.onAssembly(new ObservableRedo(this, ObservableInternalHelper.retryWhenHandler(handler))); + return RxJavaPlugins.onAssembly(new ObservableRedo(this, ObservableInternalHelper.retryWhenHandler(handler), true)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java index 59c8ebf05f..14b1cb8413 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java @@ -27,10 +27,14 @@ public final class ObservableRedo extends AbstractObservableWithUpstream { final Function>, ? extends ObservableSource> manager; + final boolean retryMode; + public ObservableRedo(ObservableSource source, - Function>, ? extends ObservableSource> manager) { + Function>, ? extends ObservableSource> manager, + boolean retryMode) { super(source); this.manager = manager; + this.retryMode = retryMode; } @Override @@ -38,7 +42,7 @@ public void subscribeActual(Observer s) { Subject> subject = BehaviorSubject.>create().toSerialized(); - final RedoObserver parent = new RedoObserver(s, subject, source); + final RedoObserver parent = new RedoObserver(s, subject, source, retryMode); ToNotificationObserver actionObserver = new ToNotificationObserver(new Consumer>() { @Override @@ -73,13 +77,16 @@ static final class RedoObserver extends AtomicBoolean implements Observer final ObservableSource source; final SequentialDisposable arbiter; + final boolean retryMode; + final AtomicInteger wip = new AtomicInteger(); - RedoObserver(Observer actual, Subject> subject, ObservableSource source) { + RedoObserver(Observer actual, Subject> subject, ObservableSource source, boolean retryMode) { this.actual = actual; this.subject = subject; this.source = source; this.arbiter = new SequentialDisposable(); + this.retryMode = retryMode; this.lazySet(true); } @@ -96,14 +103,22 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (compareAndSet(false, true)) { - subject.onNext(Notification.createOnError(t)); + if (retryMode) { + subject.onNext(Notification.createOnError(t)); + } else { + subject.onError(t); + } } } @Override public void onComplete() { if (compareAndSet(false, true)) { - subject.onNext(Notification.createOnComplete()); + if (retryMode) { + subject.onComplete(); + } else { + subject.onNext(Notification.createOnComplete()); + } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java index 96d577eb0a..40989b5363 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java @@ -329,4 +329,18 @@ public Flowable apply(Object ignore) throws Exception { disposable.dispose(); assertFalse(subject.hasSubscribers()); } + + @Test + public void testRepeatWhen() { + Flowable.error(new TestException()) + .repeatWhen(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) throws Exception { + return v.delay(10, TimeUnit.SECONDS); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java index acf87c8992..64a5e68adf 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java @@ -280,4 +280,18 @@ public ObservableSource apply(Object ignore) throws Exception { disposable.dispose(); assertFalse(subject.hasObservers()); } + + @Test + public void testRepeatWhen() { + Observable.error(new TestException()) + .repeatWhen(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable v) throws Exception { + return v.delay(10, TimeUnit.SECONDS); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(TestException.class); + } }