Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PublishSubject ReSubscribe for publish().refCount() Behavior #426

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 93 additions & 139 deletions rxjava-core/src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -51,7 +51,7 @@
* <p>
* <pre> {@code

PublishSubject<Object> subject = PublishSubject.create();
* ublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onCompleted events
subject.subscribe(observer1);
subject.onNext("one");
Expand All @@ -62,21 +62,16 @@
subject.onCompleted();

} </pre>
*
*
* @param <T>
*/
public class PublishSubject<T> extends Subject<T, T> {
public static <T> PublishSubject<T> create() {
final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
final AtomicReference<Notification<? extends T>> terminalState = new AtomicReference<Notification<? extends T>>();

OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> observer) {
// shortcut check if terminal state exists already
Subscription s = checkTerminalState(observer);
if(s != null) return s;

final SafeObservableSubscription subscription = new SafeObservableSubscription();

subscription.wrap(new Subscription() {
Expand All @@ -87,67 +82,26 @@ public void unsubscribe() {
}
});

/**
* NOTE: We are synchronizing to avoid a race condition between terminalState being set and
* a new observer being added to observers.
*
* The synchronization only occurs on subscription and terminal states, it does not affect onNext calls
* so a high-volume hot-observable will not pay this cost for emitting data.
*
* Due to the restricted impact of blocking synchronization here I have not pursued more complicated
* approaches to try and stay completely non-blocking.
*/
synchronized (terminalState) {
// check terminal state again
s = checkTerminalState(observer);
if (s != null)
return s;

// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, observer);

return subscription;
}
}
// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, observer);

private Subscription checkTerminalState(Observer<? super T> observer) {
Notification<? extends T> n = terminalState.get();
if (n != null) {
// we are terminated to immediately emit and don't continue with subscription
if (n.isOnCompleted()) {
observer.onCompleted();
} else {
observer.onError(n.getThrowable());
}
return Subscriptions.empty();
} else {
return null;
}
return subscription;
}

};

return new PublishSubject<T>(onSubscribe, observers, terminalState);
return new PublishSubject<T>(onSubscribe, observers);
}

private final ConcurrentHashMap<Subscription, Observer<? super T>> observers;
private final AtomicReference<Notification<? extends T>> terminalState;

