diff --git a/src/main/java/io/reactivex/processors/BehaviorProcessor.java b/src/main/java/io/reactivex/processors/BehaviorProcessor.java index 4a4a69b007..d033d636c0 100644 --- a/src/main/java/io/reactivex/processors/BehaviorProcessor.java +++ b/src/main/java/io/reactivex/processors/BehaviorProcessor.java @@ -87,7 +87,7 @@ public final class BehaviorProcessor extends FlowableProcessor { final AtomicReference value; - boolean done; + final AtomicReference terminalEvent; long index; @@ -131,6 +131,7 @@ public static BehaviorProcessor createDefault(T defaultValue) { this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); this.subscribers = new AtomicReference[]>(EMPTY); + this.terminalEvent = new AtomicReference(); } /** @@ -155,18 +156,18 @@ protected void subscribeActual(Subscriber s) { bs.emitFirst(); } } else { - Object o = value.get(); - if (NotificationLite.isComplete(o)) { + Throwable ex = terminalEvent.get(); + if (ex == ExceptionHelper.TERMINATED) { s.onComplete(); } else { - s.onError(NotificationLite.getError(o)); + s.onError(ex); } } } @Override public void onSubscribe(Subscription s) { - if (done) { + if (terminalEvent.get() != null) { s.cancel(); return; } @@ -179,7 +180,7 @@ public void onNext(T t) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } - if (done) { + if (terminalEvent.get() != null) { return; } Object o = NotificationLite.next(t); @@ -194,11 +195,10 @@ public void onError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } - if (done) { + if (!terminalEvent.compareAndSet(null, t)) { RxJavaPlugins.onError(t); return; } - done = true; Object o = NotificationLite.error(t); for (BehaviorSubscription bs : terminate(o)) { bs.emitNext(o, index); @@ -207,10 +207,9 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (done) { + if (!terminalEvent.compareAndSet(null, ExceptionHelper.TERMINATED)) { return; } - done = true; Object o = NotificationLite.complete(); for (BehaviorSubscription bs : terminate(o)) { bs.emitNext(o, index); // relaxed read okay since this is the only mutator thread diff --git a/src/main/java/io/reactivex/subjects/BehaviorSubject.java b/src/main/java/io/reactivex/subjects/BehaviorSubject.java index b0376ecec1..196ad8301f 100644 --- a/src/main/java/io/reactivex/subjects/BehaviorSubject.java +++ b/src/main/java/io/reactivex/subjects/BehaviorSubject.java @@ -86,7 +86,7 @@ public final class BehaviorSubject extends Subject { final Lock readLock; final Lock writeLock; - boolean done; + final AtomicReference terminalEvent; long index; @@ -129,6 +129,7 @@ public static BehaviorSubject createDefault(T defaultValue) { this.writeLock = lock.writeLock(); this.subscribers = new AtomicReference[]>(EMPTY); this.value = new AtomicReference(); + this.terminalEvent = new AtomicReference(); } /** @@ -153,18 +154,18 @@ protected void subscribeActual(Observer observer) { bs.emitFirst(); } } else { - Object o = value.get(); - if (NotificationLite.isComplete(o)) { + Throwable ex = terminalEvent.get(); + if (ex == ExceptionHelper.TERMINATED) { observer.onComplete(); } else { - observer.onError(NotificationLite.getError(o)); + observer.onError(ex); } } } @Override public void onSubscribe(Disposable s) { - if (done) { + if (terminalEvent.get() != null) { s.dispose(); } } @@ -175,7 +176,7 @@ public void onNext(T t) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } - if (done) { + if (terminalEvent.get() != null) { return; } Object o = NotificationLite.next(t); @@ -190,11 +191,10 @@ public void onError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } - if (done) { + if (!terminalEvent.compareAndSet(null, t)) { RxJavaPlugins.onError(t); return; } - done = true; Object o = NotificationLite.error(t); for (BehaviorDisposable bs : terminate(o)) { bs.emitNext(o, index); @@ -203,10 +203,9 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (done) { + if (!terminalEvent.compareAndSet(null, ExceptionHelper.TERMINATED)) { return; } - done = true; Object o = NotificationLite.complete(); for (BehaviorDisposable bs : terminate(o)) { bs.emitNext(o, index); // relaxed read okay since this is the only mutator thread diff --git a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java index 48becde4ee..8a1766f724 100644 --- a/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java +++ b/src/test/java/io/reactivex/processors/BehaviorProcessorTest.java @@ -756,4 +756,60 @@ public void run() { .awaitDone(5, TimeUnit.SECONDS) .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); } + + @Test + public void completeSubscribeRace() throws Exception { + for (int i = 0; i < 1000; i++) { + final BehaviorProcessor p = BehaviorProcessor.create(); + + final TestSubscriber ts = new TestSubscriber(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + p.subscribe(ts); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + p.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(); + } + } + + @Test + public void errorSubscribeRace() throws Exception { + for (int i = 0; i < 1000; i++) { + final BehaviorProcessor p = BehaviorProcessor.create(); + + final TestSubscriber ts = new TestSubscriber(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + p.subscribe(ts); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + p.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + ts.assertFailure(TestException.class); + } + } } diff --git a/src/test/java/io/reactivex/subjects/BehaviorSubjectTest.java b/src/test/java/io/reactivex/subjects/BehaviorSubjectTest.java index c3965c6e3f..62941ef5e0 100644 --- a/src/test/java/io/reactivex/subjects/BehaviorSubjectTest.java +++ b/src/test/java/io/reactivex/subjects/BehaviorSubjectTest.java @@ -769,4 +769,61 @@ public void onComplete() { } }); } + + + @Test + public void completeSubscribeRace() throws Exception { + for (int i = 0; i < 1000; i++) { + final BehaviorSubject p = BehaviorSubject.create(); + + final TestObserver ts = new TestObserver(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + p.subscribe(ts); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + p.onComplete(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(); + } + } + + @Test + public void errorSubscribeRace() throws Exception { + for (int i = 0; i < 1000; i++) { + final BehaviorSubject p = BehaviorSubject.create(); + + final TestObserver ts = new TestObserver(); + + final TestException ex = new TestException(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + p.subscribe(ts); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + p.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + ts.assertFailure(TestException.class); + } + } }