diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java
index 39c19d9b7f..5588dd485b 100644
--- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java
+++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,7 +51,7 @@
*
*
{@code
- PublishSubject
- *
+ *
* @param
*/
public class PublishSubject extends Subject {
public static PublishSubject create() {
final ConcurrentHashMap> observers = new ConcurrentHashMap>();
- final AtomicReference> terminalState = new AtomicReference>();
OnSubscribeFunc onSubscribe = new OnSubscribeFunc() {
@Override
public Subscription onSubscribe(Observer super T> observer) {
- // shortcut check if terminal state exists already
- Subscription s = checkTerminalState(observer);
- if(s != null) return s;
-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
subscription.wrap(new Subscription() {
@@ -87,67 +82,26 @@ public void unsubscribe() {
}
});
- /**
- * NOTE: We are synchronizing to avoid a race condition between terminalState being set and
- * a new observer being added to observers.
- *
- * The synchronization only occurs on subscription and terminal states, it does not affect onNext calls
- * so a high-volume hot-observable will not pay this cost for emitting data.
- *
- * Due to the restricted impact of blocking synchronization here I have not pursued more complicated
- * approaches to try and stay completely non-blocking.
- */
- synchronized (terminalState) {
- // check terminal state again
- s = checkTerminalState(observer);
- if (s != null)
- return s;
-
- // on subscribe add it to the map of outbound observers to notify
- observers.put(subscription, observer);
-
- return subscription;
- }
- }
+ // on subscribe add it to the map of outbound observers to notify
+ observers.put(subscription, observer);
- private Subscription checkTerminalState(Observer super T> observer) {
- Notification extends T> n = terminalState.get();
- if (n != null) {
- // we are terminated to immediately emit and don't continue with subscription
- if (n.isOnCompleted()) {
- observer.onCompleted();
- } else {
- observer.onError(n.getThrowable());
- }
- return Subscriptions.empty();
- } else {
- return null;
- }
+ return subscription;
}
+
};
- return new PublishSubject(onSubscribe, observers, terminalState);
+ return new PublishSubject(onSubscribe, observers);
}
private final ConcurrentHashMap> observers;
- private final AtomicReference> terminalState;
- protected PublishSubject(OnSubscribeFunc onSubscribe, ConcurrentHashMap> observers, AtomicReference> terminalState) {
+ protected PublishSubject(OnSubscribeFunc onSubscribe, ConcurrentHashMap> observers) {
super(onSubscribe);
this.observers = observers;
- this.terminalState = terminalState;
}
@Override
public void onCompleted() {
- /**
- * Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
- * Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
- * onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
- */
- synchronized (terminalState) {
- terminalState.set(new Notification());
- }
for (Observer super T> observer : snapshotOfValues()) {
observer.onCompleted();
}
@@ -156,14 +110,6 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
- /**
- * Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
- * Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
- * onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
- */
- synchronized (terminalState) {
- terminalState.set(new Notification(e));
- }
for (Observer super T> observer : snapshotOfValues()) {
observer.onError(e);
}
@@ -179,12 +125,12 @@ public void onNext(T args) {
/**
* Current snapshot of 'values()' so that concurrent modifications aren't included.
- *
+ *
* This makes it behave deterministically in a single-threaded execution when nesting subscribes.
- *
+ *
* In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead
* of possibly being included in the current onNext iteration.
- *
+ *
* @return List>
*/
private Collection> snapshotOfValues() {
@@ -378,75 +324,6 @@ private void assertObservedUntilTwo(Observer aObserver)
verify(aObserver, Mockito.never()).onCompleted();
}
- /**
- * Test that subscribing after onError/onCompleted immediately terminates instead of causing it to hang.
- *
- * Nothing is mentioned in Rx Guidelines for what to do in this case so I'm doing what seems to make sense
- * which is:
- *
- * - cache terminal state (onError/onCompleted)
- * - any subsequent subscriptions will immediately receive the terminal state rather than start a new subscription
- *
- */
- @Test
- public void testUnsubscribeAfterOnCompleted() {
- PublishSubject subject = PublishSubject.create();
-
- @SuppressWarnings("unchecked")
- Observer anObserver = mock(Observer.class);
- subject.subscribe(anObserver);
-
- subject.onNext("one");
- subject.onNext("two");
- subject.onCompleted();
-
- InOrder inOrder = inOrder(anObserver);
- inOrder.verify(anObserver, times(1)).onNext("one");
- inOrder.verify(anObserver, times(1)).onNext("two");
- inOrder.verify(anObserver, times(1)).onCompleted();
- inOrder.verify(anObserver, Mockito.never()).onError(any(Throwable.class));
-
- @SuppressWarnings("unchecked")
- Observer anotherObserver = mock(Observer.class);
- subject.subscribe(anotherObserver);
-
- inOrder = inOrder(anotherObserver);
- inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
- inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
- inOrder.verify(anotherObserver, times(1)).onCompleted();
- inOrder.verify(anotherObserver, Mockito.never()).onError(any(Throwable.class));
- }
-
- @Test
- public void testUnsubscribeAfterOnError() {
- PublishSubject subject = PublishSubject.create();
- RuntimeException exception = new RuntimeException("failure");
-
- @SuppressWarnings("unchecked")
- Observer anObserver = mock(Observer.class);
- subject.subscribe(anObserver);
-
- subject.onNext("one");
- subject.onNext("two");
- subject.onError(exception);
-
- InOrder inOrder = inOrder(anObserver);
- inOrder.verify(anObserver, times(1)).onNext("one");
- inOrder.verify(anObserver, times(1)).onNext("two");
- inOrder.verify(anObserver, times(1)).onError(exception);
- inOrder.verify(anObserver, Mockito.never()).onCompleted();
-
- @SuppressWarnings("unchecked")
- Observer anotherObserver = mock(Observer.class);
- subject.subscribe(anotherObserver);
-
- inOrder = inOrder(anotherObserver);
- inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
- inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
- inOrder.verify(anotherObserver, times(1)).onError(exception);
- inOrder.verify(anotherObserver, Mockito.never()).onCompleted();
- }
-
@Test
public void testUnsubscribe()
{
@@ -519,8 +396,7 @@ public void call(String v) {
});
-
- for(int i=0; i<10; i++) {
+ for (int i = 0; i < 10; i++) {
s.onNext(i);
}
s.onCompleted();
@@ -533,5 +409,83 @@ public void call(String v) {
assertEquals(45, list.size());
}
+ /**
+ * Should be able to unsubscribe all Observers, have it stop emitting, then subscribe new ones and it start emitting again.
+ */
+ @Test
+ public void testReSubscribe() {
+ final PublishSubject ps = PublishSubject.create();
+
+ Observer o1 = mock(Observer.class);
+ Subscription s1 = ps.subscribe(o1);
+
+ // emit
+ ps.onNext(1);
+
+ // validate we got it
+ InOrder inOrder1 = inOrder(o1);
+ inOrder1.verify(o1, times(1)).onNext(1);
+ inOrder1.verifyNoMoreInteractions();
+
+ // unsubscribe
+ s1.unsubscribe();
+
+ // emit again but nothing will be there to receive it
+ ps.onNext(2);
+
+ Observer o2 = mock(Observer.class);
+ Subscription s2 = ps.subscribe(o2);
+
+ // emit
+ ps.onNext(3);
+
+ // validate we got it
+ InOrder inOrder2 = inOrder(o2);
+ inOrder2.verify(o2, times(1)).onNext(3);
+ inOrder2.verifyNoMoreInteractions();
+
+ s2.unsubscribe();
+ }
+
+ /**
+ * Even if subject received an onError/onCompleted, new subscriptions should be able to restart it.
+ */
+ @Test
+ public void testReSubscribeAfterTerminalState() {
+ final PublishSubject ps = PublishSubject.create();
+
+ Observer o1 = mock(Observer.class);
+ Subscription s1 = ps.subscribe(o1);
+
+ // emit
+ ps.onNext(1);
+
+ // validate we got it
+ InOrder inOrder1 = inOrder(o1);
+ inOrder1.verify(o1, times(1)).onNext(1);
+ inOrder1.verifyNoMoreInteractions();
+
+ // unsubscribe
+ s1.unsubscribe();
+
+ ps.onCompleted();
+
+ // emit again but nothing will be there to receive it
+ ps.onNext(2);
+
+ Observer o2 = mock(Observer.class);
+ Subscription s2 = ps.subscribe(o2);
+
+ // emit
+ ps.onNext(3);
+
+ // validate we got it
+ InOrder inOrder2 = inOrder(o2);
+ inOrder2.verify(o2, times(1)).onNext(3);
+ inOrder2.verifyNoMoreInteractions();
+
+ s2.unsubscribe();
+ }
+
}
}
diff --git a/rxjava-core/src/test/java/rx/RefCountTest.java b/rxjava-core/src/test/java/rx/RefCountTest.java
deleted file mode 100644
index 3cc09b4076..0000000000
--- a/rxjava-core/src/test/java/rx/RefCountTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package rx;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-
-import rx.concurrency.TestScheduler;
-import rx.util.functions.Action1;
-
-public class RefCountTest {
-
- @Test
- public void testRefCount() {
- TestScheduler s = new TestScheduler();
- Observable interval = Observable.interval(100, TimeUnit.MILLISECONDS, s).publish().refCount();
-
- // subscribe list1
- final List list1 = new ArrayList();
- Subscription s1 = interval.subscribe(new Action1() {
-
- @Override
- public void call(Long t1) {
- list1.add(t1);
- }
-
- });
- s.advanceTimeBy(200, TimeUnit.MILLISECONDS);
-
- assertEquals(2, list1.size());
- assertEquals(0L, list1.get(0).longValue());
- assertEquals(1L, list1.get(1).longValue());
-
- // subscribe list2
- final List list2 = new ArrayList();
- Subscription s2 = interval.subscribe(new Action1() {
-
- @Override
- public void call(Long t1) {
- list2.add(t1);
- }
-
- });
- s.advanceTimeBy(300, TimeUnit.MILLISECONDS);
-
- // list 1 should have 5 items
- assertEquals(5, list1.size());
- assertEquals(2L, list1.get(2).longValue());
- assertEquals(3L, list1.get(3).longValue());
- assertEquals(4L, list1.get(4).longValue());
-
- // list 2 should only have 3 items
- assertEquals(3, list2.size());
- assertEquals(2L, list2.get(0).longValue());
- assertEquals(3L, list2.get(1).longValue());
- assertEquals(4L, list2.get(2).longValue());
-
- // unsubscribe list1
- s1.unsubscribe();
-
- // advance further
- s.advanceTimeBy(300, TimeUnit.MILLISECONDS);
-
- // list 1 should still have 5 items
- assertEquals(5, list1.size());
-
- // list 2 should have 6 items
- assertEquals(6, list2.size());
- assertEquals(5L, list2.get(3).longValue());
- assertEquals(6L, list2.get(4).longValue());
- assertEquals(7L, list2.get(5).longValue());
-
- // unsubscribe list2
- s2.unsubscribe();
-
- // advance further
- s.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
-
- // the following is not working as it seems the PublishSubject does not allow re-subscribing. TODO fix that in subsequent pull request
-
-
-// // subscribing a new one should start over because the source should have been unsubscribed
-// // subscribe list1
-// final List list3 = new ArrayList();
-// Subscription s3 = interval.subscribe(new Action1() {
-//
-// @Override
-// public void call(Long t1) {
-// list3.add(t1);
-// }
-//
-// });
-// s.advanceTimeBy(200, TimeUnit.MILLISECONDS);
-//
-// assertEquals(2, list3.size());
-// assertEquals(0L, list3.get(0).longValue());
-// assertEquals(1L, list3.get(1).longValue());
-
- }
-}
diff --git a/rxjava-core/src/test/java/rx/RefCountTests.java b/rxjava-core/src/test/java/rx/RefCountTests.java
index bf035e0aa4..ff99e60e1c 100644
--- a/rxjava-core/src/test/java/rx/RefCountTests.java
+++ b/rxjava-core/src/test/java/rx/RefCountTests.java
@@ -3,9 +3,15 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
+
+import rx.concurrency.TestScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
+import rx.util.functions.Action1;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
@@ -45,4 +51,89 @@ public void call() {
second.unsubscribe();
assertEquals(1, unsubscriptionCount.get());
}
+
+ @Test
+ public void testRefCount() {
+ TestScheduler s = new TestScheduler();
+ Observable interval = Observable.interval(100, TimeUnit.MILLISECONDS, s).publish().refCount();
+
+ // subscribe list1
+ final List list1 = new ArrayList();
+ Subscription s1 = interval.subscribe(new Action1() {
+
+ @Override
+ public void call(Long t1) {
+ list1.add(t1);
+ }
+
+ });
+ s.advanceTimeBy(200, TimeUnit.MILLISECONDS);
+
+ assertEquals(2, list1.size());
+ assertEquals(0L, list1.get(0).longValue());
+ assertEquals(1L, list1.get(1).longValue());
+
+ // subscribe list2
+ final List list2 = new ArrayList();
+ Subscription s2 = interval.subscribe(new Action1() {
+
+ @Override
+ public void call(Long t1) {
+ list2.add(t1);
+ }
+
+ });
+ s.advanceTimeBy(300, TimeUnit.MILLISECONDS);
+
+ // list 1 should have 5 items
+ assertEquals(5, list1.size());
+ assertEquals(2L, list1.get(2).longValue());
+ assertEquals(3L, list1.get(3).longValue());
+ assertEquals(4L, list1.get(4).longValue());
+
+ // list 2 should only have 3 items
+ assertEquals(3, list2.size());
+ assertEquals(2L, list2.get(0).longValue());
+ assertEquals(3L, list2.get(1).longValue());
+ assertEquals(4L, list2.get(2).longValue());
+
+ // unsubscribe list1
+ s1.unsubscribe();
+
+ // advance further
+ s.advanceTimeBy(300, TimeUnit.MILLISECONDS);
+
+ // list 1 should still have 5 items
+ assertEquals(5, list1.size());
+
+ // list 2 should have 6 items
+ assertEquals(6, list2.size());
+ assertEquals(5L, list2.get(3).longValue());
+ assertEquals(6L, list2.get(4).longValue());
+ assertEquals(7L, list2.get(5).longValue());
+
+ // unsubscribe list2
+ s2.unsubscribe();
+
+ // advance further
+ s.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
+
+ // subscribing a new one should start over because the source should have been unsubscribed
+ // subscribe list3
+ final List list3 = new ArrayList();
+ Subscription s3 = interval.subscribe(new Action1() {
+
+ @Override
+ public void call(Long t1) {
+ list3.add(t1);
+ }
+
+ });
+ s.advanceTimeBy(200, TimeUnit.MILLISECONDS);
+
+ assertEquals(2, list3.size());
+ assertEquals(0L, list3.get(0).longValue());
+ assertEquals(1L, list3.get(1).longValue());
+
+ }
}