From 34ee83bdabff1bb833dceb0b4ac490a0ef6281cd Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 8 Oct 2013 23:31:11 -0700 Subject: [PATCH 1/2] Combine RefCountTest and RefCountTests --- .../src/test/java/rx/RefCountTest.java | 103 ------------------ .../src/test/java/rx/RefCountTests.java | 94 ++++++++++++++++ 2 files changed, 94 insertions(+), 103 deletions(-) delete mode 100644 rxjava-core/src/test/java/rx/RefCountTest.java diff --git a/rxjava-core/src/test/java/rx/RefCountTest.java b/rxjava-core/src/test/java/rx/RefCountTest.java deleted file mode 100644 index 3cc09b4076a..00000000000 --- a/rxjava-core/src/test/java/rx/RefCountTest.java +++ /dev/null @@ -1,103 +0,0 @@ -package rx; - -import static org.junit.Assert.*; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.junit.Test; - -import rx.concurrency.TestScheduler; -import rx.util.functions.Action1; - -public class RefCountTest { - - @Test - public void testRefCount() { - TestScheduler s = new TestScheduler(); - Observable interval = Observable.interval(100, TimeUnit.MILLISECONDS, s).publish().refCount(); - - // subscribe list1 - final List list1 = new ArrayList(); - Subscription s1 = interval.subscribe(new Action1() { - - @Override - public void call(Long t1) { - list1.add(t1); - } - - }); - s.advanceTimeBy(200, TimeUnit.MILLISECONDS); - - assertEquals(2, list1.size()); - assertEquals(0L, list1.get(0).longValue()); - assertEquals(1L, list1.get(1).longValue()); - - // subscribe list2 - final List list2 = new ArrayList(); - Subscription s2 = interval.subscribe(new Action1() { - - @Override - public void call(Long t1) { - list2.add(t1); - } - - }); - s.advanceTimeBy(300, TimeUnit.MILLISECONDS); - - // list 1 should have 5 items - assertEquals(5, list1.size()); - assertEquals(2L, list1.get(2).longValue()); - assertEquals(3L, list1.get(3).longValue()); - assertEquals(4L, list1.get(4).longValue()); - - // list 2 should only have 3 items - assertEquals(3, list2.size()); - assertEquals(2L, list2.get(0).longValue()); - assertEquals(3L, list2.get(1).longValue()); - assertEquals(4L, list2.get(2).longValue()); - - // unsubscribe list1 - s1.unsubscribe(); - - // advance further - s.advanceTimeBy(300, TimeUnit.MILLISECONDS); - - // list 1 should still have 5 items - assertEquals(5, list1.size()); - - // list 2 should have 6 items - assertEquals(6, list2.size()); - assertEquals(5L, list2.get(3).longValue()); - assertEquals(6L, list2.get(4).longValue()); - assertEquals(7L, list2.get(5).longValue()); - - // unsubscribe list2 - s2.unsubscribe(); - - // advance further - s.advanceTimeBy(1000, TimeUnit.MILLISECONDS); - - // the following is not working as it seems the PublishSubject does not allow re-subscribing. TODO fix that in subsequent pull request - - -// // subscribing a new one should start over because the source should have been unsubscribed -// // subscribe list1 -// final List list3 = new ArrayList(); -// Subscription s3 = interval.subscribe(new Action1() { -// -// @Override -// public void call(Long t1) { -// list3.add(t1); -// } -// -// }); -// s.advanceTimeBy(200, TimeUnit.MILLISECONDS); -// -// assertEquals(2, list3.size()); -// assertEquals(0L, list3.get(0).longValue()); -// assertEquals(1L, list3.get(1).longValue()); - - } -} diff --git a/rxjava-core/src/test/java/rx/RefCountTests.java b/rxjava-core/src/test/java/rx/RefCountTests.java index bf035e0aa41..bf529b4076d 100644 --- a/rxjava-core/src/test/java/rx/RefCountTests.java +++ b/rxjava-core/src/test/java/rx/RefCountTests.java @@ -3,9 +3,15 @@ import org.junit.Before; import org.junit.Test; import org.mockito.MockitoAnnotations; + +import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; +import rx.util.functions.Action1; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; @@ -45,4 +51,92 @@ public void call() { second.unsubscribe(); assertEquals(1, unsubscriptionCount.get()); } + + @Test + public void testRefCount() { + TestScheduler s = new TestScheduler(); + Observable interval = Observable.interval(100, TimeUnit.MILLISECONDS, s).publish().refCount(); + + // subscribe list1 + final List list1 = new ArrayList(); + Subscription s1 = interval.subscribe(new Action1() { + + @Override + public void call(Long t1) { + list1.add(t1); + } + + }); + s.advanceTimeBy(200, TimeUnit.MILLISECONDS); + + assertEquals(2, list1.size()); + assertEquals(0L, list1.get(0).longValue()); + assertEquals(1L, list1.get(1).longValue()); + + // subscribe list2 + final List list2 = new ArrayList(); + Subscription s2 = interval.subscribe(new Action1() { + + @Override + public void call(Long t1) { + list2.add(t1); + } + + }); + s.advanceTimeBy(300, TimeUnit.MILLISECONDS); + + // list 1 should have 5 items + assertEquals(5, list1.size()); + assertEquals(2L, list1.get(2).longValue()); + assertEquals(3L, list1.get(3).longValue()); + assertEquals(4L, list1.get(4).longValue()); + + // list 2 should only have 3 items + assertEquals(3, list2.size()); + assertEquals(2L, list2.get(0).longValue()); + assertEquals(3L, list2.get(1).longValue()); + assertEquals(4L, list2.get(2).longValue()); + + // unsubscribe list1 + s1.unsubscribe(); + + // advance further + s.advanceTimeBy(300, TimeUnit.MILLISECONDS); + + // list 1 should still have 5 items + assertEquals(5, list1.size()); + + // list 2 should have 6 items + assertEquals(6, list2.size()); + assertEquals(5L, list2.get(3).longValue()); + assertEquals(6L, list2.get(4).longValue()); + assertEquals(7L, list2.get(5).longValue()); + + // unsubscribe list2 + s2.unsubscribe(); + + // advance further + s.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + + // the following is not working as it seems the PublishSubject does not allow re-subscribing. TODO fix that in subsequent pull request + + +// // subscribing a new one should start over because the source should have been unsubscribed +// // subscribe list1 +// final List list3 = new ArrayList(); +// Subscription s3 = interval.subscribe(new Action1() { +// +// @Override +// public void call(Long t1) { +// list3.add(t1); +// } +// +// }); +// s.advanceTimeBy(200, TimeUnit.MILLISECONDS); +// +// assertEquals(2, list3.size()); +// assertEquals(0L, list3.get(0).longValue()); +// assertEquals(1L, list3.get(1).longValue()); + + } } From 4d5d8735d5531dbddec556516271940892d280e8 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 8 Oct 2013 23:55:18 -0700 Subject: [PATCH 2/2] Remove PublishSubject Terminal State Behavior The PublishSubject implementation was performing onError/onCompleted unsubscribe logic that was put in place long ago and I am now pretty sure it was wrong. This was revealed while playing with `refCount` which intends on allowing a re-subscription to the source once new Observers arrive. PublishSubject was preventing that. The one use case that I'm still wondering about though is if someone subscribes to a PublishSubject after it has emitted onCompleted and isn't "restarted". That Observer would wait forever if it is a "single-shot" PublishSubject use case. I'm not sure if that's just a bad use and fits into the "don't do that" scenario, or if it's a legit issue that has a solution. Rightn now this code is "thread-safe" in the visibility sense, but it's not atomic and could have race conditions between adding/removing Observers and event notifications. I don't think that's an issue as if someone is concurrently adding/removing it's always a race, but am not 100% sure if there's a use case I'm missing. This also assumes (as it always did) that someone is not invoking onNext concurrently as that would break the Rx contract. --- .../main/java/rx/subjects/PublishSubject.java | 232 +++++++----------- .../src/test/java/rx/RefCountTests.java | 35 ++- 2 files changed, 109 insertions(+), 158 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 39c19d9b7f1..5588dd485bc 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -51,7 +51,7 @@ *

*

 {@code
 
-  PublishSubject subject = PublishSubject.create();
+ * ublishSubject subject = PublishSubject.create();
   // observer1 will receive all onNext and onCompleted events
   subject.subscribe(observer1);
   subject.onNext("one");
@@ -62,21 +62,16 @@
   subject.onCompleted();
 
   } 
- *
+ * 
  * @param 
  */
 public class PublishSubject extends Subject {
     public static  PublishSubject create() {
         final ConcurrentHashMap> observers = new ConcurrentHashMap>();
-        final AtomicReference> terminalState = new AtomicReference>();
 
         OnSubscribeFunc onSubscribe = new OnSubscribeFunc() {
             @Override
             public Subscription onSubscribe(Observer observer) {
-                // shortcut check if terminal state exists already
-                Subscription s = checkTerminalState(observer);
-                if(s != null) return s;
-
                 final SafeObservableSubscription subscription = new SafeObservableSubscription();
 
                 subscription.wrap(new Subscription() {
@@ -87,67 +82,26 @@ public void unsubscribe() {
                     }
                 });
 
-                /**
-                 * NOTE: We are synchronizing to avoid a race condition between terminalState being set and
-                 * a new observer being added to observers.
-                 *
-                 * The synchronization only occurs on subscription and terminal states, it does not affect onNext calls
-                 * so a high-volume hot-observable will not pay this cost for emitting data.
-                 *
-                 * Due to the restricted impact of blocking synchronization here I have not pursued more complicated
-                 * approaches to try and stay completely non-blocking.
-                 */
-                synchronized (terminalState) {
-                    // check terminal state again
-                    s = checkTerminalState(observer);
-                    if (s != null)
-                        return s;
-
-                    // on subscribe add it to the map of outbound observers to notify
-                    observers.put(subscription, observer);
-
-                    return subscription;
-                }
-            }
+                // on subscribe add it to the map of outbound observers to notify
+                observers.put(subscription, observer);
 
-            private Subscription checkTerminalState(Observer observer) {
-                Notification n = terminalState.get();
-                if (n != null) {
-                    // we are terminated to immediately emit and don't continue with subscription
-                    if (n.isOnCompleted()) {
-                        observer.onCompleted();
-                    } else {
-                        observer.onError(n.getThrowable());
-                    }
-                    return Subscriptions.empty();
-                } else {
-                    return null;
-                }
+                return subscription;
             }
+
         };
 
-        return new PublishSubject(onSubscribe, observers, terminalState);
+        return new PublishSubject(onSubscribe, observers);
     }
 
     private final ConcurrentHashMap> observers;
-    private final AtomicReference> terminalState;
 
-    protected PublishSubject(OnSubscribeFunc onSubscribe, ConcurrentHashMap> observers, AtomicReference> terminalState) {
+    protected PublishSubject(OnSubscribeFunc onSubscribe, ConcurrentHashMap> observers) {
         super(onSubscribe);
         this.observers = observers;
-        this.terminalState = terminalState;
     }
 
     @Override
     public void onCompleted() {
-        /**
-         * Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
-         * Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
-         * onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
-         */
-        synchronized (terminalState) {
-            terminalState.set(new Notification());
-        }
         for (Observer observer : snapshotOfValues()) {
             observer.onCompleted();
         }
@@ -156,14 +110,6 @@ public void onCompleted() {
 
     @Override
     public void onError(Throwable e) {
-        /**
-         * Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
-         * Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
-         * onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
-         */
-        synchronized (terminalState) {
-            terminalState.set(new Notification(e));
-        }
         for (Observer observer : snapshotOfValues()) {
             observer.onError(e);
         }
@@ -179,12 +125,12 @@ public void onNext(T args) {
 
     /**
      * Current snapshot of 'values()' so that concurrent modifications aren't included.
-     *
+     * 
      * This makes it behave deterministically in a single-threaded execution when nesting subscribes.
-     *
+     * 
      * In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead
      * of possibly being included in the current onNext iteration.
-     *
+     * 
      * @return List>
      */
     private Collection> snapshotOfValues() {
@@ -378,75 +324,6 @@ private void assertObservedUntilTwo(Observer aObserver)
             verify(aObserver, Mockito.never()).onCompleted();
         }
 
-        /**
-         * Test that subscribing after onError/onCompleted immediately terminates instead of causing it to hang.
-         *
-         * Nothing is mentioned in Rx Guidelines for what to do in this case so I'm doing what seems to make sense
-         * which is:
-         *
-         * - cache terminal state (onError/onCompleted)
-         * - any subsequent subscriptions will immediately receive the terminal state rather than start a new subscription
-         *
-         */
-        @Test
-        public void testUnsubscribeAfterOnCompleted() {
-            PublishSubject subject = PublishSubject.create();
-
-            @SuppressWarnings("unchecked")
-            Observer anObserver = mock(Observer.class);
-            subject.subscribe(anObserver);
-
-            subject.onNext("one");
-            subject.onNext("two");
-            subject.onCompleted();
-
-            InOrder inOrder = inOrder(anObserver);
-            inOrder.verify(anObserver, times(1)).onNext("one");
-            inOrder.verify(anObserver, times(1)).onNext("two");
-            inOrder.verify(anObserver, times(1)).onCompleted();
-            inOrder.verify(anObserver, Mockito.never()).onError(any(Throwable.class));
-
-            @SuppressWarnings("unchecked")
-            Observer anotherObserver = mock(Observer.class);
-            subject.subscribe(anotherObserver);
-
-            inOrder = inOrder(anotherObserver);
-            inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
-            inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
-            inOrder.verify(anotherObserver, times(1)).onCompleted();
-            inOrder.verify(anotherObserver, Mockito.never()).onError(any(Throwable.class));
-        }
-
-        @Test
-        public void testUnsubscribeAfterOnError() {
-            PublishSubject subject = PublishSubject.create();
-            RuntimeException exception = new RuntimeException("failure");
-
-            @SuppressWarnings("unchecked")
-            Observer anObserver = mock(Observer.class);
-            subject.subscribe(anObserver);
-
-            subject.onNext("one");
-            subject.onNext("two");
-            subject.onError(exception);
-
-            InOrder inOrder = inOrder(anObserver);
-            inOrder.verify(anObserver, times(1)).onNext("one");
-            inOrder.verify(anObserver, times(1)).onNext("two");
-            inOrder.verify(anObserver, times(1)).onError(exception);
-            inOrder.verify(anObserver, Mockito.never()).onCompleted();
-
-            @SuppressWarnings("unchecked")
-            Observer anotherObserver = mock(Observer.class);
-            subject.subscribe(anotherObserver);
-
-            inOrder = inOrder(anotherObserver);
-            inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
-            inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
-            inOrder.verify(anotherObserver, times(1)).onError(exception);
-            inOrder.verify(anotherObserver, Mockito.never()).onCompleted();
-        }
-
         @Test
         public void testUnsubscribe()
         {
@@ -519,8 +396,7 @@ public void call(String v) {
 
             });
 
-
-            for(int i=0; i<10; i++) {
+            for (int i = 0; i < 10; i++) {
                 s.onNext(i);
             }
             s.onCompleted();
@@ -533,5 +409,83 @@ public void call(String v) {
             assertEquals(45, list.size());
         }
 
+        /**
+         * Should be able to unsubscribe all Observers, have it stop emitting, then subscribe new ones and it start emitting again.
+         */
+        @Test
+        public void testReSubscribe() {
+            final PublishSubject ps = PublishSubject.create();
+
+            Observer o1 = mock(Observer.class);
+            Subscription s1 = ps.subscribe(o1);
+
+            // emit
+            ps.onNext(1);
+
+            // validate we got it
+            InOrder inOrder1 = inOrder(o1);
+            inOrder1.verify(o1, times(1)).onNext(1);
+            inOrder1.verifyNoMoreInteractions();
+
+            // unsubscribe
+            s1.unsubscribe();
+
+            // emit again but nothing will be there to receive it
+            ps.onNext(2);
+
+            Observer o2 = mock(Observer.class);
+            Subscription s2 = ps.subscribe(o2);
+
+            // emit
+            ps.onNext(3);
+
+            // validate we got it
+            InOrder inOrder2 = inOrder(o2);
+            inOrder2.verify(o2, times(1)).onNext(3);
+            inOrder2.verifyNoMoreInteractions();
+
+            s2.unsubscribe();
+        }
+
+        /**
+         * Even if subject received an onError/onCompleted, new subscriptions should be able to restart it.
+         */
+        @Test
+        public void testReSubscribeAfterTerminalState() {
+            final PublishSubject ps = PublishSubject.create();
+
+            Observer o1 = mock(Observer.class);
+            Subscription s1 = ps.subscribe(o1);
+
+            // emit
+            ps.onNext(1);
+
+            // validate we got it
+            InOrder inOrder1 = inOrder(o1);
+            inOrder1.verify(o1, times(1)).onNext(1);
+            inOrder1.verifyNoMoreInteractions();
+
+            // unsubscribe
+            s1.unsubscribe();
+
+            ps.onCompleted();
+
+            // emit again but nothing will be there to receive it
+            ps.onNext(2);
+
+            Observer o2 = mock(Observer.class);
+            Subscription s2 = ps.subscribe(o2);
+
+            // emit
+            ps.onNext(3);
+
+            // validate we got it
+            InOrder inOrder2 = inOrder(o2);
+            inOrder2.verify(o2, times(1)).onNext(3);
+            inOrder2.verifyNoMoreInteractions();
+
+            s2.unsubscribe();
+        }
+
     }
 }
diff --git a/rxjava-core/src/test/java/rx/RefCountTests.java b/rxjava-core/src/test/java/rx/RefCountTests.java
index bf529b4076d..ff99e60e1ce 100644
--- a/rxjava-core/src/test/java/rx/RefCountTests.java
+++ b/rxjava-core/src/test/java/rx/RefCountTests.java
@@ -118,25 +118,22 @@ public void call(Long t1) {
         // advance further
         s.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
 
-        // the following is not working as it seems the PublishSubject does not allow re-subscribing. TODO fix that in subsequent pull request
-        
-        
-//        // subscribing a new one should start over because the source should have been unsubscribed
-//        // subscribe list1
-//        final List list3 = new ArrayList();
-//        Subscription s3 = interval.subscribe(new Action1() {
-//
-//            @Override
-//            public void call(Long t1) {
-//                list3.add(t1);
-//            }
-//
-//        });
-//        s.advanceTimeBy(200, TimeUnit.MILLISECONDS);
-//
-//        assertEquals(2, list3.size());
-//        assertEquals(0L, list3.get(0).longValue());
-//        assertEquals(1L, list3.get(1).longValue());
+        // subscribing a new one should start over because the source should have been unsubscribed
+        // subscribe list3
+        final List list3 = new ArrayList();
+        Subscription s3 = interval.subscribe(new Action1() {
+
+            @Override
+            public void call(Long t1) {
+                list3.add(t1);
+            }
+
+        });
+        s.advanceTimeBy(200, TimeUnit.MILLISECONDS);
+
+        assertEquals(2, list3.size());
+        assertEquals(0L, list3.get(0).longValue());
+        assertEquals(1L, list3.get(1).longValue());
 
     }
 }