protected PublishSubject(OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers, AtomicReference<Notification<? extends T>> terminalState) {
protected PublishSubject(OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers) {
super(onSubscribe);
this.observers = observers;
this.terminalState = terminalState;
}

@Override
public void onCompleted() {
/**
* Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
* Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
* onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
*/
synchronized (terminalState) {
terminalState.set(new Notification<T>());
}
for (Observer<? super T> observer : snapshotOfValues()) {
observer.onCompleted();
}
Expand All @@ -156,14 +110,6 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
/**
* Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
* Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
* onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
*/
synchronized (terminalState) {
terminalState.set(new Notification<T>(e));
}
for (Observer<? super T> observer : snapshotOfValues()) {
observer.onError(e);
}
Expand All @@ -179,12 +125,12 @@ public void onNext(T args) {

/**
* Current snapshot of 'values()' so that concurrent modifications aren't included.
*
*
* This makes it behave deterministically in a single-threaded execution when nesting subscribes.
*
*
* In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead
* of possibly being included in the current onNext iteration.
*
*
* @return List<Observer<T>>
*/
private Collection<Observer<? super T>> snapshotOfValues() {
Expand Down Expand Up @@ -378,75 +324,6 @@ private void assertObservedUntilTwo(Observer<String> aObserver)
verify(aObserver, Mockito.never()).onCompleted();
}

/**
* Test that subscribing after onError/onCompleted immediately terminates instead of causing it to hang.
*
* Nothing is mentioned in Rx Guidelines for what to do in this case so I'm doing what seems to make sense
* which is:
*
* - cache terminal state (onError/onCompleted)
* - any subsequent subscriptions will immediately receive the terminal state rather than start a new subscription
*
*/
@Test
public void testUnsubscribeAfterOnCompleted() {
PublishSubject<String> subject = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<String> anObserver = mock(Observer.class);
subject.subscribe(anObserver);

subject.onNext("one");
subject.onNext("two");
subject.onCompleted();

InOrder inOrder = inOrder(anObserver);
inOrder.verify(anObserver, times(1)).onNext("one");
inOrder.verify(anObserver, times(1)).onNext("two");
inOrder.verify(anObserver, times(1)).onCompleted();
inOrder.verify(anObserver, Mockito.never()).onError(any(Throwable.class));

@SuppressWarnings("unchecked")
Observer<String> anotherObserver = mock(Observer.class);
subject.subscribe(anotherObserver);

inOrder = inOrder(anotherObserver);
inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
inOrder.verify(anotherObserver, times(1)).onCompleted();
inOrder.verify(anotherObserver, Mockito.never()).onError(any(Throwable.class));
}

@Test
public void testUnsubscribeAfterOnError() {
PublishSubject<String> subject = PublishSubject.create();
RuntimeException exception = new RuntimeException("failure");

@SuppressWarnings("unchecked")
Observer<String> anObserver = mock(Observer.class);
subject.subscribe(anObserver);

subject.onNext("one");
subject.onNext("two");
subject.onError(exception);

InOrder inOrder = inOrder(anObserver);
inOrder.verify(anObserver, times(1)).onNext("one");
inOrder.verify(anObserver, times(1)).onNext("two");
inOrder.verify(anObserver, times(1)).onError(exception);
inOrder.verify(anObserver, Mockito.never()).onCompleted();

@SuppressWarnings("unchecked")
Observer<String> anotherObserver = mock(Observer.class);
subject.subscribe(anotherObserver);

inOrder = inOrder(anotherObserver);
inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
inOrder.verify(anotherObserver, times(1)).onError(exception);
inOrder.verify(anotherObserver, Mockito.never()).onCompleted();
}

@Test
public void testUnsubscribe()
{
Expand Down Expand Up @@ -519,8 +396,7 @@ public void call(String v) {

});


for(int i=0; i<10; i++) {
for (int i = 0; i < 10; i++) {
s.onNext(i);
}
s.onCompleted();
Expand All @@ -533,5 +409,83 @@ public void call(String v) {
assertEquals(45, list.size());
}

/**
* Should be able to unsubscribe all Observers, have it stop emitting, then subscribe new ones and it start emitting again.
*/
@Test
public void testReSubscribe() {
final PublishSubject<Integer> ps = PublishSubject.create();

Observer<Integer> o1 = mock(Observer.class);
Subscription s1 = ps.subscribe(o1);

// emit
ps.onNext(1);

// validate we got it
InOrder inOrder1 = inOrder(o1);
inOrder1.verify(o1, times(1)).onNext(1);
inOrder1.verifyNoMoreInteractions();

// unsubscribe
s1.unsubscribe();

// emit again but nothing will be there to receive it
ps.onNext(2);

Observer<Integer> o2 = mock(Observer.class);
Subscription s2 = ps.subscribe(o2);

// emit
ps.onNext(3);

// validate we got it
InOrder inOrder2 = inOrder(o2);
inOrder2.verify(o2, times(1)).onNext(3);
inOrder2.verifyNoMoreInteractions();

s2.unsubscribe();
}

/**
* Even if subject received an onError/onCompleted, new subscriptions should be able to restart it.
*/
@Test
public void testReSubscribeAfterTerminalState() {
final PublishSubject<Integer> ps = PublishSubject.create();

Observer<Integer> o1 = mock(Observer.class);
Subscription s1 = ps.subscribe(o1);

// emit
ps.onNext(1);

// validate we got it
InOrder inOrder1 = inOrder(o1);
inOrder1.verify(o1, times(1)).onNext(1);
inOrder1.verifyNoMoreInteractions();

// unsubscribe
s1.unsubscribe();

ps.onCompleted();

// emit again but nothing will be there to receive it
ps.onNext(2);

Observer<Integer> o2 = mock(Observer.class);
Subscription s2 = ps.subscribe(o2);

// emit
ps.onNext(3);

// validate we got it
InOrder inOrder2 = inOrder(o2);
inOrder2.verify(o2, times(1)).onNext(3);
inOrder2.verifyNoMoreInteractions();

s2.unsubscribe();
}

}
}
Loading