diff --git a/rxjava-core/src/main/java/rx/observables/Notification.java b/rxjava-core/src/main/java/rx/observables/Notification.java index e6db6eec6a..5180f883de 100644 --- a/rxjava-core/src/main/java/rx/observables/Notification.java +++ b/rxjava-core/src/main/java/rx/observables/Notification.java @@ -119,4 +119,43 @@ public boolean isOnNext() { public static enum Kind { OnNext, OnError, OnCompleted } + + @Override + public String toString() { + StringBuilder str = new StringBuilder("[").append(super.toString()).append(" ").append(getKind()); + if (hasValue()) + str.append(" ").append(getValue()); + if (hasException()) + str.append(" ").append(getException().getMessage()); + str.append("]"); + return str.toString(); + } + + @Override + public int hashCode() { + int hash = getKind().hashCode(); + if (hasValue()) + hash = hash * 31 + getValue().hashCode(); + if (hasException()) + hash = hash * 31 + getException().hashCode(); + return hash; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + if (this == obj) + return true; + if (obj.getClass() != getClass()) + return false; + Notification notification = (Notification) obj; + if (notification.getKind() != getKind()) + return false; + if (hasValue() && !getValue().equals(notification.getValue())) + return false; + if (hasException() && !getException().equals(notification.getException())) + return false; + return true; + } } diff --git a/rxjava-core/src/main/java/rx/subjects/Subject.java b/rxjava-core/src/main/java/rx/subjects/Subject.java new file mode 100644 index 0000000000..c23b412447 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/Subject.java @@ -0,0 +1,126 @@ +package rx.subjects; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import junit.framework.Assert; + +import org.junit.Test; + +import rx.observables.Notification; +import rx.observables.Observable; +import rx.observables.Observer; +import rx.observables.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.AtomicObserver; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class Subject extends Observable implements Observer { + public static Subject create() { + final ConcurrentHashMap> observers = new ConcurrentHashMap>(); + + Func1, Subscription> onSubscribe = new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + subscription.wrap(new Subscription() { + @Override + public void unsubscribe() { + // on unsubscribe remove it from the map of outbound observers to notify + observers.remove(subscription); + } + }); + + // on subscribe add it to the map of outbound observers to notify + observers.put(subscription, new AtomicObserver(observer, subscription)); + return subscription; + } + }; + + return new Subject(onSubscribe, observers); + } + + private final ConcurrentHashMap> observers; + + protected Subject(Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) { + super(onSubscribe); + this.observers = observers; + } + + @Override + public void onCompleted() { + for (Observer observer : observers.values()) { + observer.onCompleted(); + } + } + + @Override + public void onError(Exception e) { + for (Observer observer : observers.values()) { + observer.onError(e); + } + } + + @Override + public void onNext(T args) { + for (Observer observer : observers.values()) { + observer.onNext(args); + } + } + + public static class UnitTest { + @Test + public void test() { + Subject subject = Subject. create(); + final AtomicReference>> actualRef = new AtomicReference>>(); + + Observable>> wNotificationsList = subject.materialize().toList(); + wNotificationsList.subscribe(new Action1>>() { + @Override + public void call(List> actual) { + actualRef.set(actual); + } + }); + + Subscription sub = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(final Observer observer) { + final AtomicBoolean stop = new AtomicBoolean(false); + new Thread() { + @Override + public void run() { + int i = 1; + while (!stop.get()) { + observer.onNext(i++); + } + observer.onCompleted(); + } + }.start(); + return new Subscription() { + @Override + public void unsubscribe() { + stop.set(true); + } + }; + } + }).subscribe(subject); + // the subject has received an onComplete from the first subscribe because + // it is synchronous and the next subscribe won't do anything. + Observable.toObservable(-1, -2, -3).subscribe(subject); + + List> expected = new ArrayList>(); + expected.add(new Notification(-1)); + expected.add(new Notification(-2)); + expected.add(new Notification(-3)); + expected.add(new Notification()); + Assert.assertTrue(actualRef.get().containsAll(expected)); + + sub.unsubscribe(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/util/AtomicObserverSingleThreaded.java b/rxjava-core/src/main/java/rx/util/AtomicObserverSingleThreaded.java index 49e60fef48..e3fa302c2f 100644 --- a/rxjava-core/src/main/java/rx/util/AtomicObserverSingleThreaded.java +++ b/rxjava-core/src/main/java/rx/util/AtomicObserverSingleThreaded.java @@ -65,13 +65,13 @@ public final class AtomicObserverSingleThreaded implements Observer { * compositional by its very nature. */ - private final Observer Observer; + private final Observer observer; private final AtomicObservableSubscription subscription; private volatile boolean finishRequested = false; private volatile boolean finished = false; public AtomicObserverSingleThreaded(Observer Observer, AtomicObservableSubscription subscription) { - this.Observer = Observer; + this.observer = Observer; this.subscription = subscription; } @@ -86,7 +86,7 @@ public void onNext(T arg) { // if we're already stopped, or a finish request has been received, we won't allow further onNext requests return; } - Observer.onNext(arg); + observer.onNext(arg); } } @@ -101,7 +101,7 @@ public void onError(Exception e) { if (finished || subscription.isUnsubscribed()) { return; } - Observer.onError(e); + observer.onError(e); finished = true; } } @@ -117,7 +117,7 @@ public void onCompleted() { if (finished || subscription.isUnsubscribed()) { return; } - Observer.onCompleted(); + observer.onCompleted(); finished = true; } }