Skip to content

Commit

Permalink
Merge pull request #244 from benjchristensen/pre-release-changes
Browse files Browse the repository at this point in the history
Pre 0.8.0 Release Changes
  • Loading branch information
benjchristensen committed Apr 19, 2013
2 parents 6bff9aa + 3e5bf0e commit 38cab99
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 30 deletions.
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import rx.operators.OperationMerge;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMostRecent;
import rx.operators.OperatorMulticast;
import rx.operators.OperationMulticast;
import rx.operators.OperationNext;
import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
Expand Down Expand Up @@ -2096,7 +2096,7 @@ public static <T> Iterable<T> mostRecent(Observable<T> source, T initialValue) {
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
return OperatorMulticast.multicast(source, subject);
return OperationMulticast.multicast(source, subject);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import rx.Observer;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subjects.DefaultSubject;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.util.functions.Func1;

import static org.mockito.Mockito.*;

public class OperatorMulticast {
public class OperationMulticast {
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
return new MulticastConnectableObservable<T, R>(source, subject);
}
Expand Down Expand Up @@ -95,8 +95,8 @@ public static class UnitTest {
public void testMulticast() {
TestObservable source = new TestObservable();

ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
DefaultSubject.<String>create());
ConnectableObservable<String> multicasted = OperationMulticast.multicast(source,
PublishSubject.<String>create());

Observer<String> observer = mock(Observer.class);
multicasted.subscribe(observer);
Expand All @@ -122,8 +122,8 @@ public void testMulticast() {
public void testMulticastConnectTwice() {
TestObservable source = new TestObservable();

ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
DefaultSubject.<String>create());
ConnectableObservable<String> multicasted = OperationMulticast.multicast(source,
PublishSubject.<String>create());

Observer<String> observer = mock(Observer.class);
multicasted.subscribe(observer);
Expand All @@ -146,8 +146,8 @@ public void testMulticastConnectTwice() {
public void testMulticastDisconnect() {
TestObservable source = new TestObservable();

ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
DefaultSubject.<String>create());
ConnectableObservable<String> multicasted = OperationMulticast.multicast(source,
PublishSubject.<String>create());

Observer<String> observer = mock(Observer.class);
multicasted.subscribe(observer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.DefaultSubject;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.AtomicObserver;
Expand Down Expand Up @@ -174,7 +174,7 @@ public Boolean call(Integer input)

@Test
public void testTakeWhileOnSubject1() {
DefaultSubject<Integer> s = DefaultSubject.create();
PublishSubject<Integer> s = PublishSubject.create();
Observable<Integer> w = (Observable<Integer>) s;
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,29 @@
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public class DefaultSubject<T> extends Subject<T, T> {
public static <T> DefaultSubject<T> create() {
/**
* Subject that publishes a single event to each {@link Observer} that has subscribed.
* <p>
* Example usage:
* <p>
* <pre> {@code
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onCompleted
subject.subscribe(observer2);
subject.onNext("three");
subject.onCompleted();
} </pre>
*
* @param <T>
*/
public class PublishSubject<T> extends Subject<T, T> {
public static <T> PublishSubject<T> create() {
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();

Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
Expand All @@ -62,12 +83,12 @@ public void unsubscribe() {
}
};

return new DefaultSubject<T>(onSubscribe, observers);
return new PublishSubject<T>(onSubscribe, observers);
}

private final ConcurrentHashMap<Subscription, Observer<T>> observers;

protected DefaultSubject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
protected PublishSubject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
super(onSubscribe);
this.observers = observers;
}
Expand Down Expand Up @@ -96,7 +117,7 @@ public void onNext(T args) {
public static class UnitTest {
@Test
public void test() {
DefaultSubject<Integer> subject = DefaultSubject.create();
PublishSubject<Integer> subject = PublishSubject.create();
final AtomicReference<List<Notification<String>>> actualRef = new AtomicReference<List<Notification<String>>>();

Observable<List<Notification<Integer>>> wNotificationsList = subject.materialize().toList();
Expand Down Expand Up @@ -147,7 +168,7 @@ public void unsubscribe() {

@Test
public void testCompleted() {
DefaultSubject<Object> subject = DefaultSubject.create();
PublishSubject<Object> subject = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand Down Expand Up @@ -188,7 +209,7 @@ private void assertNeverObserver(Observer<String> aObserver)

@Test
public void testError() {
DefaultSubject<Object> subject = DefaultSubject.create();
PublishSubject<Object> subject = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand Down Expand Up @@ -222,7 +243,7 @@ private void assertErrorObserver(Observer<String> aObserver)

@Test
public void testSubscribeMidSequence() {
DefaultSubject<Object> subject = DefaultSubject.create();
PublishSubject<Object> subject = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand Down Expand Up @@ -255,7 +276,7 @@ private void assertCompletedStartingWithThreeObserver(Observer<String> aObserver

@Test
public void testUnsubscribeFirstObserver() {
DefaultSubject<Object> subject = DefaultSubject.create();
PublishSubject<Object> subject = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand Down Expand Up @@ -290,31 +311,31 @@ private void assertObservedUntilTwo(Observer<String> aObserver)
@Test
public void testUnsubscribe()
{
UnsubscribeTester.test(new Func0<DefaultSubject<Object>>()
UnsubscribeTester.test(new Func0<PublishSubject<Object>>()
{
@Override
public DefaultSubject<Object> call()
public PublishSubject<Object> call()
{
return DefaultSubject.create();
return PublishSubject.create();
}
}, new Action1<DefaultSubject<Object>>()
}, new Action1<PublishSubject<Object>>()
{
@Override
public void call(DefaultSubject<Object> DefaultSubject)
public void call(PublishSubject<Object> DefaultSubject)
{
DefaultSubject.onCompleted();
}
}, new Action1<DefaultSubject<Object>>()
}, new Action1<PublishSubject<Object>>()
{
@Override
public void call(DefaultSubject<Object> DefaultSubject)
public void call(PublishSubject<Object> DefaultSubject)
{
DefaultSubject.onError(new Exception());
}
}, new Action1<DefaultSubject<Object>>()
}, new Action1<PublishSubject<Object>>()
{
@Override
public void call(DefaultSubject<Object> DefaultSubject)
public void call(PublishSubject<Object> DefaultSubject)
{
DefaultSubject.onNext("one");
}
Expand Down

0 comments on commit 38cab99

Please sign in to comment.