diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java index fbfed551fb..9ffd7df2f2 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java @@ -421,10 +421,12 @@ void drainLoop() { if (d && (svq == null || svq.isEmpty()) && n == 0) { Throwable ex = errs.terminate(); - if (ex == null) { - child.onComplete(); - } else { - child.onError(ex); + if (ex != ExceptionHelper.TERMINATED) { + if (ex == null) { + child.onComplete(); + } else { + child.onError(ex); + } } return; } @@ -556,7 +558,10 @@ boolean checkTerminate() { } if (!delayErrors && errs.get() != null) { clearScalarQueue(); - actual.onError(errs.terminate()); + Throwable ex = errs.terminate(); + if (ex != ExceptionHelper.TERMINATED) { + actual.onError(ex); + } return true; } return false; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java index 36f5a93d4e..1de4506dac 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java @@ -361,11 +361,13 @@ void drainLoop() { int n = inner.length; if (d && (svq == null || svq.isEmpty()) && n == 0) { - Throwable ex = errors.get(); - if (ex == null) { - child.onComplete(); - } else { - child.onError(errors.terminate()); + Throwable ex = errors.terminate(); + if (ex != ExceptionHelper.TERMINATED) { + if (ex == null) { + child.onComplete(); + } else { + child.onError(ex); + } } return; } @@ -488,7 +490,10 @@ boolean checkTerminate() { Throwable e = errors.get(); if (!delayErrors && (e != null)) { disposeAll(); - actual.onError(errors.terminate()); + e = errors.terminate(); + if (e != ExceptionHelper.TERMINATED) { + actual.onError(e); + } return true; } return false; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java index 8a373dbe11..9a2d1ea0b1 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java @@ -28,6 +28,7 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -926,4 +927,74 @@ public Object apply(Integer v) throws Exception { assertTrue(list.toString(), list.contains("RxCo")); } } + + @Test + public void cancelScalarDrainRace() { + for (int i = 0; i < 1000; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + + final PublishProcessor> pp = PublishProcessor.create(); + + final TestSubscriber ts = pp.flatMap(Functions.>identity()).test(0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void cancelDrainRace() { + for (int i = 0; i < 1000; i++) { + for (int j = 1; j < 50; j += 5) { + List errors = TestHelper.trackPluginErrors(); + try { + + final PublishProcessor> pp = PublishProcessor.create(); + + final TestSubscriber ts = pp.flatMap(Functions.>identity()).test(0); + + final PublishProcessor just = PublishProcessor.create(); + pp.onNext(just); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.request(1); + ts.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + just.onNext(1); + } + }; + + TestHelper.race(r1, r2); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java index 4cc8425efb..e733c5e3d4 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java @@ -29,7 +29,9 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.PublishSubject; @@ -784,4 +786,76 @@ public Object apply(Integer v) throws Exception { assertTrue(list.toString(), list.contains("RxCo")); } } + + @Test + public void cancelScalarDrainRace() { + for (int i = 0; i < 1000; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + + final PublishSubject> pp = PublishSubject.create(); + + final TestObserver ts = pp.flatMap(Functions.>identity()).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void cancelDrainRace() { + for (int i = 0; i < 1000; i++) { + for (int j = 1; j < 50; j += 5) { + List errors = TestHelper.trackPluginErrors(); + try { + + final PublishSubject> pp = PublishSubject.create(); + + final TestObserver ts = pp.flatMap(Functions.>identity()).test(); + + final PublishSubject just = PublishSubject.create(); + final PublishSubject just2 = PublishSubject.create(); + pp.onNext(just); + pp.onNext(just2); + + Runnable r1 = new Runnable() { + @Override + public void run() { + just2.onNext(1); + ts.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + just.onNext(1); + } + }; + + TestHelper.race(r1, r2); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + } + } }