From 4294af1f87811bab1dcec149e31e61fbebf1ef9f Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 17 Jan 2019 13:06:08 +0100 Subject: [PATCH 1/2] 2.x: Don't dispose the winner of {Single|Maybe|Completable}.amb() --- .../operators/completable/CompletableAmb.java | 19 ++-- .../internal/operators/maybe/MaybeAmb.java | 56 ++++++------ .../internal/operators/single/SingleAmb.java | 26 +++--- .../completable/CompletableAmbTest.java | 55 ++++++++++++ .../operators/flowable/FlowableAmbTest.java | 84 +++++++++++++++++- .../operators/maybe/MaybeAmbTest.java | 84 ++++++++++++++++++ .../observable/ObservableAmbTest.java | 87 ++++++++++++++++++- .../operators/single/SingleAmbTest.java | 61 +++++++++++++ 8 files changed, 422 insertions(+), 50 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java index cc603acbdc..7de1c648e4 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java @@ -63,8 +63,6 @@ public void subscribeActual(final CompletableObserver observer) { final AtomicBoolean once = new AtomicBoolean(); - CompletableObserver inner = new Amb(once, set, observer); - for (int i = 0; i < count; i++) { CompletableSource c = sources[i]; if (set.isDisposed()) { @@ -82,7 +80,7 @@ public void subscribeActual(final CompletableObserver observer) { } // no need to have separate subscribers because inner is stateless - c.subscribe(inner); + c.subscribe(new Amb(once, set, observer)); } if (count == 0) { @@ -91,9 +89,14 @@ public void subscribeActual(final CompletableObserver observer) { } static final class Amb implements CompletableObserver { - private final AtomicBoolean once; - private final CompositeDisposable set; - private final CompletableObserver downstream; + + final AtomicBoolean once; + + final CompositeDisposable set; + + final CompletableObserver downstream; + + Disposable upstream; Amb(AtomicBoolean once, CompositeDisposable set, CompletableObserver observer) { this.once = once; @@ -104,6 +107,7 @@ static final class Amb implements CompletableObserver { @Override public void onComplete() { if (once.compareAndSet(false, true)) { + set.delete(upstream); set.dispose(); downstream.onComplete(); } @@ -112,6 +116,7 @@ public void onComplete() { @Override public void onError(Throwable e) { if (once.compareAndSet(false, true)) { + set.delete(upstream); set.dispose(); downstream.onError(e); } else { @@ -121,8 +126,8 @@ public void onError(Throwable e) { @Override public void onSubscribe(Disposable d) { + upstream = d; set.add(d); } - } } diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmb.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmb.java index d9c1c6963c..8efc69b24b 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmb.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeAmb.java @@ -64,64 +64,63 @@ protected void subscribeActual(MaybeObserver observer) { count = sources.length; } - AmbMaybeObserver parent = new AmbMaybeObserver(observer); - observer.onSubscribe(parent); + CompositeDisposable set = new CompositeDisposable(); + observer.onSubscribe(set); + + AtomicBoolean winner = new AtomicBoolean(); for (int i = 0; i < count; i++) { MaybeSource s = sources[i]; - if (parent.isDisposed()) { + if (set.isDisposed()) { return; } if (s == null) { - parent.onError(new NullPointerException("One of the MaybeSources is null")); + set.dispose(); + NullPointerException ex = new NullPointerException("One of the MaybeSources is null"); + if (winner.compareAndSet(false, true)) { + observer.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } return; } - s.subscribe(parent); + s.subscribe(new AmbMaybeObserver(observer, set, winner)); } if (count == 0) { observer.onComplete(); } - } static final class AmbMaybeObserver - extends AtomicBoolean - implements MaybeObserver, Disposable { - - private static final long serialVersionUID = -7044685185359438206L; + implements MaybeObserver { final MaybeObserver downstream; - final CompositeDisposable set; + final AtomicBoolean winner; - AmbMaybeObserver(MaybeObserver downstream) { - this.downstream = downstream; - this.set = new CompositeDisposable(); - } + final CompositeDisposable set; - @Override - public void dispose() { - if (compareAndSet(false, true)) { - set.dispose(); - } - } + Disposable upstream; - @Override - public boolean isDisposed() { - return get(); + AmbMaybeObserver(MaybeObserver downstream, CompositeDisposable set, AtomicBoolean winner) { + this.downstream = downstream; + this.set = set; + this.winner = winner; } @Override public void onSubscribe(Disposable d) { + upstream = d; set.add(d); } @Override public void onSuccess(T value) { - if (compareAndSet(false, true)) { + if (winner.compareAndSet(false, true)) { + set.delete(upstream); set.dispose(); downstream.onSuccess(value); @@ -130,7 +129,8 @@ public void onSuccess(T value) { @Override public void onError(Throwable e) { - if (compareAndSet(false, true)) { + if (winner.compareAndSet(false, true)) { + set.delete(upstream); set.dispose(); downstream.onError(e); @@ -141,12 +141,12 @@ public void onError(Throwable e) { @Override public void onComplete() { - if (compareAndSet(false, true)) { + if (winner.compareAndSet(false, true)) { + set.delete(upstream); set.dispose(); downstream.onComplete(); } } - } } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleAmb.java b/src/main/java/io/reactivex/internal/operators/single/SingleAmb.java index d7508c3a72..2584506b59 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleAmb.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleAmb.java @@ -59,21 +59,21 @@ protected void subscribeActual(final SingleObserver observer) { count = sources.length; } + final AtomicBoolean winner = new AtomicBoolean(); final CompositeDisposable set = new CompositeDisposable(); - AmbSingleObserver shared = new AmbSingleObserver(observer, set); observer.onSubscribe(set); for (int i = 0; i < count; i++) { SingleSource s1 = sources[i]; - if (shared.get()) { + if (set.isDisposed()) { return; } if (s1 == null) { set.dispose(); Throwable e = new NullPointerException("One of the sources is null"); - if (shared.compareAndSet(false, true)) { + if (winner.compareAndSet(false, true)) { observer.onError(e); } else { RxJavaPlugins.onError(e); @@ -81,31 +81,36 @@ protected void subscribeActual(final SingleObserver observer) { return; } - s1.subscribe(shared); + s1.subscribe(new AmbSingleObserver(observer, set, winner)); } } - static final class AmbSingleObserver extends AtomicBoolean implements SingleObserver { - - private static final long serialVersionUID = -1944085461036028108L; + static final class AmbSingleObserver implements SingleObserver { final CompositeDisposable set; final SingleObserver downstream; - AmbSingleObserver(SingleObserver observer, CompositeDisposable set) { + final AtomicBoolean winner; + + Disposable upstream; + + AmbSingleObserver(SingleObserver observer, CompositeDisposable set, AtomicBoolean winner) { this.downstream = observer; this.set = set; + this.winner = winner; } @Override public void onSubscribe(Disposable d) { + this.upstream = d; set.add(d); } @Override public void onSuccess(T value) { - if (compareAndSet(false, true)) { + if (winner.compareAndSet(false, true)) { + set.delete(upstream); set.dispose(); downstream.onSuccess(value); } @@ -113,7 +118,8 @@ public void onSuccess(T value) { @Override public void onError(Throwable e) { - if (compareAndSet(false, true)) { + if (winner.compareAndSet(false, true)) { + set.delete(upstream); set.dispose(); downstream.onError(e); } else { diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java index 0e45cd1c1b..f4a8a084b8 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java @@ -16,6 +16,7 @@ import static org.junit.Assert.*; import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; @@ -23,10 +24,13 @@ import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.operators.completable.CompletableAmb.Amb; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.*; public class CompletableAmbTest { @@ -173,6 +177,7 @@ public void ambRace() { CompositeDisposable cd = new CompositeDisposable(); AtomicBoolean once = new AtomicBoolean(); Amb a = new Amb(once, cd, to); + a.onSubscribe(Disposables.empty()); a.onComplete(); a.onComplete(); @@ -259,4 +264,54 @@ public void untilCompletableOtherError() { to.assertFailure(TestException.class); } + @Test + public void noWinnerErrorDispose() throws Exception { + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Completable.ambArray( + Completable.error(ex) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Completable.never() + ) + .subscribe(Functions.EMPTY_ACTION, new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } + + @Test + public void noWinnerCompleteDispose() throws Exception { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Completable.ambArray( + Completable.complete() + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Completable.never() + ) + .subscribe(new Action() { + @Override + public void run() throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java index a4b03c633c..5b5941fbf4 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java @@ -19,8 +19,8 @@ import java.io.IOException; import java.lang.reflect.Method; import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import org.junit.*; import org.mockito.InOrder; @@ -30,6 +30,7 @@ import io.reactivex.disposables.CompositeDisposable; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.util.CrashingMappedIterable; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -713,4 +714,83 @@ public void ambArrayOrder() { Flowable error = Flowable.error(new RuntimeException()); Flowable.ambArray(Flowable.just(1), error).test().assertValue(1).assertComplete(); } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerSuccessDispose() throws Exception { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Flowable.ambArray( + Flowable.just(1) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Flowable.never() + ) + .subscribe(new Consumer() { + @Override + public void accept(Object v) throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerErrorDispose() throws Exception { + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Flowable.ambArray( + Flowable.error(ex) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Flowable.never() + ) + .subscribe(Functions.emptyConsumer(), new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerCompleteDispose() throws Exception { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Flowable.ambArray( + Flowable.empty() + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Flowable.never() + ) + .subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), new Action() { + @Override + public void run() throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java index a50c685233..90f047765c 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java @@ -16,15 +16,20 @@ import static org.junit.Assert.*; import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import io.reactivex.*; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; public class MaybeAmbTest { @@ -129,4 +134,83 @@ protected void subscribeActual( to.assertResult(1); } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerSuccessDispose() throws Exception { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Maybe.ambArray( + Maybe.just(1) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Maybe.never() + ) + .subscribe(new Consumer() { + @Override + public void accept(Object v) throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerErrorDispose() throws Exception { + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Maybe.ambArray( + Maybe.error(ex) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Maybe.never() + ) + .subscribe(Functions.emptyConsumer(), new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerCompleteDispose() throws Exception { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Maybe.ambArray( + Maybe.empty() + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Maybe.never() + ) + .subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), new Action() { + @Override + public void run() throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java index ee4d58adf5..6e2c737f75 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java @@ -18,8 +18,8 @@ import java.io.IOException; import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import org.junit.*; import org.mockito.InOrder; @@ -29,7 +29,8 @@ import io.reactivex.Observer; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Consumer; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; @@ -383,4 +384,84 @@ public void ambArrayOrder() { Observable error = Observable.error(new RuntimeException()); Observable.ambArray(Observable.just(1), error).test().assertValue(1).assertComplete(); } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerSuccessDispose() throws Exception { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Observable.ambArray( + Observable.just(1) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Observable.never() + ) + .subscribe(new Consumer() { + @Override + public void accept(Object v) throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerErrorDispose() throws Exception { + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Observable.ambArray( + Observable.error(ex) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Observable.never() + ) + .subscribe(Functions.emptyConsumer(), new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerCompleteDispose() throws Exception { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Observable.ambArray( + Observable.empty() + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Observable.never() + ) + .subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), new Action() { + @Override + public void run() throws Exception { + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } + } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java index 1bc00dedd6..18f4f3be65 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java @@ -16,14 +16,18 @@ import static org.junit.Assert.*; import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.functions.BiConsumer; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subjects.*; public class SingleAmbTest { @@ -280,4 +284,61 @@ public void ambArrayOrder() { Single error = Single.error(new RuntimeException()); Single.ambArray(Single.just(1), error).test().assertValue(1); } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerSuccessDispose() throws Exception { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Single.ambArray( + Single.just(1) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Single.never() + ) + .subscribe(new BiConsumer() { + @Override + public void accept(Object v, Throwable e) throws Exception { + assertNotNull(v); + assertNull(e); + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } + + @SuppressWarnings("unchecked") + @Test + public void noWinnerErrorDispose() throws Exception { + final TestException ex = new TestException(); + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicBoolean interrupted = new AtomicBoolean(); + final CountDownLatch cdl = new CountDownLatch(1); + + Single.ambArray( + Single.error(ex) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()), + Single.never() + ) + .subscribe(new BiConsumer() { + @Override + public void accept(Object v, Throwable e) throws Exception { + assertNull(v); + assertNotNull(e); + interrupted.set(Thread.currentThread().isInterrupted()); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(500, TimeUnit.SECONDS)); + assertFalse("Interrupted!", interrupted.get()); + } + } } From 15f5dfe32d27fde28e0ce1834655593ca8b34c8d Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 17 Jan 2019 13:09:14 +0100 Subject: [PATCH 2/2] Add null-source test to MaybeAmbTest --- .../operators/maybe/MaybeAmbTest.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java index 90f047765c..a701a279ab 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeAmbTest.java @@ -30,6 +30,7 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.*; public class MaybeAmbTest { @@ -213,4 +214,43 @@ public void run() throws Exception { assertFalse("Interrupted!", interrupted.get()); } } + + @Test + public void nullSourceSuccessRace() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + + try { + + final Subject ps = ReplaySubject.create(); + ps.onNext(1); + + @SuppressWarnings("unchecked") + final Maybe source = Maybe.ambArray(ps.singleElement(), + Maybe.never(), Maybe.never(), null); + + Runnable r1 = new Runnable() { + @Override + public void run() { + source.test(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ps.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + if (!errors.isEmpty()) { + TestHelper.assertError(errors, 0, NullPointerException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } }