Skip to content

Commit

Permalink
Remove PublishSubject Terminal State Behavior
Browse files Browse the repository at this point in the history
The PublishSubject implementation was performing onError/onCompleted unsubscribe logic that was put in place long ago and I am now pretty sure it was wrong.

This was revealed while playing with `refCount` which intends on allowing a re-subscription to the source once new Observers arrive. PublishSubject was preventing that.

The one use case that I'm still wondering about though is if someone subscribes to a PublishSubject after it has emitted onCompleted and isn't "restarted". That Observer would wait forever if it is a "single-shot" PublishSubject use case. I'm not sure if that's just a bad use and fits into the "don't do that" scenario, or if it's a legit issue that has a solution.

Rightn now this code is "thread-safe" in the visibility sense, but it's not atomic and could have race conditions between adding/removing Observers and event notifications. I don't think that's an issue as if someone is concurrently adding/removing it's always a race, but am not 100% sure if there's a use case I'm missing. This also assumes (as it always did) that someone is not invoking onNext concurrently as that would break the Rx contract.
  • Loading branch information
benjchristensen committed Oct 9, 2013
1 parent 34ee83b commit 4d5d873
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 158 deletions.
232 changes: 93 additions & 139 deletions rxjava-core/src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -51,7 +51,7 @@
* <p>
* <pre> {@code
PublishSubject<Object> subject = PublishSubject.create();
* ublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onCompleted events
subject.subscribe(observer1);
subject.onNext("one");
Expand All @@ -62,21 +62,16 @@
subject.onCompleted();
} </pre>
*
*
* @param <T>
*/
public class PublishSubject<T> extends Subject<T, T> {
public static <T> PublishSubject<T> create() {
final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
final AtomicReference<Notification<? extends T>> terminalState = new AtomicReference<Notification<? extends T>>();

OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> observer) {
// shortcut check if terminal state exists already
Subscription s = checkTerminalState(observer);
if(s != null) return s;

final SafeObservableSubscription subscription = new SafeObservableSubscription();

subscription.wrap(new Subscription() {
Expand All @@ -87,67 +82,26 @@ public void unsubscribe() {
}
});

/**
* NOTE: We are synchronizing to avoid a race condition between terminalState being set and
* a new observer being added to observers.
*
* The synchronization only occurs on subscription and terminal states, it does not affect onNext calls
* so a high-volume hot-observable will not pay this cost for emitting data.
*
* Due to the restricted impact of blocking synchronization here I have not pursued more complicated
* approaches to try and stay completely non-blocking.
*/
synchronized (terminalState) {
// check terminal state again
s = checkTerminalState(observer);
if (s != null)
return s;

// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, observer);

return subscription;
}
}
// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, observer);

private Subscription checkTerminalState(Observer<? super T> observer) {
Notification<? extends T> n = terminalState.get();
if (n != null) {
// we are terminated to immediately emit and don't continue with subscription
if (n.isOnCompleted()) {
observer.onCompleted();
} else {
observer.onError(n.getThrowable());
}
return Subscriptions.empty();
} else {
return null;
}
return subscription;
}

};

return new PublishSubject<T>(onSubscribe, observers, terminalState);
return new PublishSubject<T>(onSubscribe, observers);
}

private final ConcurrentHashMap<Subscription, Observer<? super T>> observers;
private final AtomicReference<Notification<? extends T>> terminalState;

