From 5a20c5a360013496ad6783e90a59631bcdd75a7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 22 Aug 2018 13:23:34 +0200 Subject: [PATCH 1/2] 2.x: Make observeOn not let worker.dispose() called prematurely --- .../operators/flowable/FlowableObserveOn.java | 13 ++ .../observable/ObservableObserveOn.java | 18 +- .../flowable/FlowableObserveOnTest.java | 162 +++++++++++++++++- .../observable/ObservableObserveOnTest.java | 76 ++++++++ 4 files changed, 262 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java index d1c97b6801..2a7d499d1a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableObserveOn.java @@ -191,6 +191,7 @@ final boolean checkTerminated(boolean d, boolean empty, Subscriber a) { if (d) { if (delayError) { if (empty) { + cancelled = true; Throwable e = error; if (e != null) { a.onError(e); @@ -203,12 +204,14 @@ final boolean checkTerminated(boolean d, boolean empty, Subscriber a) { } else { Throwable e = error; if (e != null) { + cancelled = true; clear(); a.onError(e); worker.dispose(); return true; } else if (empty) { + cancelled = true; a.onComplete(); worker.dispose(); return true; @@ -314,6 +317,7 @@ void runSync() { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + cancelled = true; upstream.cancel(); a.onError(ex); worker.dispose(); @@ -324,6 +328,7 @@ void runSync() { return; } if (v == null) { + cancelled = true; a.onComplete(); worker.dispose(); return; @@ -339,6 +344,7 @@ void runSync() { } if (q.isEmpty()) { + cancelled = true; a.onComplete(); worker.dispose(); return; @@ -379,6 +385,7 @@ void runAsync() { } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + cancelled = true; upstream.cancel(); q.clear(); @@ -441,6 +448,7 @@ void runBackfused() { downstream.onNext(null); if (d) { + cancelled = true; Throwable e = error; if (e != null) { downstream.onError(e); @@ -552,6 +560,7 @@ void runSync() { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + cancelled = true; upstream.cancel(); a.onError(ex); worker.dispose(); @@ -562,6 +571,7 @@ void runSync() { return; } if (v == null) { + cancelled = true; a.onComplete(); worker.dispose(); return; @@ -577,6 +587,7 @@ void runSync() { } if (q.isEmpty()) { + cancelled = true; a.onComplete(); worker.dispose(); return; @@ -617,6 +628,7 @@ void runAsync() { } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + cancelled = true; upstream.cancel(); q.clear(); @@ -680,6 +692,7 @@ void runBackfused() { downstream.onNext(null); if (d) { + cancelled = true; Throwable e = error; if (e != null) { downstream.onError(e); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java index f415e7016f..abf1f0bb85 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableObserveOn.java @@ -62,7 +62,7 @@ static final class ObserveOnObserver extends BasicIntQueueDisposable Throwable error; volatile boolean done; - volatile boolean cancelled; + volatile boolean disposed; int sourceMode; @@ -141,8 +141,8 @@ public void onComplete() { @Override public void dispose() { - if (!cancelled) { - cancelled = true; + if (!disposed) { + disposed = true; upstream.dispose(); worker.dispose(); if (getAndIncrement() == 0) { @@ -153,7 +153,7 @@ public void dispose() { @Override public boolean isDisposed() { - return cancelled; + return disposed; } void schedule() { @@ -181,6 +181,7 @@ void drainNormal() { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + disposed = true; upstream.dispose(); q.clear(); a.onError(ex); @@ -211,7 +212,7 @@ void drainFused() { int missed = 1; for (;;) { - if (cancelled) { + if (disposed) { return; } @@ -219,6 +220,7 @@ void drainFused() { Throwable ex = error; if (!delayError && d && ex != null) { + disposed = true; downstream.onError(error); worker.dispose(); return; @@ -227,6 +229,7 @@ void drainFused() { downstream.onNext(null); if (d) { + disposed = true; ex = error; if (ex != null) { downstream.onError(ex); @@ -254,7 +257,7 @@ public void run() { } boolean checkTerminated(boolean d, boolean empty, Observer a) { - if (cancelled) { + if (disposed) { queue.clear(); return true; } @@ -262,6 +265,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer a) { Throwable e = error; if (delayError) { if (empty) { + disposed = true; if (e != null) { a.onError(e); } else { @@ -272,12 +276,14 @@ boolean checkTerminated(boolean d, boolean empty, Observer a) { } } else { if (e != null) { + disposed = true; queue.clear(); a.onError(e); worker.dispose(); return true; } else if (empty) { + disposed = true; a.onComplete(); worker.dispose(); return true; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java index 2beacebd84..d4fdbe15db 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableObserveOnTest.java @@ -21,12 +21,13 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import io.reactivex.annotations.Nullable; import org.junit.Test; import org.mockito.InOrder; import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.annotations.Nullable; +import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; @@ -1781,4 +1782,163 @@ public void syncFusedRequestOneByOneConditional() { .test() .assertResult(1, 2, 3, 4, 5); } + + public static final class DisposeTrackingScheduler extends Scheduler { + + public final AtomicInteger disposedCount = new AtomicInteger(); + + @Override + public Worker createWorker() { + return new TrackingWorker(); + } + + final class TrackingWorker extends Scheduler.Worker { + + @Override + public void dispose() { + disposedCount.getAndIncrement(); + } + + @Override + public boolean isDisposed() { + return false; + } + + @Override + public Disposable schedule(Runnable run, long delay, + TimeUnit unit) { + run.run(); + return Disposables.empty(); + } + } + } + + @Test + public void workerNotDisposedPrematurelyNormalInNormalOut() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + Flowable.concat( + Flowable.just(1).hide().observeOn(s), + Flowable.just(2) + ) + .test() + .assertResult(1, 2); + + assertEquals(1, s.disposedCount.get()); + } + + @Test + public void workerNotDisposedPrematurelySyncInNormalOut() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + Flowable.concat( + Flowable.just(1).observeOn(s), + Flowable.just(2) + ) + .test() + .assertResult(1, 2); + + assertEquals(1, s.disposedCount.get()); + } + + @Test + public void workerNotDisposedPrematurelyAsyncInNormalOut() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + UnicastProcessor up = UnicastProcessor.create(); + up.onNext(1); + up.onComplete(); + + Flowable.concat( + up.observeOn(s), + Flowable.just(2) + ) + .test() + .assertResult(1, 2); + + assertEquals(1, s.disposedCount.get()); + } + + static final class TestSubscriberFusedCanceling + extends TestSubscriber { + + public TestSubscriberFusedCanceling() { + super(); + initialFusionMode = QueueFuseable.ANY; + } + + @Override + public void onComplete() { + cancel(); + super.onComplete(); + } + } + + @Test + public void workerNotDisposedPrematurelyNormalInAsyncOut() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + TestSubscriber ts = new TestSubscriberFusedCanceling(); + + Flowable.just(1).hide().observeOn(s).subscribe(ts); + + assertEquals(1, s.disposedCount.get()); + } + + @Test + public void workerNotDisposedPrematurelyNormalInNormalOutConditional() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + Flowable.concat( + Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()), + Flowable.just(2) + ) + .test() + .assertResult(1, 2); + + assertEquals(1, s.disposedCount.get()); + } + + @Test + public void workerNotDisposedPrematurelySyncInNormalOutConditional() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + Flowable.concat( + Flowable.just(1).observeOn(s).filter(Functions.alwaysTrue()), + Flowable.just(2) + ) + .test() + .assertResult(1, 2); + + assertEquals(1, s.disposedCount.get()); + } + + @Test + public void workerNotDisposedPrematurelyAsyncInNormalOutConditional() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + UnicastProcessor up = UnicastProcessor.create(); + up.onNext(1); + up.onComplete(); + + Flowable.concat( + up.observeOn(s).filter(Functions.alwaysTrue()), + Flowable.just(2) + ) + .test() + .assertResult(1, 2); + + assertEquals(1, s.disposedCount.get()); + } + + @Test + public void workerNotDisposedPrematurelyNormalInAsyncOutConditional() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + TestSubscriber ts = new TestSubscriberFusedCanceling(); + + Flowable.just(1).hide().observeOn(s).filter(Functions.alwaysTrue()).subscribe(ts); + + assertEquals(1, s.disposedCount.get()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java index 1ac99fcb5e..227b734b3a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableObserveOnTest.java @@ -32,12 +32,15 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.fuseable.*; +import io.reactivex.internal.operators.flowable.FlowableObserveOnTest.DisposeTrackingScheduler; import io.reactivex.internal.operators.observable.ObservableObserveOn.ObserveOnObserver; import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.processors.UnicastProcessor; import io.reactivex.schedulers.*; import io.reactivex.subjects.*; +import io.reactivex.subscribers.TestSubscriber; public class ObservableObserveOnTest { @@ -740,4 +743,77 @@ public void onNext(Integer t) { }) .assertValuesOnly(2, 3); } + + @Test + public void workerNotDisposedPrematurelyNormalInNormalOut() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + Observable.concat( + Observable.just(1).hide().observeOn(s), + Observable.just(2) + ) + .test() + .assertResult(1, 2); + + assertEquals(1, s.disposedCount.get()); + } + + @Test + public void workerNotDisposedPrematurelySyncInNormalOut() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + Observable.concat( + Observable.just(1).observeOn(s), + Observable.just(2) + ) + .test() + .assertResult(1, 2); + + assertEquals(1, s.disposedCount.get()); + } + + @Test + public void workerNotDisposedPrematurelyAsyncInNormalOut() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + UnicastSubject up = UnicastSubject.create(); + up.onNext(1); + up.onComplete(); + + Observable.concat( + up.observeOn(s), + Observable.just(2) + ) + .test() + .assertResult(1, 2); + + assertEquals(1, s.disposedCount.get()); + } + + static final class TestObserverFusedCanceling + extends TestObserver { + + public TestObserverFusedCanceling() { + super(); + initialFusionMode = QueueFuseable.ANY; + } + + @Override + public void onComplete() { + cancel(); + super.onComplete(); + } + } + + @Test + public void workerNotDisposedPrematurelyNormalInAsyncOut() { + DisposeTrackingScheduler s = new DisposeTrackingScheduler(); + + TestObserver to = new TestObserverFusedCanceling(); + + Observable.just(1).hide().observeOn(s).subscribe(to); + + assertEquals(1, s.disposedCount.get()); + } + } From be80d4d3a80d9fd1f2dcf74b244ac15bfdee75ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 22 Aug 2018 13:54:00 +0200 Subject: [PATCH 2/2] Merge in master. --- .../operators/completable/CompletableFromCallable.java | 3 +++ .../operators/flowable/FlowableUnsubscribeOnTest.java | 10 ++++++++-- .../observable/ObservableUnsubscribeOnTest.java | 10 ++++++++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromCallable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromCallable.java index 3dbd3701b5..6b7e68defb 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromCallable.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromCallable.java @@ -18,6 +18,7 @@ import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.Exceptions; +import io.reactivex.plugins.RxJavaPlugins; public final class CompletableFromCallable extends Completable { @@ -37,6 +38,8 @@ protected void subscribeActual(CompletableObserver observer) { Exceptions.throwIfFatal(e); if (!d.isDisposed()) { observer.onError(e); + } else { + RxJavaPlugins.onError(e); } return; } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java index 3ed444424f..cd6f22ea56 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUnsubscribeOnTest.java @@ -47,7 +47,10 @@ public void subscribe(Subscriber t1) { t1.onSubscribe(subscription); t1.onNext(1); t1.onNext(2); - t1.onComplete(); + // observeOn will prevent canceling the upstream upon its termination now + // this call is racing for that state in this test + // not doing it will make sure the unsubscribeOn always gets through + // t1.onComplete(); } }); @@ -93,7 +96,10 @@ public void subscribe(Subscriber t1) { t1.onSubscribe(subscription); t1.onNext(1); t1.onNext(2); - t1.onComplete(); + // observeOn will prevent canceling the upstream upon its termination now + // this call is racing for that state in this test + // not doing it will make sure the unsubscribeOn always gets through + // t1.onComplete(); } }); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java index cdb21dafa5..2b7d7f4141 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableUnsubscribeOnTest.java @@ -46,7 +46,10 @@ public void subscribe(Observer t1) { t1.onSubscribe(subscription); t1.onNext(1); t1.onNext(2); - t1.onComplete(); + // observeOn will prevent canceling the upstream upon its termination now + // this call is racing for that state in this test + // not doing it will make sure the unsubscribeOn always gets through + // t1.onComplete(); } }); @@ -92,7 +95,10 @@ public void subscribe(Observer t1) { t1.onSubscribe(subscription); t1.onNext(1); t1.onNext(2); - t1.onComplete(); + // observeOn will prevent canceling the upstream upon its termination now + // this call is racing for that state in this test + // not doing it will make sure the unsubscribeOn always gets through + // t1.onComplete(); } });