diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java index 0b7eb3bd7d..bafc9d4aab 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybe.java @@ -218,6 +218,7 @@ void drain() { if (cancelled) { queue.clear(); item = null; + break; } int s = state; diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java index 3164d16a4b..654e7cabe5 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingle.java @@ -213,6 +213,7 @@ void drain() { if (cancelled) { queue.clear(); item = null; + break; } int s = state; diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java index 8331c38f23..f62669b705 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybe.java @@ -199,6 +199,7 @@ void drain() { if (cancelled) { queue.clear(); item = null; + break; } int s = state; diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java index 45799e8916..82afca6341 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java @@ -194,6 +194,7 @@ void drain() { if (cancelled) { queue.clear(); item = null; + break; } int s = state; diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java index 334b2e4d69..5e6ef2c82b 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapMaybeTest.java @@ -394,4 +394,35 @@ public void cancelNoConcurrentClean() { assertTrue(operator.queue.isEmpty()); } + + @Test + public void innerSuccessDisposeRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final MaybeSubject ms = MaybeSubject.create(); + + final TestSubscriber ts = Flowable.just(1) + .hide() + .concatMapMaybe(Functions.justFunction(ms)) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ms.onSuccess(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.dispose(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertNoErrors(); + } + } + } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java index c572e9ba30..6f07102918 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableConcatMapSingleTest.java @@ -309,4 +309,34 @@ public void cancelNoConcurrentClean() { assertTrue(operator.queue.isEmpty()); } + + @Test + public void innerSuccessDisposeRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final SingleSubject ss = SingleSubject.create(); + + final TestSubscriber ts = Flowable.just(1) + .hide() + .concatMapSingle(Functions.justFunction(ss)) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ss.onSuccess(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.dispose(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertNoErrors(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java index 8fbd2937f1..fb732ff878 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapMaybeTest.java @@ -416,4 +416,34 @@ public void checkUnboundedInnerQueue() { to.assertResult(1, 2, 3, 4); } + + @Test + public void innerSuccessDisposeRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final MaybeSubject ms = MaybeSubject.create(); + + final TestObserver to = Observable.just(1) + .hide() + .concatMapMaybe(Functions.justFunction(ms)) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ms.onSuccess(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + to.dispose(); + } + }; + + TestHelper.race(r1, r2); + + to.assertNoErrors(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java index bdd4f9e4cc..b5743dd5d7 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java @@ -353,4 +353,35 @@ public void checkUnboundedInnerQueue() { to.assertResult(1, 2, 3, 4); } + + @Test + public void innerSuccessDisposeRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final SingleSubject ss = SingleSubject.create(); + + final TestObserver to = Observable.just(1) + .hide() + .concatMapSingle(Functions.justFunction(ss)) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ss.onSuccess(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + to.dispose(); + } + }; + + TestHelper.race(r1, r2); + + to.assertNoErrors(); + } + } + }