diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 3dd29044e1..48a00638a5 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -50,7 +50,7 @@ import rx.operators.OperationMerge; import rx.operators.OperationMergeDelayError; import rx.operators.OperationMostRecent; -import rx.operators.OperatorMulticast; +import rx.operators.OperationMulticast; import rx.operators.OperationNext; import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; @@ -2096,7 +2096,7 @@ public static Iterable mostRecent(Observable source, T initialValue) { * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. */ public static ConnectableObservable multicast(Observable source, final Subject subject) { - return OperatorMulticast.multicast(source, subject); + return OperationMulticast.multicast(source, subject); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java similarity index 93% rename from rxjava-core/src/main/java/rx/operators/OperatorMulticast.java rename to rxjava-core/src/main/java/rx/operators/OperationMulticast.java index 12fd2e2e64..a4b780ece3 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMulticast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java @@ -20,13 +20,13 @@ import rx.Observer; import rx.Subscription; import rx.observables.ConnectableObservable; -import rx.subjects.DefaultSubject; +import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.util.functions.Func1; import static org.mockito.Mockito.*; -public class OperatorMulticast { +public class OperationMulticast { public static ConnectableObservable multicast(Observable source, final Subject subject) { return new MulticastConnectableObservable(source, subject); } @@ -95,8 +95,8 @@ public static class UnitTest { public void testMulticast() { TestObservable source = new TestObservable(); - ConnectableObservable multicasted = OperatorMulticast.multicast(source, - DefaultSubject.create()); + ConnectableObservable multicasted = OperationMulticast.multicast(source, + PublishSubject.create()); Observer observer = mock(Observer.class); multicasted.subscribe(observer); @@ -122,8 +122,8 @@ public void testMulticast() { public void testMulticastConnectTwice() { TestObservable source = new TestObservable(); - ConnectableObservable multicasted = OperatorMulticast.multicast(source, - DefaultSubject.create()); + ConnectableObservable multicasted = OperationMulticast.multicast(source, + PublishSubject.create()); Observer observer = mock(Observer.class); multicasted.subscribe(observer); @@ -146,8 +146,8 @@ public void testMulticastConnectTwice() { public void testMulticastDisconnect() { TestObservable source = new TestObservable(); - ConnectableObservable multicasted = OperatorMulticast.multicast(source, - DefaultSubject.create()); + ConnectableObservable multicasted = OperationMulticast.multicast(source, + PublishSubject.create()); Observer observer = mock(Observer.class); multicasted.subscribe(observer); diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index 156ed7cc9c..a6b1cdede2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -26,7 +26,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.subjects.DefaultSubject; +import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.AtomicObserver; @@ -174,7 +174,7 @@ public Boolean call(Integer input) @Test public void testTakeWhileOnSubject1() { - DefaultSubject s = DefaultSubject.create(); + PublishSubject s = PublishSubject.create(); Observable w = (Observable) s; Observable take = Observable.create(takeWhile(w, new Func1() { diff --git a/rxjava-core/src/main/java/rx/subjects/DefaultSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java similarity index 87% rename from rxjava-core/src/main/java/rx/subjects/DefaultSubject.java rename to rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 31ae205914..092af4f607 100644 --- a/rxjava-core/src/main/java/rx/subjects/DefaultSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -39,8 +39,29 @@ import rx.util.functions.Func0; import rx.util.functions.Func1; -public class DefaultSubject extends Subject { - public static DefaultSubject create() { +/** + * Subject that publishes a single event to each {@link Observer} that has subscribed. + *

+ * Example usage: + *

+ *

 {@code
+ 
+  PublishSubject subject = PublishSubject.create();
+  // observer1 will receive all onNext events
+  subject.subscribe(observer1);
+  subject.onNext("one");
+  subject.onNext("two");
+  // observer2 will only receive "three" and onCompleted
+  subject.subscribe(observer2);
+  subject.onNext("three");
+  subject.onCompleted();
+ 
+  } 
+ * 
+ * @param 
+ */
+public class PublishSubject extends Subject {
+    public static  PublishSubject create() {
         final ConcurrentHashMap> observers = new ConcurrentHashMap>();
 
         Func1, Subscription> onSubscribe = new Func1, Subscription>() {
@@ -62,12 +83,12 @@ public void unsubscribe() {
             }
         };
 
-        return new DefaultSubject(onSubscribe, observers);
+        return new PublishSubject(onSubscribe, observers);
     }
 
     private final ConcurrentHashMap> observers;
 
-    protected DefaultSubject(Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) {
+    protected PublishSubject(Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) {
         super(onSubscribe);
         this.observers = observers;
     }
@@ -96,7 +117,7 @@ public void onNext(T args) {
     public static class UnitTest {
         @Test
         public void test() {
-            DefaultSubject subject = DefaultSubject.create();
+            PublishSubject subject = PublishSubject.create();
             final AtomicReference>> actualRef = new AtomicReference>>();
 
             Observable>> wNotificationsList = subject.materialize().toList();
@@ -147,7 +168,7 @@ public void unsubscribe() {
 
         @Test
         public void testCompleted() {
-            DefaultSubject subject = DefaultSubject.create();
+            PublishSubject subject = PublishSubject.create();
 
             @SuppressWarnings("unchecked")
             Observer aObserver = mock(Observer.class);
@@ -188,7 +209,7 @@ private void assertNeverObserver(Observer aObserver)
 
         @Test
         public void testError() {
-            DefaultSubject subject = DefaultSubject.create();
+            PublishSubject subject = PublishSubject.create();
 
             @SuppressWarnings("unchecked")
             Observer aObserver = mock(Observer.class);
@@ -222,7 +243,7 @@ private void assertErrorObserver(Observer aObserver)
 
         @Test
         public void testSubscribeMidSequence() {
-            DefaultSubject subject = DefaultSubject.create();
+            PublishSubject subject = PublishSubject.create();
 
             @SuppressWarnings("unchecked")
             Observer aObserver = mock(Observer.class);
@@ -255,7 +276,7 @@ private void assertCompletedStartingWithThreeObserver(Observer aObserver
 
         @Test
         public void testUnsubscribeFirstObserver() {
-            DefaultSubject subject = DefaultSubject.create();
+            PublishSubject subject = PublishSubject.create();
 
             @SuppressWarnings("unchecked")
             Observer aObserver = mock(Observer.class);
@@ -290,31 +311,31 @@ private void assertObservedUntilTwo(Observer aObserver)
         @Test
         public void testUnsubscribe()
         {
-            UnsubscribeTester.test(new Func0>()
+            UnsubscribeTester.test(new Func0>()
             {
                 @Override
-                public DefaultSubject call()
+                public PublishSubject call()
                 {
-                    return DefaultSubject.create();
+                    return PublishSubject.create();
                 }
-            }, new Action1>()
+            }, new Action1>()
             {
                 @Override
-                public void call(DefaultSubject DefaultSubject)
+                public void call(PublishSubject DefaultSubject)
                 {
                     DefaultSubject.onCompleted();
                 }
-            }, new Action1>()
+            }, new Action1>()
             {
                 @Override
-                public void call(DefaultSubject DefaultSubject)
+                public void call(PublishSubject DefaultSubject)
                 {
                     DefaultSubject.onError(new Exception());
                 }
-            }, new Action1>()
+            }, new Action1>()
             {
                 @Override
-                public void call(DefaultSubject DefaultSubject)
+                public void call(PublishSubject DefaultSubject)
                 {
                     DefaultSubject.onNext("one");
                 }