From 4cb4fed51e23bee21c03b0c2af0b891ba55934a0 Mon Sep 17 00:00:00 2001 From: John Myers Date: Sat, 30 Mar 2013 09:59:00 -0700 Subject: [PATCH 1/8] Rename Subject to PublishSubject --- .../main/java/rx/operators/OperationTake.java | 17 +++++++++++------ .../{Subject.java => PublishSubject.java} | 18 +++++++++--------- 2 files changed, 20 insertions(+), 15 deletions(-) rename rxjava-core/src/main/java/rx/subjects/{Subject.java => PublishSubject.java} (86%) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index e86263e562..d2a0951e91 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -26,10 +26,11 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subjects.PublishSubject; import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; import rx.util.functions.Func2; -import rx.subjects.Subject; + /** * Returns a specified number of contiguous values from the start of an observable sequence. */ @@ -181,11 +182,13 @@ public Boolean call(Integer input) { @Test public void testTakeWhileOnSubject1() { - Subject s = Subject.create(); + PublishSubject s = PublishSubject.create(); Observable w = (Observable)s; - Observable take = Observable.create(takeWhile(w, new Func1() { + Observable take = Observable.create(takeWhile(w, new Func1() + { @Override - public Boolean call(Integer input) { + public Boolean call(Integer input) + { return input < 3; } })); @@ -213,9 +216,11 @@ public Boolean call(Integer input) { @Test public void testTakeWhile2() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(takeWhileWithIndex(w, new Func2() { + Observable take = Observable.create(takeWhileWithIndex(w, new Func2() + { @Override - public Boolean call(String input, Integer index) { + public Boolean call(String input, Integer index) + { return index < 2; } })); diff --git a/rxjava-core/src/main/java/rx/subjects/Subject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java similarity index 86% rename from rxjava-core/src/main/java/rx/subjects/Subject.java rename to rxjava-core/src/main/java/rx/subjects/PublishSubject.java index c9279adb38..172dad7232 100644 --- a/rxjava-core/src/main/java/rx/subjects/Subject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -19,8 +19,8 @@ import rx.util.functions.Action1; import rx.util.functions.Func1; -public class Subject extends Observable implements Observer { - public static Subject create() { +public class PublishSubject extends Observable implements Observer { + public static PublishSubject create() { final ConcurrentHashMap> observers = new ConcurrentHashMap>(); Func1, Subscription> onSubscribe = new Func1, Subscription>() { @@ -42,12 +42,12 @@ public void unsubscribe() { } }; - return new Subject(onSubscribe, observers); + return new PublishSubject(onSubscribe, observers); } private final ConcurrentHashMap> observers; - protected Subject(Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) { + protected PublishSubject(Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) { super(onSubscribe); this.observers = observers; } @@ -76,10 +76,10 @@ public void onNext(T args) { public static class UnitTest { @Test public void test() { - Subject subject = Subject.create(); + PublishSubject publishSubject = PublishSubject.create(); final AtomicReference>> actualRef = new AtomicReference>>(); - Observable>> wNotificationsList = subject.materialize().toList(); + Observable>> wNotificationsList = publishSubject.materialize().toList(); wNotificationsList.subscribe(new Action1>>() { @Override public void call(List> actual) { @@ -108,10 +108,10 @@ public void unsubscribe() { } }; } - }).subscribe(subject); - // the subject has received an onComplete from the first subscribe because + }).subscribe(publishSubject); + // the publishSubject has received an onComplete from the first subscribe because // it is synchronous and the next subscribe won't do anything. - Observable.toObservable(-1, -2, -3).subscribe(subject); + Observable.toObservable(-1, -2, -3).subscribe(publishSubject); List> expected = new ArrayList>(); expected.add(new Notification(-1)); From 5b9bca978150a0ec352da38dbd49df7b5d9997f6 Mon Sep 17 00:00:00 2001 From: John Myers Date: Sat, 30 Mar 2013 10:07:28 -0700 Subject: [PATCH 2/8] Introduce Subject as analogue to RX ISubject --- .../main/java/rx/subjects/PublishSubject.java | 2 +- .../src/main/java/rx/subjects/Subject.java | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) create mode 100644 rxjava-core/src/main/java/rx/subjects/Subject.java diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 172dad7232..363042bfdf 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -19,7 +19,7 @@ import rx.util.functions.Action1; import rx.util.functions.Func1; -public class PublishSubject extends Observable implements Observer { +public class PublishSubject extends Subject { public static PublishSubject create() { final ConcurrentHashMap> observers = new ConcurrentHashMap>(); diff --git a/rxjava-core/src/main/java/rx/subjects/Subject.java b/rxjava-core/src/main/java/rx/subjects/Subject.java new file mode 100644 index 0000000000..ca9242d471 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/Subject.java @@ -0,0 +1,18 @@ +package rx.subjects; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.util.functions.Func1; + +public abstract class Subject extends Observable implements Observer { + protected Subject() + { + super(); + } + + protected Subject(Func1, Subscription> onSubscribe) + { + super(onSubscribe); + } +} From 62c29d2c6d7f320454703dda11028e8042dca987 Mon Sep 17 00:00:00 2001 From: John Myers Date: Sat, 30 Mar 2013 22:04:46 -0700 Subject: [PATCH 3/8] Implement RepeatSubject --- .../main/java/rx/subjects/RepeatSubject.java | 404 ++++++++++++++++++ 1 file changed, 404 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/subjects/RepeatSubject.java diff --git a/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java b/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java new file mode 100644 index 0000000000..a5f1470f6b --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java @@ -0,0 +1,404 @@ +package rx.subjects; + +import org.junit.Test; +import org.mockito.Mockito; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public final class RepeatSubject extends Subject +{ + + private boolean isDone = false; + private Exception exception = null; + private final Map> subscriptions = new HashMap>(); + private final List history = Collections.synchronizedList(new ArrayList()); + + public static RepeatSubject create() { + return new RepeatSubject(new DelegateSubscriptionFunc()); + } + + private RepeatSubject(DelegateSubscriptionFunc onSubscribe) { + super(onSubscribe); + onSubscribe.wrap(new SubscriptionFunc()); + } + private static final class DelegateSubscriptionFunc implements Func1,Subscription> + { + private Func1, Subscription> delegate = null; + + public void wrap(Func1, Subscription> delegate) + { + if (this.delegate != null) { + throw new UnsupportedOperationException("delegate already set"); + } + this.delegate = delegate; + } + + @Override + public Subscription call(Observer observer) + { + return delegate.call(observer); + } + } + + private class SubscriptionFunc implements Func1, Subscription> + { + @Override + public Subscription call(Observer observer) { + int item = 0; + Subscription subscription; + + for (;;) { + while (item < history.size()) { + observer.onNext(history.get(item++)); + } + + synchronized (subscriptions) { + if (item < history.size()) { + continue; + } + + if (exception != null) { + observer.onError(exception); + return Subscriptions.empty(); + } + if (isDone) { + observer.onCompleted(); + return Subscriptions.empty(); + } + + subscription = new RepeatSubjectSubscription(); + subscriptions.put(subscription, observer); + break; + } + } + + return subscription; + } + } + + private class RepeatSubjectSubscription implements Subscription + { + @Override + public void unsubscribe() + { + synchronized (subscriptions) { + subscriptions.remove(this); + } + } + } + + @Override + public void onCompleted() + { + synchronized (subscriptions) { + isDone = true; + for (Observer observer : new ArrayList>(subscriptions.values())) { + observer.onCompleted(); + } + subscriptions.clear(); + } + } + + @Override + public void onError(Exception e) + { + synchronized (subscriptions) { + if (isDone) { + return; + } + isDone = true; + exception = e; + for (Observer observer : new ArrayList>(subscriptions.values())) { + observer.onError(e); + } + subscriptions.clear(); + } + } + + @Override + public void onNext(T args) + { + synchronized (subscriptions) { + history.add(args); + for (Observer observer : new ArrayList>(subscriptions.values())) { + observer.onNext(args); + } + } + } + + public static class UnitTest { + + private final Exception testException = new Exception(); + + @Test + public void testCompleted() { + RepeatSubject subject = RepeatSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onCompleted(); + + subject.onNext("four"); + subject.onCompleted(); + subject.onError(new Exception()); + + assertCompletedObserver(aObserver); + + aObserver = mock(Observer.class); + subject.subscribe(aObserver); + assertCompletedObserver(aObserver); + } + + private void assertCompletedObserver(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testError() { + RepeatSubject subject = RepeatSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onError(testException); + + subject.onNext("four"); + subject.onError(new Exception()); + subject.onCompleted(); + + assertErrorObserver(aObserver); + + aObserver = mock(Observer.class); + subject.subscribe(aObserver); + assertErrorObserver(aObserver); + } + + private void assertErrorObserver(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, times(1)).onError(testException); + verify(aObserver, Mockito.never()).onCompleted(); + } + + + @Test + public void testSubscribeMidSequence() { + RepeatSubject subject = RepeatSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + + assertObservedUntilTwo(aObserver); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + assertObservedUntilTwo(anotherObserver); + + subject.onNext("three"); + subject.onCompleted(); + + assertCompletedObserver(aObserver); + assertCompletedObserver(anotherObserver); + } + + @Test + public void testUnsubscribeFirstObserver() { + RepeatSubject subject = RepeatSubject.create(); + + Observer aObserver = mock(Observer.class); + Subscription subscription = subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + + subscription.unsubscribe(); + assertObservedUntilTwo(aObserver); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + assertObservedUntilTwo(anotherObserver); + + subject.onNext("three"); + subject.onCompleted(); + + assertObservedUntilTwo(aObserver); + assertCompletedObserver(anotherObserver); + } + + private void assertObservedUntilTwo(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, Mockito.never()).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onCompleted(); + } + + @Test + public void testUnsubscribeFromOnNext() { + RepeatSubject subject = RepeatSubject.create(); + + UnsubscribeTest test1 = UnsubscribeTest.createOnNext(subject); + UnsubscribeTest test2 = UnsubscribeTest.createOnNext(subject); + + subject.onNext("one"); + + test1.assertPassed(); + test2.assertPassed(); + } + + @Test + public void testUnsubscribeFromOnCompleted() { + RepeatSubject subject = RepeatSubject.create(); + + UnsubscribeTest test1 = UnsubscribeTest.createOnCompleted(subject); + UnsubscribeTest test2 = UnsubscribeTest.createOnCompleted(subject); + + subject.onCompleted(); + + test1.assertPassed(); + test2.assertPassed(); + } + + @Test + public void testUnsubscribeFromOnError() { + RepeatSubject subject = RepeatSubject.create(); + + UnsubscribeTest test1 = UnsubscribeTest.createOnError(subject); + UnsubscribeTest test2 = UnsubscribeTest.createOnError(subject); + + subject.onError(new Exception()); + + test1.assertPassed(); + test2.assertPassed(); + } + + private static class UnsubscribeTest + { + private Subscription subscription; + + private UnsubscribeTest() {} + + public static UnsubscribeTest createOnNext(Observable observable) + { + final UnsubscribeTest test = new UnsubscribeTest(); + test.setSubscription(observable.subscribe(new Observer() + { + @Override + public void onCompleted() + { + } + + @Override + public void onError(Exception e) + { + } + + @Override + public void onNext(T args) + { + test.doUnsubscribe(); + } + })); + return test; + } + + public static UnsubscribeTest createOnCompleted(Observable observable) + { + final UnsubscribeTest test = new UnsubscribeTest(); + test.setSubscription(observable.subscribe(new Observer() + { + @Override + public void onCompleted() + { + test.doUnsubscribe(); + } + + @Override + public void onError(Exception e) + { + } + + @Override + public void onNext(T args) + { + } + })); + return test; + } + + public static UnsubscribeTest createOnError(Observable observable) + { + final UnsubscribeTest test = new UnsubscribeTest(); + test.setSubscription(observable.subscribe(new Observer() + { + @Override + public void onCompleted() + { + } + + @Override + public void onError(Exception e) + { + test.doUnsubscribe(); + } + + @Override + public void onNext(T args) + { + } + })); + return test; + } + + private void setSubscription(Subscription subscription) + { + this.subscription = subscription; + } + + private void doUnsubscribe() + { + Subscription subscription = this.subscription; + this.subscription = null; + subscription.unsubscribe(); + } + + public void assertPassed() + { + assertTrue("expected notification was received", subscription == null); + } + } + } +} From 12954a69895db8e60522c72557b93c53973dc436 Mon Sep 17 00:00:00 2001 From: John Myers Date: Sat, 30 Mar 2013 22:18:43 -0700 Subject: [PATCH 4/8] Extract UnsubscribeTester to top level --- .../main/java/rx/subjects/RepeatSubject.java | 110 ++---------------- .../java/rx/testing/UnsubscribeTester.java | 103 ++++++++++++++++ 2 files changed, 110 insertions(+), 103 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java diff --git a/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java b/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java index a5f1470f6b..1d0eb6f145 100644 --- a/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java @@ -2,10 +2,10 @@ import org.junit.Test; import org.mockito.Mockito; -import rx.Observable; import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; +import rx.testing.UnsubscribeTester; import rx.util.functions.Func1; import java.util.ArrayList; @@ -14,7 +14,6 @@ import java.util.List; import java.util.Map; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -270,8 +269,8 @@ private void assertObservedUntilTwo(Observer aObserver) public void testUnsubscribeFromOnNext() { RepeatSubject subject = RepeatSubject.create(); - UnsubscribeTest test1 = UnsubscribeTest.createOnNext(subject); - UnsubscribeTest test2 = UnsubscribeTest.createOnNext(subject); + UnsubscribeTester test1 = UnsubscribeTester.createOnNext(subject); + UnsubscribeTester test2 = UnsubscribeTester.createOnNext(subject); subject.onNext("one"); @@ -283,8 +282,8 @@ public void testUnsubscribeFromOnNext() { public void testUnsubscribeFromOnCompleted() { RepeatSubject subject = RepeatSubject.create(); - UnsubscribeTest test1 = UnsubscribeTest.createOnCompleted(subject); - UnsubscribeTest test2 = UnsubscribeTest.createOnCompleted(subject); + UnsubscribeTester test1 = UnsubscribeTester.createOnCompleted(subject); + UnsubscribeTester test2 = UnsubscribeTester.createOnCompleted(subject); subject.onCompleted(); @@ -296,8 +295,8 @@ public void testUnsubscribeFromOnCompleted() { public void testUnsubscribeFromOnError() { RepeatSubject subject = RepeatSubject.create(); - UnsubscribeTest test1 = UnsubscribeTest.createOnError(subject); - UnsubscribeTest test2 = UnsubscribeTest.createOnError(subject); + UnsubscribeTester test1 = UnsubscribeTester.createOnError(subject); + UnsubscribeTester test2 = UnsubscribeTester.createOnError(subject); subject.onError(new Exception()); @@ -305,100 +304,5 @@ public void testUnsubscribeFromOnError() { test2.assertPassed(); } - private static class UnsubscribeTest - { - private Subscription subscription; - - private UnsubscribeTest() {} - - public static UnsubscribeTest createOnNext(Observable observable) - { - final UnsubscribeTest test = new UnsubscribeTest(); - test.setSubscription(observable.subscribe(new Observer() - { - @Override - public void onCompleted() - { - } - - @Override - public void onError(Exception e) - { - } - - @Override - public void onNext(T args) - { - test.doUnsubscribe(); - } - })); - return test; - } - - public static UnsubscribeTest createOnCompleted(Observable observable) - { - final UnsubscribeTest test = new UnsubscribeTest(); - test.setSubscription(observable.subscribe(new Observer() - { - @Override - public void onCompleted() - { - test.doUnsubscribe(); - } - - @Override - public void onError(Exception e) - { - } - - @Override - public void onNext(T args) - { - } - })); - return test; - } - - public static UnsubscribeTest createOnError(Observable observable) - { - final UnsubscribeTest test = new UnsubscribeTest(); - test.setSubscription(observable.subscribe(new Observer() - { - @Override - public void onCompleted() - { - } - - @Override - public void onError(Exception e) - { - test.doUnsubscribe(); - } - - @Override - public void onNext(T args) - { - } - })); - return test; - } - - private void setSubscription(Subscription subscription) - { - this.subscription = subscription; - } - - private void doUnsubscribe() - { - Subscription subscription = this.subscription; - this.subscription = null; - subscription.unsubscribe(); - } - - public void assertPassed() - { - assertTrue("expected notification was received", subscription == null); - } - } } } diff --git a/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java b/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java new file mode 100644 index 0000000000..e1988c9093 --- /dev/null +++ b/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java @@ -0,0 +1,103 @@ +package rx.testing; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; + +import static org.junit.Assert.assertTrue; + +public class UnsubscribeTester +{ + private Subscription subscription; + + public UnsubscribeTester() {} + + public static UnsubscribeTester createOnNext(Observable observable) + { + final UnsubscribeTester test = new UnsubscribeTester(); + test.setSubscription(observable.subscribe(new Observer() + { + @Override + public void onCompleted() + { + } + + @Override + public void onError(Exception e) + { + } + + @Override + public void onNext(T args) + { + test.doUnsubscribe(); + } + })); + return test; + } + + public static UnsubscribeTester createOnCompleted(Observable observable) + { + final UnsubscribeTester test = new UnsubscribeTester(); + test.setSubscription(observable.subscribe(new Observer() + { + @Override + public void onCompleted() + { + test.doUnsubscribe(); + } + + @Override + public void onError(Exception e) + { + } + + @Override + public void onNext(T args) + { + } + })); + return test; + } + + public static UnsubscribeTester createOnError(Observable observable) + { + final UnsubscribeTester test = new UnsubscribeTester(); + test.setSubscription(observable.subscribe(new Observer() + { + @Override + public void onCompleted() + { + } + + @Override + public void onError(Exception e) + { + test.doUnsubscribe(); + } + + @Override + public void onNext(T args) + { + } + })); + return test; + } + + private void setSubscription(Subscription subscription) + { + this.subscription = subscription; + } + + private void doUnsubscribe() + { + Subscription subscription = this.subscription; + this.subscription = null; + subscription.unsubscribe(); + } + + public void assertPassed() + { + assertTrue("expected notification was received", subscription == null); + } +} From cce874510408305fed6e01ab5b70ab753f9ac693 Mon Sep 17 00:00:00 2001 From: John Myers Date: Sat, 30 Mar 2013 22:58:39 -0700 Subject: [PATCH 5/8] Beef up UnsubscribeTester --- .../main/java/rx/subjects/RepeatSubject.java | 72 +++++++++---------- .../java/rx/testing/UnsubscribeTester.java | 53 ++++++++++++-- 2 files changed, 81 insertions(+), 44 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java b/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java index 1d0eb6f145..7ab3a9e2c7 100644 --- a/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/RepeatSubject.java @@ -2,10 +2,13 @@ import org.junit.Test; import org.mockito.Mockito; +import rx.Observable; import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; import rx.testing.UnsubscribeTester; +import rx.util.functions.Action1; +import rx.util.functions.Func0; import rx.util.functions.Func1; import java.util.ArrayList; @@ -266,43 +269,38 @@ private void assertObservedUntilTwo(Observer aObserver) } @Test - public void testUnsubscribeFromOnNext() { - RepeatSubject subject = RepeatSubject.create(); - - UnsubscribeTester test1 = UnsubscribeTester.createOnNext(subject); - UnsubscribeTester test2 = UnsubscribeTester.createOnNext(subject); - - subject.onNext("one"); - - test1.assertPassed(); - test2.assertPassed(); - } - - @Test - public void testUnsubscribeFromOnCompleted() { - RepeatSubject subject = RepeatSubject.create(); - - UnsubscribeTester test1 = UnsubscribeTester.createOnCompleted(subject); - UnsubscribeTester test2 = UnsubscribeTester.createOnCompleted(subject); - - subject.onCompleted(); - - test1.assertPassed(); - test2.assertPassed(); - } - - @Test - public void testUnsubscribeFromOnError() { - RepeatSubject subject = RepeatSubject.create(); - - UnsubscribeTester test1 = UnsubscribeTester.createOnError(subject); - UnsubscribeTester test2 = UnsubscribeTester.createOnError(subject); - - subject.onError(new Exception()); - - test1.assertPassed(); - test2.assertPassed(); + public void testUnsubscribe() + { + UnsubscribeTester.test(new Func0>() + { + @Override + public RepeatSubject call() + { + return RepeatSubject.create(); + } + }, new Action1>() + { + @Override + public void call(RepeatSubject repeatSubject) + { + repeatSubject.onCompleted(); + } + }, new Action1>() + { + @Override + public void call(RepeatSubject repeatSubject) + { + repeatSubject.onError(new Exception()); + } + }, new Action1>() + { + @Override + public void call(RepeatSubject repeatSubject) + { + repeatSubject.onNext("one"); + } + } + ); } - } } diff --git a/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java b/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java index e1988c9093..08607f2734 100644 --- a/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java +++ b/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java @@ -3,6 +3,8 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.util.functions.Action1; +import rx.util.functions.Func0; import static org.junit.Assert.assertTrue; @@ -12,7 +14,44 @@ public class UnsubscribeTester public UnsubscribeTester() {} - public static UnsubscribeTester createOnNext(Observable observable) + /** + * Tests the unsubscription semantics of an observable. + * + * @param provider Function that when called provides an instance of the observable being tested + * @param generateOnCompleted Causes an observer generated by @param provider to generate an onCompleted event. Null to not test onCompleted. + * @param generateOnError Causes an observer generated by @param provider to generate an onError event. Null to not test onError. + * @param generateOnNext Causes an observer generated by @param provider to generate an onNext event. Null to not test onNext. + * @param The type of object passed by the Observable + */ + public static > void test(Func0 provider, Action1 generateOnCompleted, Action1 generateOnError, Action1 generateOnNext) + { + if (generateOnCompleted != null) { + O observable = provider.call(); + UnsubscribeTester tester1 = createOnCompleted(observable); + UnsubscribeTester tester2 = createOnCompleted(observable); + generateOnCompleted.call(observable); + tester1.assertPassed(); + tester2.assertPassed(); + } + if (generateOnError != null) { + O observable = provider.call(); + UnsubscribeTester tester1 = createOnError(observable); + UnsubscribeTester tester2 = createOnError(observable); + generateOnError.call(observable); + tester1.assertPassed(); + tester2.assertPassed(); + } + if (generateOnNext != null) { + O observable = provider.call(); + UnsubscribeTester tester1 = createOnNext(observable); + UnsubscribeTester tester2 = createOnNext(observable); + generateOnNext.call(observable); + tester1.assertPassed(); + tester2.assertPassed(); + } + } + + private static UnsubscribeTester createOnCompleted(Observable observable) { final UnsubscribeTester test = new UnsubscribeTester(); test.setSubscription(observable.subscribe(new Observer() @@ -20,6 +59,7 @@ public static UnsubscribeTester createOnNext(Observable observable) @Override public void onCompleted() { + test.doUnsubscribe(); } @Override @@ -30,13 +70,12 @@ public void onError(Exception e) @Override public void onNext(T args) { - test.doUnsubscribe(); } })); return test; } - public static UnsubscribeTester createOnCompleted(Observable observable) + private static UnsubscribeTester createOnError(Observable observable) { final UnsubscribeTester test = new UnsubscribeTester(); test.setSubscription(observable.subscribe(new Observer() @@ -44,12 +83,12 @@ public static UnsubscribeTester createOnCompleted(Observable observable) @Override public void onCompleted() { - test.doUnsubscribe(); } @Override public void onError(Exception e) { + test.doUnsubscribe(); } @Override @@ -60,7 +99,7 @@ public void onNext(T args) return test; } - public static UnsubscribeTester createOnError(Observable observable) + private static UnsubscribeTester createOnNext(Observable observable) { final UnsubscribeTester test = new UnsubscribeTester(); test.setSubscription(observable.subscribe(new Observer() @@ -73,12 +112,12 @@ public void onCompleted() @Override public void onError(Exception e) { - test.doUnsubscribe(); } @Override public void onNext(T args) { + test.doUnsubscribe(); } })); return test; @@ -96,7 +135,7 @@ private void doUnsubscribe() subscription.unsubscribe(); } - public void assertPassed() + private void assertPassed() { assertTrue("expected notification was received", subscription == null); } From acfe92f43c5e6b223bb322bc2c65e3d16910357f Mon Sep 17 00:00:00 2001 From: John Myers Date: Sat, 30 Mar 2013 23:11:19 -0700 Subject: [PATCH 6/8] Beef up UnsubscribeTester some more --- .../java/rx/testing/UnsubscribeTester.java | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java b/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java index 08607f2734..701e7025cf 100644 --- a/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java +++ b/rxjava-core/src/main/java/rx/testing/UnsubscribeTester.java @@ -6,10 +6,12 @@ import rx.util.functions.Action1; import rx.util.functions.Func0; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class UnsubscribeTester { + private boolean isDone = false; private Subscription subscription; public UnsubscribeTester() {} @@ -59,17 +61,19 @@ private static UnsubscribeTester createOnCompleted(Observable observable) @Override public void onCompleted() { - test.doUnsubscribe(); + test.doUnsubscribe("onCompleted"); } @Override public void onError(Exception e) { + test.gotEvent("onError"); } @Override public void onNext(T args) { + test.gotEvent("onNext"); } })); return test; @@ -83,17 +87,19 @@ private static UnsubscribeTester createOnError(Observable observable) @Override public void onCompleted() { + test.gotEvent("onCompleted"); } @Override public void onError(Exception e) { - test.doUnsubscribe(); + test.doUnsubscribe("onError"); } @Override public void onNext(T args) { + test.gotEvent("onNext"); } })); return test; @@ -107,17 +113,19 @@ private static UnsubscribeTester createOnNext(Observable observable) @Override public void onCompleted() { + test.gotEvent("onCompleted"); } @Override public void onError(Exception e) { + test.gotEvent("onError"); } @Override public void onNext(T args) { - test.doUnsubscribe(); + test.doUnsubscribe("onNext"); } })); return test; @@ -128,15 +136,22 @@ private void setSubscription(Subscription subscription) this.subscription = subscription; } - private void doUnsubscribe() + private void gotEvent(String event) { - Subscription subscription = this.subscription; - this.subscription = null; - subscription.unsubscribe(); + assertFalse("received " + event + " after unsubscribe", isDone); + } + + private void doUnsubscribe(String event) + { + gotEvent(event); + if (subscription != null) { + isDone = true; + subscription.unsubscribe(); + } } private void assertPassed() { - assertTrue("expected notification was received", subscription == null); + assertTrue("expected notification was received", isDone); } } From 0d420d7a132ae72db7e3f1acdb72eda209606968 Mon Sep 17 00:00:00 2001 From: John Myers Date: Sat, 30 Mar 2013 23:40:05 -0700 Subject: [PATCH 7/8] Add unit tests to PublishSubject --- .../main/java/rx/subjects/PublishSubject.java | 181 ++++++++++++++++++ 1 file changed, 181 insertions(+) diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 363042bfdf..ab317815da 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -10,15 +10,23 @@ import org.junit.Test; +import org.mockito.Mockito; import rx.Notification; import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.testing.UnsubscribeTester; import rx.util.AtomicObservableSubscription; import rx.util.SynchronizedObserver; import rx.util.functions.Action1; +import rx.util.functions.Func0; import rx.util.functions.Func1; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class PublishSubject extends Subject { public static PublishSubject create() { final ConcurrentHashMap> observers = new ConcurrentHashMap>(); @@ -122,5 +130,178 @@ public void unsubscribe() { sub.unsubscribe(); } + + private final Exception testException = new Exception(); + + @Test + public void testCompleted() { + PublishSubject subject = PublishSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onCompleted(); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("four"); + subject.onCompleted(); + subject.onError(new Exception()); + + assertCompletedObserver(aObserver); +// todo bug? assertNeverObserver(anotherObserver); + } + + private void assertCompletedObserver(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + private void assertNeverObserver(Observer aObserver) + { + verify(aObserver, Mockito.never()).onNext(any(String.class)); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onCompleted(); + } + + @Test + public void testError() { + PublishSubject subject = PublishSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onError(testException); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("four"); + subject.onError(new Exception()); + subject.onCompleted(); + + assertErrorObserver(aObserver); +// todo bug? assertNeverObserver(anotherObserver); + } + + private void assertErrorObserver(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, times(1)).onError(testException); + verify(aObserver, Mockito.never()).onCompleted(); + } + + + @Test + public void testSubscribeMidSequence() { + PublishSubject subject = PublishSubject.create(); + + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + + assertObservedUntilTwo(aObserver); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("three"); + subject.onCompleted(); + + assertCompletedObserver(aObserver); + assertCompletedStartingWithThreeObserver(anotherObserver); + } + + + private void assertCompletedStartingWithThreeObserver(Observer aObserver) + { + verify(aObserver, Mockito.never()).onNext("one"); + verify(aObserver, Mockito.never()).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testUnsubscribeFirstObserver() { + PublishSubject subject = PublishSubject.create(); + + Observer aObserver = mock(Observer.class); + Subscription subscription = subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + + subscription.unsubscribe(); + assertObservedUntilTwo(aObserver); + + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("three"); + subject.onCompleted(); + + assertObservedUntilTwo(aObserver); + assertCompletedStartingWithThreeObserver(anotherObserver); + } + + private void assertObservedUntilTwo(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, Mockito.never()).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onCompleted(); + } + + @Test + public void testUnsubscribe() + { + UnsubscribeTester.test(new Func0>() + { + @Override + public PublishSubject call() + { + return PublishSubject.create(); + } + }, new Action1>() + { + @Override + public void call(PublishSubject PublishSubject) + { + PublishSubject.onCompleted(); + } + }, new Action1>() + { + @Override + public void call(PublishSubject PublishSubject) + { + PublishSubject.onError(new Exception()); + } + }, new Action1>() + { + @Override + public void call(PublishSubject PublishSubject) + { + PublishSubject.onNext("one"); + } + } + ); + } } } From af383b5addc32b34aff4bc1dc920339252e6168e Mon Sep 17 00:00:00 2001 From: John Myers Date: Sun, 31 Mar 2013 08:51:28 -0700 Subject: [PATCH 8/8] Resolve merge conflicts --- .../main/java/rx/operators/OperationTake.java | 24 +++++++++---------- .../java/rx/operators/OperationTakeWhile.java | 5 ++-- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 88639d8c40..d22f8ce95f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,25 +15,25 @@ */ package rx.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; -import static rx.operators.AbstractOperation.UnitTest.*; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Test; - import rx.Observable; import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; -import rx.subjects.PublishSubject; import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; -import rx.util.functions.Func2; -import rx.subjects.Subject; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static rx.operators.AbstractOperation.UnitTest.assertTrustedObservable; /** * Returns a specified number of contiguous values from the start of an observable sequence. */ diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index f45efabc92..8b384528b1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -26,6 +26,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; @@ -162,8 +163,8 @@ public Boolean call(Integer input) @Test public void testTakeWhileOnSubject1() { - Subject s = Subject.create(); - Observable w = (Observable)s; + PublishSubject s = PublishSubject.create(); + Observable w = s; Observable take = Observable.create(takeWhile(w, new Func1() { @Override