protected PublishSubject(OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers, AtomicReference<Notification<? extends T>> terminalState) {
protected PublishSubject(OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers) {
super(onSubscribe);
this.observers = observers;
this.terminalState = terminalState;
}

@Override
public void onCompleted() {
/**
* Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
* Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
* onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
*/
synchronized (terminalState) {
terminalState.set(new Notification<T>());
}
for (Observer<? super T> observer : snapshotOfValues()) {
observer.onCompleted();
}
Expand All @@ -156,14 +110,6 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
/**
* Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
* Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
* onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
*/
synchronized (terminalState) {
terminalState.set(new Notification<T>(e));
}
for (Observer<? super T> observer : snapshotOfValues()) {
observer.onError(e);
}
Expand All @@ -179,12 +125,12 @@ public void onNext(T args) {

/**
* Current snapshot of 'values()' so that concurrent modifications aren't included.
*
*
* This makes it behave deterministically in a single-threaded execution when nesting subscribes.
*
*
* In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead
* of possibly being included in the current onNext iteration.
*
*
* @return List<Observer<T>>
*/
private Collection<Observer<? super T>> snapshotOfValues() {
Expand Down Expand Up @@ -378,75 +324,6 @@ private void assertObservedUntilTwo(Observer<String> aObserver)
verify(aObserver, Mockito.never()).onCompleted();
}

/**
* Test that subscribing after onError/onCompleted immediately terminates instead of causing it to hang.
*
* Nothing is mentioned in Rx Guidelines for what to do in this case so I'm doing what seems to make sense
* which is:
*
* - cache terminal state (onError/onCompleted)
* - any subsequent subscriptions will immediately receive the terminal state rather than start a new subscription
*
*/
@Test
public void testUnsubscribeAfterOnCompleted() {
PublishSubject<String> subject = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<String> anObserver = mock(Observer.class);
subject.subscribe(anObserver);

subject.onNext("one");
subject.onNext("two");
subject.onCompleted();

InOrder inOrder = inOrder(anObserver);
inOrder.verify(anObserver, times(1)).onNext("one");
inOrder.verify(anObserver, times(1)).onNext("two");
inOrder.verify(anObserver, times(1)).onCompleted();
inOrder.verify(anObserver, Mockito.never()).onError(any(Throwable.class));

@SuppressWarnings("unchecked")
Observer<String> anotherObserver = mock(Observer.class);
subject.subscribe(anotherObserver);

inOrder = inOrder(anotherObserver);
inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
inOrder.verify(anotherObserver, times(1)).onCompleted();
inOrder.verify(anotherObserver, Mockito.never()).onError(any(Throwable.class));
}

@Test
public void testUnsubscribeAfterOnError() {
PublishSubject<String> subject = PublishSubject.create();
RuntimeException exception = new RuntimeException("failure");

@SuppressWarnings("unchecked")
Observer<String> anObserver = mock(Observer.class);
subject.subscribe(anObserver);

subject.onNext("one");
subject.onNext("two");
subject.onError(exception);

InOrder inOrder = inOrder(anObserver);
inOrder.verify(anObserver, times(1)).onNext("one");
inOrder.verify(anObserver, times(1)).onNext("two");
inOrder.verify(anObserver, times(1)).onError(exception);
inOrder.verify(anObserver, Mockito.never()).onCompleted();

@SuppressWarnings("unchecked")
Observer<String> anotherObserver = mock(Observer.class);
subject.subscribe(anotherObserver);

inOrder = inOrder(anotherObserver);
inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
inOrder.verify(anotherObserver, times(1)).onError(exception);
inOrder.verify(anotherObserver, Mockito.never()).onCompleted();
}

@Test
public void testUnsubscribe()
{
Expand Down Expand Up @@ -519,8 +396,7 @@ public void call(String v) {

});


for(int i=0; i<10; i++) {
for (int i = 0; i < 10; i++) {
s.onNext(i);
}
s.onCompleted();
Expand All @@ -533,5 +409,83 @@ public void call(String v) {
assertEquals(45, list.size());
}

/**
* Should be able to unsubscribe all Observers, have it stop emitting, then subscribe new ones and it start emitting again.
*/
@Test
public void testReSubscribe() {
final PublishSubject<Integer> ps = PublishSubject.create();

Observer<Integer> o1 = mock(Observer.class);
Subscription s1 = ps.subscribe(o1);

// emit
ps.onNext(1);

// validate we got it
InOrder inOrder1 = inOrder(o1);
inOrder1.verify(o1, times(1)).onNext(1);
inOrder1.verifyNoMoreInteractions();

// unsubscribe
s1.unsubscribe();

// emit again but nothing will be there to receive it
ps.onNext(2);

Observer<Integer> o2 = mock(Observer.class);
Subscription s2 = ps.subscribe(o2);

// emit
ps.onNext(3);

// validate we got it
InOrder inOrder2 = inOrder(o2);
inOrder2.verify(o2, times(1)).onNext(3);
inOrder2.verifyNoMoreInteractions();

s2.unsubscribe();
}

/**
* Even if subject received an onError/onCompleted, new subscriptions should be able to restart it.
*/
@Test
public void testReSubscribeAfterTerminalState() {
final PublishSubject<Integer> ps = PublishSubject.create();

Observer<Integer> o1 = mock(Observer.class);
Subscription s1 = ps.subscribe(o1);

// emit
ps.onNext(1);

// validate we got it
InOrder inOrder1 = inOrder(o1);
inOrder1.verify(o1, times(1)).onNext(1);
inOrder1.verifyNoMoreInteractions();

// unsubscribe
s1.unsubscribe();

ps.onCompleted();

// emit again but nothing will be there to receive it
ps.onNext(2);

Observer<Integer> o2 = mock(Observer.class);
Subscription s2 = ps.subscribe(o2);

// emit
ps.onNext(3);

// validate we got it
InOrder inOrder2 = inOrder(o2);
inOrder2.verify(o2, times(1)).onNext(3);
inOrder2.verifyNoMoreInteractions();

s2.unsubscribe();
}

}
}
35 changes: 16 additions & 19 deletions rxjava-core/src/test/java/rx/RefCountTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,25 +118,22 @@ public void call(Long t1) {
// advance further
s.advanceTimeBy(1000, TimeUnit.MILLISECONDS);

// the following is not working as it seems the PublishSubject does not allow re-subscribing. TODO fix that in subsequent pull request


// // subscribing a new one should start over because the source should have been unsubscribed
// // subscribe list1
// final List<Long> list3 = new ArrayList<Long>();
// Subscription s3 = interval.subscribe(new Action1<Long>() {
//
// @Override
// public void call(Long t1) {
// list3.add(t1);
// }
//
// });
// s.advanceTimeBy(200, TimeUnit.MILLISECONDS);
//
// assertEquals(2, list3.size());
// assertEquals(0L, list3.get(0).longValue());
// assertEquals(1L, list3.get(1).longValue());
// subscribing a new one should start over because the source should have been unsubscribed
// subscribe list3
final List<Long> list3 = new ArrayList<Long>();
Subscription s3 = interval.subscribe(new Action1<Long>() {

@Override
public void call(Long t1) {
list3.add(t1);
}

});
s.advanceTimeBy(200, TimeUnit.MILLISECONDS);

assertEquals(2, list3.size());
assertEquals(0L, list3.get(0).longValue());
assertEquals(1L, list3.get(1).longValue());

}
}

0 comments on commit 4d5d873

Please sign in to comment.