Skip to content

Commit

Permalink
Rename DefaultSubject to PublishSubject
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Apr 19, 2013
1 parent 6bff9aa commit fbb27ac
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.DefaultSubject;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.AtomicObserver;
Expand Down Expand Up @@ -174,7 +174,7 @@ public Boolean call(Integer input)

@Test
public void testTakeWhileOnSubject1() {
DefaultSubject<Integer> s = DefaultSubject.create();
PublishSubject<Integer> s = PublishSubject.create();
Observable<Integer> w = (Observable<Integer>) s;
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>()
{
Expand Down
8 changes: 4 additions & 4 deletions rxjava-core/src/main/java/rx/operators/OperatorMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import rx.Observer;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subjects.DefaultSubject;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.util.functions.Func1;

Expand Down Expand Up @@ -96,7 +96,7 @@ public void testMulticast() {
TestObservable source = new TestObservable();

ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
DefaultSubject.<String>create());
PublishSubject.<String>create());

Observer<String> observer = mock(Observer.class);
multicasted.subscribe(observer);
Expand All @@ -123,7 +123,7 @@ public void testMulticastConnectTwice() {
TestObservable source = new TestObservable();

ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
DefaultSubject.<String>create());
PublishSubject.<String>create());

Observer<String> observer = mock(Observer.class);
multicasted.subscribe(observer);
Expand All @@ -147,7 +147,7 @@ public void testMulticastDisconnect() {
TestObservable source = new TestObservable();

ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
DefaultSubject.<String>create());
PublishSubject.<String>create());

Observer<String> observer = mock(Observer.class);
multicasted.subscribe(observer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,29 @@
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public class DefaultSubject<T> extends Subject<T, T> {
public static <T> DefaultSubject<T> create() {
/**
* Subject that publishes a single event to each {@link Observer} that has subscribed.
* <p>
* Example usage:
* <p>
* <pre> {@code
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onCompleted
subject.subscribe(observer2);
subject.onNext("three");
subject.onCompleted();
} </pre>
*
* @param <T>
*/
public class PublishSubject<T> extends Subject<T, T> {
public static <T> PublishSubject<T> create() {
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();

Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
Expand All @@ -62,12 +83,12 @@ public void unsubscribe() {
}
};

return new DefaultSubject<T>(onSubscribe, observers);
return new PublishSubject<T>(onSubscribe, observers);
}

private final ConcurrentHashMap<Subscription, Observer<T>> observers;

protected DefaultSubject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
protected PublishSubject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
super(onSubscribe);
this.observers = observers;
}
Expand Down Expand Up @@ -96,7 +117,7 @@ public void onNext(T args) {
public static class UnitTest {
@Test
public void test() {
DefaultSubject<Integer> subject = DefaultSubject.create();
PublishSubject<Integer> subject = PublishSubject.create();
final AtomicReference<List<Notification<String>>> actualRef = new AtomicReference<List<Notification<String>>>();

Observable<List<Notification<Integer>>> wNotificationsList = subject.materialize().toList();
Expand Down Expand Up @@ -147,7 +168,7 @@ public void unsubscribe() {

@Test
public void testCompleted() {
DefaultSubject<Object> subject = DefaultSubject.create();
PublishSubject<Object> subject = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand Down Expand Up @@ -188,7 +209,7 @@ private void assertNeverObserver(Observer<String> aObserver)

@Test
public void testError() {
DefaultSubject<Object> subject = DefaultSubject.create();
PublishSubject<Object> subject = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand Down Expand Up @@ -222,7 +243,7 @@ private void assertErrorObserver(Observer<String> aObserver)

@Test
public void testSubscribeMidSequence() {
DefaultSubject<Object> subject = DefaultSubject.create();
PublishSubject<Object> subject = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand Down Expand Up @@ -255,7 +276,7 @@ private void assertCompletedStartingWithThreeObserver(Observer<String> aObserver

@Test
public void testUnsubscribeFirstObserver() {
DefaultSubject<Object> subject = DefaultSubject.create();
PublishSubject<Object> subject = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand Down Expand Up @@ -290,31 +311,31 @@ private void assertObservedUntilTwo(Observer<String> aObserver)
@Test
public void testUnsubscribe()
{
UnsubscribeTester.test(new Func0<DefaultSubject<Object>>()
UnsubscribeTester.test(new Func0<PublishSubject<Object>>()
{
@Override
public DefaultSubject<Object> call()
public PublishSubject<Object> call()
{
return DefaultSubject.create();
return PublishSubject.create();
}
}, new Action1<DefaultSubject<Object>>()
}, new Action1<PublishSubject<Object>>()
{
@Override
public void call(DefaultSubject<Object> DefaultSubject)
public void call(PublishSubject<Object> DefaultSubject)
{
DefaultSubject.onCompleted();
}
}, new Action1<DefaultSubject<Object>>()
}, new Action1<PublishSubject<Object>>()
{
@Override
public void call(DefaultSubject<Object> DefaultSubject)
public void call(PublishSubject<Object> DefaultSubject)
{
DefaultSubject.onError(new Exception());
}
}, new Action1<DefaultSubject<Object>>()
}, new Action1<PublishSubject<Object>>()
{
@Override
public void call(DefaultSubject<Object> DefaultSubject)
public void call(PublishSubject<Object> DefaultSubject)
{
DefaultSubject.onNext("one");
}
Expand Down

0 comments on commit fbb27ac

Please sign in to comment.