Skip to content

Commit

Permalink
Adding a draft of Subject class
Browse files Browse the repository at this point in the history
  • Loading branch information
abersnaze committed Jan 23, 2013
1 parent fa1af64 commit 967337e
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 5 deletions.
39 changes: 39 additions & 0 deletions rxjava-core/src/main/java/rx/observables/Notification.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,43 @@ public boolean isOnNext() {
public static enum Kind {
OnNext, OnError, OnCompleted
}

@Override
public String toString() {
StringBuilder str = new StringBuilder("[").append(super.toString()).append(" ").append(getKind());
if (hasValue())
str.append(" ").append(getValue());
if (hasException())
str.append(" ").append(getException().getMessage());
str.append("]");
return str.toString();
}

@Override
public int hashCode() {
int hash = getKind().hashCode();
if (hasValue())
hash = hash * 31 + getValue().hashCode();
if (hasException())
hash = hash * 31 + getException().hashCode();
return hash;
}

@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (this == obj)
return true;
if (obj.getClass() != getClass())
return false;
Notification notification = (Notification) obj;
if (notification.getKind() != getKind())
return false;
if (hasValue() && !getValue().equals(notification.getValue()))
return false;
if (hasException() && !getException().equals(notification.getException()))
return false;
return true;
}
}
127 changes: 127 additions & 0 deletions rxjava-core/src/main/java/rx/subjects/Subject.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package rx.subjects;

import groovy.lang.Reference;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import junit.framework.Assert;

import org.junit.Test;

import rx.observables.Notification;
import rx.observables.Observable;
import rx.observables.Observer;
import rx.observables.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.AtomicObserver;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class Subject<T> extends Observable<T> implements Observer<T> {
public static <T> Subject<T> create() {
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();

Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
@Override
public Subscription call(Observer<T> observer) {
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();

subscription.wrap(new Subscription() {
@Override
public void unsubscribe() {
// on unsubscribe remove it from the map of outbound observers to notify
observers.remove(subscription);
}
});

// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, new AtomicObserver<T>(observer, subscription));
return subscription;
}
};

return new Subject<T>(onSubscribe, observers);
}

private final ConcurrentHashMap<Subscription, Observer<T>> observers;

protected Subject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
super(onSubscribe);
this.observers = observers;
}

@Override
public void onCompleted() {
for (Observer<T> observer : observers.values()) {
observer.onCompleted();
}
}

@Override
public void onError(Exception e) {
for (Observer<T> observer : observers.values()) {
observer.onError(e);
}
}

@Override
public void onNext(T args) {
for (Observer<T> observer : observers.values()) {
observer.onNext(args);
}
}

public static class UnitTest {
@Test
public void test() {
Subject<Integer> subject = Subject.<Integer> create();
final Reference<List<Notification<String>>> actualRef = new Reference<List<Notification<String>>>();

Observable<List<Notification<Integer>>> wNotificationsList = subject.materialize().toList();
wNotificationsList.subscribe(new Action1<List<Notification<String>>>() {
@Override
public void call(List<Notification<String>> actual) {
actualRef.set(actual);
}
});

Subscription sub = Observable.create(new Func1<Observer<Integer>, Subscription>() {
@Override
public Subscription call(final Observer<Integer> observer) {
final AtomicBoolean stop = new AtomicBoolean(false);
new Thread() {
@Override
public void run() {
int i = 1;
while (!stop.get()) {
observer.onNext(i++);
}
observer.onCompleted();
}
}.start();
return new Subscription() {
@Override
public void unsubscribe() {
stop.set(true);
}
};
}
}).subscribe(subject);
// the subject has received an onComplete from the first subscribe because
// it is synchronous and the next subscribe won't do anything.
Observable.toObservable(-1, -2, -3).subscribe(subject);

List<Notification<Integer>> expected = new ArrayList<Notification<Integer>>();
expected.add(new Notification<Integer>(-1));
expected.add(new Notification<Integer>(-2));
expected.add(new Notification<Integer>(-3));
expected.add(new Notification<Integer>());
Assert.assertTrue(actualRef.get().containsAll(expected));

sub.unsubscribe();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ public final class AtomicObserverSingleThreaded<T> implements Observer<T> {
* compositional by its very nature.
*/

private final Observer<T> Observer;
private final Observer<T> observer;
private final AtomicObservableSubscription subscription;
private volatile boolean finishRequested = false;
private volatile boolean finished = false;

public AtomicObserverSingleThreaded(Observer<T> Observer, AtomicObservableSubscription subscription) {
this.Observer = Observer;
this.observer = Observer;
this.subscription = subscription;
}

Expand All @@ -86,7 +86,7 @@ public void onNext(T arg) {
// if we're already stopped, or a finish request has been received, we won't allow further onNext requests
return;
}
Observer.onNext(arg);
observer.onNext(arg);
}
}

Expand All @@ -101,7 +101,7 @@ public void onError(Exception e) {
if (finished || subscription.isUnsubscribed()) {
return;
}
Observer.onError(e);
observer.onError(e);
finished = true;
}
}
Expand All @@ -117,7 +117,7 @@ public void onCompleted() {
if (finished || subscription.isUnsubscribed()) {
return;
}
Observer.onCompleted();
observer.onCompleted();
finished = true;
}
}
Expand Down

0 comments on commit 967337e

Please sign in to comment.