Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: BehaviorProcessor & Subject terminate-subscribe race #5281

Merged
merged 2 commits into from
Apr 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions src/main/java/io/reactivex/processors/BehaviorProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public final class BehaviorProcessor<T> extends FlowableProcessor<T> {

final AtomicReference<Object> value;

boolean done;
final AtomicReference<Throwable> terminalEvent;

long index;

Expand Down Expand Up @@ -131,6 +131,7 @@ public static <T> BehaviorProcessor<T> createDefault(T defaultValue) {
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
this.subscribers = new AtomicReference<BehaviorSubscription<T>[]>(EMPTY);
this.terminalEvent = new AtomicReference<Throwable>();
}

/**
Expand All @@ -155,18 +156,18 @@ protected void subscribeActual(Subscriber<? super T> s) {
bs.emitFirst();
}
} else {
Object o = value.get();
if (NotificationLite.isComplete(o)) {
Throwable ex = terminalEvent.get();
if (ex == ExceptionHelper.TERMINATED) {
s.onComplete();
} else {
s.onError(NotificationLite.getError(o));
s.onError(ex);
}
}
}

@Override
public void onSubscribe(Subscription s) {
if (done) {
if (terminalEvent.get() != null) {
s.cancel();
return;
}
Expand All @@ -179,7 +180,7 @@ public void onNext(T t) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (done) {
if (terminalEvent.get() != null) {
return;
}
Object o = NotificationLite.next(t);
Expand All @@ -194,11 +195,10 @@ public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (done) {
if (!terminalEvent.compareAndSet(null, t)) {
RxJavaPlugins.onError(t);
return;
}
done = true;
Object o = NotificationLite.error(t);
for (BehaviorSubscription<T> bs : terminate(o)) {
bs.emitNext(o, index);
Expand All @@ -207,10 +207,9 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
if (done) {
if (!terminalEvent.compareAndSet(null, ExceptionHelper.TERMINATED)) {
return;
}
done = true;
Object o = NotificationLite.complete();
for (BehaviorSubscription<T> bs : terminate(o)) {
bs.emitNext(o, index); // relaxed read okay since this is the only mutator thread
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/io/reactivex/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public final class BehaviorSubject<T> extends Subject<T> {
final Lock readLock;
final Lock writeLock;

boolean done;
final AtomicReference<Throwable> terminalEvent;

long index;

Expand Down Expand Up @@ -129,6 +129,7 @@ public static <T> BehaviorSubject<T> createDefault(T defaultValue) {
this.writeLock = lock.writeLock();
this.subscribers = new AtomicReference<BehaviorDisposable<T>[]>(EMPTY);
this.value = new AtomicReference<Object>();
this.terminalEvent = new AtomicReference<Throwable>();
}

/**
Expand All @@ -153,18 +154,18 @@ protected void subscribeActual(Observer<? super T> observer) {
bs.emitFirst();
}
} else {
Object o = value.get();
if (NotificationLite.isComplete(o)) {
Throwable ex = terminalEvent.get();
if (ex == ExceptionHelper.TERMINATED) {
observer.onComplete();
} else {
observer.onError(NotificationLite.getError(o));
observer.onError(ex);
}
}
}

@Override
public void onSubscribe(Disposable s) {
if (done) {
if (terminalEvent.get() != null) {
s.dispose();
}
}
Expand All @@ -175,7 +176,7 @@ public void onNext(T t) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (done) {
if (terminalEvent.get() != null) {
return;
}
Object o = NotificationLite.next(t);
Expand All @@ -190,11 +191,10 @@ public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (done) {
if (!terminalEvent.compareAndSet(null, t)) {
RxJavaPlugins.onError(t);
return;
}
done = true;
Object o = NotificationLite.error(t);
for (BehaviorDisposable<T> bs : terminate(o)) {
bs.emitNext(o, index);
Expand All @@ -203,10 +203,9 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
if (done) {
if (!terminalEvent.compareAndSet(null, ExceptionHelper.TERMINATED)) {
return;
}
done = true;
Object o = NotificationLite.complete();
for (BehaviorDisposable<T> bs : terminate(o)) {
bs.emitNext(o, index); // relaxed read okay since this is the only mutator thread
Expand Down
56 changes: 56 additions & 0 deletions src/test/java/io/reactivex/processors/BehaviorProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -756,4 +756,60 @@ public void run() {
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

@Test
public void completeSubscribeRace() throws Exception {
for (int i = 0; i < 1000; i++) {
final BehaviorProcessor<Object> p = BehaviorProcessor.create();

final TestSubscriber<Object> ts = new TestSubscriber<Object>();

Runnable r1 = new Runnable() {
@Override
public void run() {
p.subscribe(ts);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
p.onComplete();
}
};

TestHelper.race(r1, r2);

ts.assertResult();
}
}

@Test
public void errorSubscribeRace() throws Exception {
for (int i = 0; i < 1000; i++) {
final BehaviorProcessor<Object> p = BehaviorProcessor.create();

final TestSubscriber<Object> ts = new TestSubscriber<Object>();

final TestException ex = new TestException();

Runnable r1 = new Runnable() {
@Override
public void run() {
p.subscribe(ts);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
p.onError(ex);
}
};

TestHelper.race(r1, r2);

ts.assertFailure(TestException.class);
}
}
}
57 changes: 57 additions & 0 deletions src/test/java/io/reactivex/subjects/BehaviorSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -769,4 +769,61 @@ public void onComplete() {
}
});
}


@Test
public void completeSubscribeRace() throws Exception {
for (int i = 0; i < 1000; i++) {
final BehaviorSubject<Object> p = BehaviorSubject.create();

final TestObserver<Object> ts = new TestObserver<Object>();

Runnable r1 = new Runnable() {
@Override
public void run() {
p.subscribe(ts);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
p.onComplete();
}
};

TestHelper.race(r1, r2);

ts.assertResult();
}
}

@Test
public void errorSubscribeRace() throws Exception {
for (int i = 0; i < 1000; i++) {
final BehaviorSubject<Object> p = BehaviorSubject.create();

final TestObserver<Object> ts = new TestObserver<Object>();

final TestException ex = new TestException();

Runnable r1 = new Runnable() {
@Override
public void run() {
p.subscribe(ts);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
p.onError(ex);
}
};

TestHelper.race(r1, r2);

ts.assertFailure(TestException.class);
}
}
}