From 967337e24e594c9a0fc19436b993ce549c93d288 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Wed, 23 Jan 2013 12:26:03 -0800 Subject: [PATCH 1/2] Adding a draft of Subject class https://github.com/Netflix/RxJava/issues/19 --- .../java/rx/observables/Notification.java | 39 ++++++ .../src/main/java/rx/subjects/Subject.java | 127 ++++++++++++++++++ .../rx/util/AtomicObserverSingleThreaded.java | 10 +- 3 files changed, 171 insertions(+), 5 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/subjects/Subject.java diff --git a/rxjava-core/src/main/java/rx/observables/Notification.java b/rxjava-core/src/main/java/rx/observables/Notification.java index e6db6eec6a..5180f883de 100644 --- a/rxjava-core/src/main/java/rx/observables/Notification.java +++ b/rxjava-core/src/main/java/rx/observables/Notification.java @@ -119,4 +119,43 @@ public boolean isOnNext() { public static enum Kind { OnNext, OnError, OnCompleted } + + @Override + public String toString() { + StringBuilder str = new StringBuilder("[").append(super.toString()).append(" ").append(getKind()); + if (hasValue()) + str.append(" ").append(getValue()); + if (hasException()) + str.append(" ").append(getException().getMessage()); + str.append("]"); + return str.toString(); + } + + @Override + public int hashCode() { + int hash = getKind().hashCode(); + if (hasValue()) + hash = hash * 31 + getValue().hashCode(); + if (hasException()) + hash = hash * 31 + getException().hashCode(); + return hash; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + if (this == obj) + return true; + if (obj.getClass() != getClass()) + return false; + Notification notification = (Notification) obj; + if (notification.getKind() != getKind()) + return false; + if (hasValue() && !getValue().equals(notification.getValue())) + return false; + if (hasException() && !getException().equals(notification.getException())) + return false; + return true; + } } diff --git a/rxjava-core/src/main/java/rx/subjects/Subject.java b/rxjava-core/src/main/java/rx/subjects/Subject.java new file mode 100644 index 0000000000..664513c34d --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/Subject.java @@ -0,0 +1,127 @@ +package rx.subjects; + +import groovy.lang.Reference; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.Assert; + +import org.junit.Test; + +import rx.observables.Notification; +import rx.observables.Observable; +import rx.observables.Observer; +import rx.observables.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.AtomicObserver; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class Subject extends Observable implements Observer { + public static Subject create() { + final ConcurrentHashMap> observers = new ConcurrentHashMap>(); + + Func1, Subscription> onSubscribe = new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + subscription.wrap(new Subscription() { + @Override + public void unsubscribe() { + // on unsubscribe remove it from the map of outbound observers to notify + observers.remove(subscription); + } + }); + + // on subscribe add it to the map of outbound observers to notify + observers.put(subscription, new AtomicObserver(observer, subscription)); + return subscription; + } + }; + + return new Subject(onSubscribe, observers); + } + + private final ConcurrentHashMap> observers; + + protected Subject(Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) { + super(onSubscribe); + this.observers = observers; + } + + @Override + public void onCompleted() { + for (Observer observer : observers.values()) { + observer.onCompleted(); + } + } + + @Override + public void onError(Exception e) { + for (Observer observer : observers.values()) { + observer.onError(e); + } + } + + @Override + public void onNext(T args) { + for (Observer observer : observers.values()) { + observer.onNext(args); + } + } + + public static class UnitTest { + @Test + public void test() { + Subject subject = Subject. create(); + final Reference>> actualRef = new Reference>>(); + + Observable>> wNotificationsList = subject.materialize().toList(); + wNotificationsList.subscribe(new Action1>>() { + @Override + public void call(List> actual) { + actualRef.set(actual); + } + }); + + Subscription sub = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(final Observer observer) { + final AtomicBoolean stop = new AtomicBoolean(false); + new Thread() { + @Override + public void run() { + int i = 1; + while (!stop.get()) { + observer.onNext(i++); + } + observer.onCompleted(); + } + }.start(); + return new Subscription() { + @Override + public void unsubscribe() { + stop.set(true); + } + }; + } + }).subscribe(subject); + // the subject has received an onComplete from the first subscribe because + // it is synchronous and the next subscribe won't do anything. + Observable.toObservable(-1, -2, -3).subscribe(subject); + + List> expected = new ArrayList>(); + expected.add(new Notification(-1)); + expected.add(new Notification(-2)); + expected.add(new Notification(-3)); + expected.add(new Notification()); + Assert.assertTrue(actualRef.get().containsAll(expected)); + + sub.unsubscribe(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/util/AtomicObserverSingleThreaded.java b/rxjava-core/src/main/java/rx/util/AtomicObserverSingleThreaded.java index 49e60fef48..e3fa302c2f 100644 --- a/rxjava-core/src/main/java/rx/util/AtomicObserverSingleThreaded.java +++ b/rxjava-core/src/main/java/rx/util/AtomicObserverSingleThreaded.java @@ -65,13 +65,13 @@ public final class AtomicObserverSingleThreaded implements Observer { * compositional by its very nature. */ - private final Observer Observer; + private final Observer observer; private final AtomicObservableSubscription subscription; private volatile boolean finishRequested = false; private volatile boolean finished = false; public AtomicObserverSingleThreaded(Observer Observer, AtomicObservableSubscription subscription) { - this.Observer = Observer; + this.observer = Observer; this.subscription = subscription; } @@ -86,7 +86,7 @@ public void onNext(T arg) { // if we're already stopped, or a finish request has been received, we won't allow further onNext requests return; } - Observer.onNext(arg); + observer.onNext(arg); } } @@ -101,7 +101,7 @@ public void onError(Exception e) { if (finished || subscription.isUnsubscribed()) { return; } - Observer.onError(e); + observer.onError(e); finished = true; } } @@ -117,7 +117,7 @@ public void onCompleted() { if (finished || subscription.isUnsubscribed()) { return; } - Observer.onCompleted(); + observer.onCompleted(); finished = true; } } From c7f554f26d4ce3439299d772b7d3ea91071decb5 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Wed, 23 Jan 2013 13:29:13 -0800 Subject: [PATCH 2/2] removing the dependency on groovy --- rxjava-core/src/main/java/rx/subjects/Subject.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/Subject.java b/rxjava-core/src/main/java/rx/subjects/Subject.java index 664513c34d..c23b412447 100644 --- a/rxjava-core/src/main/java/rx/subjects/Subject.java +++ b/rxjava-core/src/main/java/rx/subjects/Subject.java @@ -1,11 +1,10 @@ package rx.subjects; -import groovy.lang.Reference; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import junit.framework.Assert; @@ -78,7 +77,7 @@ public static class UnitTest { @Test public void test() { Subject subject = Subject. create(); - final Reference>> actualRef = new Reference>>(); + final AtomicReference>> actualRef = new AtomicReference>>(); Observable>> wNotificationsList = subject.materialize().toList(); wNotificationsList.subscribe(new Action1>>() {