diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 4a09797f85..12276c7b8d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -60,7 +60,7 @@ public static class UnitTest { @SuppressWarnings("unchecked") public void testObserveOn() { - Scheduler scheduler = spy(Tester.UnitTest.forwardingScheduler(Schedulers.immediate())); + Scheduler scheduler = spy(OperatorTester.UnitTest.forwardingScheduler(Schedulers.immediate())); Observer observer = mock(Observer.class); Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer); diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java index 5b6368cedc..7e3756057d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -81,7 +81,7 @@ public static class UnitTest { public void testSubscribeOn() { Observable w = Observable.toObservable(1, 2, 3); - Scheduler scheduler = spy(Tester.UnitTest.forwardingScheduler(Schedulers.immediate())); + Scheduler scheduler = spy(OperatorTester.UnitTest.forwardingScheduler(Schedulers.immediate())); Observer observer = mock(Observer.class); Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer); diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index a4b1922e22..cf99ba6408 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -18,7 +18,7 @@ import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.Tester.UnitTest.*; +import static rx.operators.OperatorTester.UnitTest.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; diff --git a/rxjava-core/src/main/java/rx/operators/Tester.java b/rxjava-core/src/main/java/rx/operators/OperatorTester.java similarity index 91% rename from rxjava-core/src/main/java/rx/operators/Tester.java rename to rxjava-core/src/main/java/rx/operators/OperatorTester.java index 7a1c60e7dc..086affa96d 100644 --- a/rxjava-core/src/main/java/rx/operators/Tester.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorTester.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import static org.junit.Assert.*; @@ -24,14 +39,18 @@ /** * Common utility functions for testing operator implementations. */ -/* package */class Tester { +/* package */class OperatorTester { /* * This is purposefully package-only so it does not leak into the public API outside of this package. * * This package is implementation details and not part of the Javadocs and thus can change without breaking backwards compatibility. + * + * benjchristensen => I'm procrastinating the decision of where and how these types of classes (see rx.subjects.UnsubscribeTester) should exist. + * If they are only for internal implementations then I don't want them as part of the API. + * If they are truly useful for everyone to use then an "rx.testing" package may make sense. */ - private Tester() { + private OperatorTester() { } public static class UnitTest { diff --git a/rxjava-core/src/main/java/rx/subjects/DefaultSubject.java b/rxjava-core/src/main/java/rx/subjects/DefaultSubject.java index d70b30f0a0..31ae205914 100644 --- a/rxjava-core/src/main/java/rx/subjects/DefaultSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/DefaultSubject.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,6 +15,9 @@ */ package rx.subjects; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -24,6 +27,7 @@ import junit.framework.Assert; import org.junit.Test; +import org.mockito.Mockito; import rx.Notification; import rx.Observable; @@ -32,6 +36,7 @@ import rx.util.AtomicObservableSubscription; import rx.util.SynchronizedObserver; import rx.util.functions.Action1; +import rx.util.functions.Func0; import rx.util.functions.Func1; public class DefaultSubject extends Subject { @@ -137,5 +142,183 @@ public void unsubscribe() { sub.unsubscribe(); } + + private final Exception testException = new Exception(); + + @Test + public void testCompleted() { + DefaultSubject subject = DefaultSubject.create(); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onCompleted(); + + @SuppressWarnings("unchecked") + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("four"); + subject.onCompleted(); + subject.onError(new Exception()); + + assertCompletedObserver(aObserver); + // todo bug? assertNeverObserver(anotherObserver); + } + + private void assertCompletedObserver(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + private void assertNeverObserver(Observer aObserver) + { + verify(aObserver, Mockito.never()).onNext(any(String.class)); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onCompleted(); + } + + @Test + public void testError() { + DefaultSubject subject = DefaultSubject.create(); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + subject.onNext("three"); + subject.onError(testException); + + @SuppressWarnings("unchecked") + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("four"); + subject.onError(new Exception()); + subject.onCompleted(); + + assertErrorObserver(aObserver); + // todo bug? assertNeverObserver(anotherObserver); + } + + private void assertErrorObserver(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, times(1)).onError(testException); + verify(aObserver, Mockito.never()).onCompleted(); + } + + @Test + public void testSubscribeMidSequence() { + DefaultSubject subject = DefaultSubject.create(); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + + assertObservedUntilTwo(aObserver); + + @SuppressWarnings("unchecked") + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("three"); + subject.onCompleted(); + + assertCompletedObserver(aObserver); + assertCompletedStartingWithThreeObserver(anotherObserver); + } + + private void assertCompletedStartingWithThreeObserver(Observer aObserver) + { + verify(aObserver, Mockito.never()).onNext("one"); + verify(aObserver, Mockito.never()).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testUnsubscribeFirstObserver() { + DefaultSubject subject = DefaultSubject.create(); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + Subscription subscription = subject.subscribe(aObserver); + + subject.onNext("one"); + subject.onNext("two"); + + subscription.unsubscribe(); + assertObservedUntilTwo(aObserver); + + @SuppressWarnings("unchecked") + Observer anotherObserver = mock(Observer.class); + subject.subscribe(anotherObserver); + + subject.onNext("three"); + subject.onCompleted(); + + assertObservedUntilTwo(aObserver); + assertCompletedStartingWithThreeObserver(anotherObserver); + } + + private void assertObservedUntilTwo(Observer aObserver) + { + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, Mockito.never()).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, Mockito.never()).onCompleted(); + } + + @Test + public void testUnsubscribe() + { + UnsubscribeTester.test(new Func0>() + { + @Override + public DefaultSubject call() + { + return DefaultSubject.create(); + } + }, new Action1>() + { + @Override + public void call(DefaultSubject DefaultSubject) + { + DefaultSubject.onCompleted(); + } + }, new Action1>() + { + @Override + public void call(DefaultSubject DefaultSubject) + { + DefaultSubject.onError(new Exception()); + } + }, new Action1>() + { + @Override + public void call(DefaultSubject DefaultSubject) + { + DefaultSubject.onNext("one"); + } + }); + } } } diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java new file mode 100644 index 0000000000..ff5f2badcc --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -0,0 +1,348 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subjects; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action1; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Subject that retains all events and will replay them to an {@link Observer} that subscribes. + *

+ * Example usage: + *

+ *

 {@code
+ 
+  ReplaySubject subject = ReplaySubject.create();
+  subject.onNext("one");
+  subject.onNext("two");
+  subject.onNext("three");
+  subject.onCompleted();
+  
+  // both of the following will get the onNext/onCompleted calls from above
+  subject.subscribe(observer1);
+  subject.subscribe(observer2);
+ 
+  } 
+ * 
+ * @param 
+ */
+public final class ReplaySubject extends Subject
+{
+
+    private boolean isDone = false;
+    private Exception exception = null;
+    private final Map> subscriptions = new HashMap>();
+    private final List history = Collections.synchronizedList(new ArrayList());
+
+    public static  ReplaySubject create() {
+        return new ReplaySubject(new DelegateSubscriptionFunc());
+    }
+
+    private ReplaySubject(DelegateSubscriptionFunc onSubscribe) {
+        super(onSubscribe);
+        onSubscribe.wrap(new SubscriptionFunc());
+    }
+
+    private static final class DelegateSubscriptionFunc implements Func1, Subscription>
+    {
+        private Func1, Subscription> delegate = null;
+
+        public void wrap(Func1, Subscription> delegate)
+        {
+            if (this.delegate != null) {
+                throw new UnsupportedOperationException("delegate already set");
+            }
+            this.delegate = delegate;
+        }
+
+        @Override
+        public Subscription call(Observer observer)
+        {
+            return delegate.call(observer);
+        }
+    }
+
+    private class SubscriptionFunc implements Func1, Subscription>
+    {
+        @Override
+        public Subscription call(Observer observer) {
+            int item = 0;
+            Subscription subscription;
+
+            for (;;) {
+                while (item < history.size()) {
+                    observer.onNext(history.get(item++));
+                }
+
+                synchronized (subscriptions) {
+                    if (item < history.size()) {
+                        continue;
+                    }
+
+                    if (exception != null) {
+                        observer.onError(exception);
+                        return Subscriptions.empty();
+                    }
+                    if (isDone) {
+                        observer.onCompleted();
+                        return Subscriptions.empty();
+                    }
+
+                    subscription = new RepeatSubjectSubscription();
+                    subscriptions.put(subscription, observer);
+                    break;
+                }
+            }
+
+            return subscription;
+        }
+    }
+
+    private class RepeatSubjectSubscription implements Subscription
+    {
+        @Override
+        public void unsubscribe()
+        {
+            synchronized (subscriptions) {
+                subscriptions.remove(this);
+            }
+        }
+    }
+
+    @Override
+    public void onCompleted()
+    {
+        synchronized (subscriptions) {
+            isDone = true;
+            for (Observer observer : new ArrayList>(subscriptions.values())) {
+                observer.onCompleted();
+            }
+            subscriptions.clear();
+        }
+    }
+
+    @Override
+    public void onError(Exception e)
+    {
+        synchronized (subscriptions) {
+            if (isDone) {
+                return;
+            }
+            isDone = true;
+            exception = e;
+            for (Observer observer : new ArrayList>(subscriptions.values())) {
+                observer.onError(e);
+            }
+            subscriptions.clear();
+        }
+    }
+
+    @Override
+    public void onNext(T args)
+    {
+        synchronized (subscriptions) {
+            history.add(args);
+            for (Observer observer : new ArrayList>(subscriptions.values())) {
+                observer.onNext(args);
+            }
+        }
+    }
+
+    public static class UnitTest {
+
+        private final Exception testException = new Exception();
+
+        @SuppressWarnings("unchecked")
+        @Test
+        public void testCompleted() {
+            ReplaySubject subject = ReplaySubject.create();
+
+            Observer o1 = mock(Observer.class);
+            subject.subscribe(o1);
+
+            subject.onNext("one");
+            subject.onNext("two");
+            subject.onNext("three");
+            subject.onCompleted();
+
+            subject.onNext("four");
+            subject.onCompleted();
+            subject.onError(new Exception());
+
+            assertCompletedObserver(o1);
+
+            // assert that subscribing a 2nd time gets the same data
+            Observer o2 = mock(Observer.class);
+            subject.subscribe(o2);
+            assertCompletedObserver(o2);
+        }
+
+        private void assertCompletedObserver(Observer aObserver)
+        {
+            InOrder inOrder = inOrder(aObserver);
+
+            inOrder.verify(aObserver, times(1)).onNext("one");
+            inOrder.verify(aObserver, times(1)).onNext("two");
+            inOrder.verify(aObserver, times(1)).onNext("three");
+            inOrder.verify(aObserver, Mockito.never()).onError(any(Exception.class));
+            inOrder.verify(aObserver, times(1)).onCompleted();
+            inOrder.verifyNoMoreInteractions();
+        }
+
+        @SuppressWarnings("unchecked")
+        @Test
+        public void testError() {
+            ReplaySubject subject = ReplaySubject.create();
+
+            Observer aObserver = mock(Observer.class);
+            subject.subscribe(aObserver);
+
+            subject.onNext("one");
+            subject.onNext("two");
+            subject.onNext("three");
+            subject.onError(testException);
+
+            subject.onNext("four");
+            subject.onError(new Exception());
+            subject.onCompleted();
+
+            assertErrorObserver(aObserver);
+
+            aObserver = mock(Observer.class);
+            subject.subscribe(aObserver);
+            assertErrorObserver(aObserver);
+        }
+
+        private void assertErrorObserver(Observer aObserver)
+        {
+            verify(aObserver, times(1)).onNext("one");
+            verify(aObserver, times(1)).onNext("two");
+            verify(aObserver, times(1)).onNext("three");
+            verify(aObserver, times(1)).onError(testException);
+            verify(aObserver, Mockito.never()).onCompleted();
+        }
+
+        @SuppressWarnings("unchecked")
+        @Test
+        public void testSubscribeMidSequence() {
+            ReplaySubject subject = ReplaySubject.create();
+
+            Observer aObserver = mock(Observer.class);
+            subject.subscribe(aObserver);
+
+            subject.onNext("one");
+            subject.onNext("two");
+
+            assertObservedUntilTwo(aObserver);
+
+            Observer anotherObserver = mock(Observer.class);
+            subject.subscribe(anotherObserver);
+            assertObservedUntilTwo(anotherObserver);
+
+            subject.onNext("three");
+            subject.onCompleted();
+
+            assertCompletedObserver(aObserver);
+            assertCompletedObserver(anotherObserver);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Test
+        public void testUnsubscribeFirstObserver() {
+            ReplaySubject subject = ReplaySubject.create();
+
+            Observer aObserver = mock(Observer.class);
+            Subscription subscription = subject.subscribe(aObserver);
+
+            subject.onNext("one");
+            subject.onNext("two");
+
+            subscription.unsubscribe();
+            assertObservedUntilTwo(aObserver);
+
+            Observer anotherObserver = mock(Observer.class);
+            subject.subscribe(anotherObserver);
+            assertObservedUntilTwo(anotherObserver);
+
+            subject.onNext("three");
+            subject.onCompleted();
+
+            assertObservedUntilTwo(aObserver);
+            assertCompletedObserver(anotherObserver);
+        }
+
+        private void assertObservedUntilTwo(Observer aObserver)
+        {
+            verify(aObserver, times(1)).onNext("one");
+            verify(aObserver, times(1)).onNext("two");
+            verify(aObserver, Mockito.never()).onNext("three");
+            verify(aObserver, Mockito.never()).onError(any(Exception.class));
+            verify(aObserver, Mockito.never()).onCompleted();
+        }
+
+        @Test
+        public void testUnsubscribe()
+        {
+            UnsubscribeTester.test(new Func0>()
+            {
+                @Override
+                public ReplaySubject call()
+                {
+                    return ReplaySubject.create();
+                }
+            }, new Action1>()
+            {
+                @Override
+                public void call(ReplaySubject repeatSubject)
+                {
+                    repeatSubject.onCompleted();
+                }
+            }, new Action1>()
+            {
+                @Override
+                public void call(ReplaySubject repeatSubject)
+                {
+                    repeatSubject.onError(new Exception());
+                }
+            }, new Action1>()
+            {
+                @Override
+                public void call(ReplaySubject repeatSubject)
+                {
+                    repeatSubject.onNext("one");
+                }
+            }
+                    );
+        }
+    }
+}
diff --git a/rxjava-core/src/main/java/rx/subjects/UnsubscribeTester.java b/rxjava-core/src/main/java/rx/subjects/UnsubscribeTester.java
new file mode 100644
index 0000000000..acd7fe1919
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/subjects/UnsubscribeTester.java
@@ -0,0 +1,188 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.subjects;
+
+import rx.Observable;
+import rx.Observer;
+import rx.Subscription;
+import rx.util.functions.Action1;
+import rx.util.functions.Func0;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/* package */class UnsubscribeTester {
+
+    /*
+     * This is purposefully package-only so it does not leak into the public API outside of this package.
+     * 
+     * This package is implementation details and not part of the Javadocs and thus can change without breaking backwards compatibility.
+     * 
+     * benjchristensen => I'm procrastinating the decision of where and how these types of classes (see rx.operators.OperatorTester) should exist.
+     * If they are only for internal implementations then I don't want them as part of the API.
+     * If they are truly useful for everyone to use then an "rx.testing" package may make sense.
+     */
+
+    private boolean isDone = false;
+    private Subscription subscription;
+
+    public UnsubscribeTester() {
+    }
+
+    /**
+     * Tests the unsubscription semantics of an observable.
+     * 
+     * @param provider
+     *            Function that when called provides an instance of the observable being tested
+     * @param generateOnCompleted
+     *            Causes an observer generated by @param provider to generate an onCompleted event. Null to not test onCompleted.
+     * @param generateOnError
+     *            Causes an observer generated by @param provider to generate an onError event. Null to not test onError.
+     * @param generateOnNext
+     *            Causes an observer generated by @param provider to generate an onNext event. Null to not test onNext.
+     * @param 
+     *            The type of object passed by the Observable
+     */
+    public static > void test(Func0 provider, Action1 generateOnCompleted, Action1 generateOnError, Action1 generateOnNext)
+    {
+        if (generateOnCompleted != null) {
+            O observable = provider.call();
+            UnsubscribeTester tester1 = createOnCompleted(observable);
+            UnsubscribeTester tester2 = createOnCompleted(observable);
+            generateOnCompleted.call(observable);
+            tester1.assertPassed();
+            tester2.assertPassed();
+        }
+        if (generateOnError != null) {
+            O observable = provider.call();
+            UnsubscribeTester tester1 = createOnError(observable);
+            UnsubscribeTester tester2 = createOnError(observable);
+            generateOnError.call(observable);
+            tester1.assertPassed();
+            tester2.assertPassed();
+        }
+        if (generateOnNext != null) {
+            O observable = provider.call();
+            UnsubscribeTester tester1 = createOnNext(observable);
+            UnsubscribeTester tester2 = createOnNext(observable);
+            generateOnNext.call(observable);
+            tester1.assertPassed();
+            tester2.assertPassed();
+        }
+    }
+
+    private static  UnsubscribeTester createOnCompleted(Observable observable)
+    {
+        final UnsubscribeTester test = new UnsubscribeTester();
+        test.setSubscription(observable.subscribe(new Observer()
+        {
+            @Override
+            public void onCompleted()
+            {
+                test.doUnsubscribe("onCompleted");
+            }
+
+            @Override
+            public void onError(Exception e)
+            {
+                test.gotEvent("onError");
+            }
+
+            @Override
+            public void onNext(T args)
+            {
+                test.gotEvent("onNext");
+            }
+        }));
+        return test;
+    }
+
+    private static  UnsubscribeTester createOnError(Observable observable)
+    {
+        final UnsubscribeTester test = new UnsubscribeTester();
+        test.setSubscription(observable.subscribe(new Observer()
+        {
+            @Override
+            public void onCompleted()
+            {
+                test.gotEvent("onCompleted");
+            }
+
+            @Override
+            public void onError(Exception e)
+            {
+                test.doUnsubscribe("onError");
+            }
+
+            @Override
+            public void onNext(T args)
+            {
+                test.gotEvent("onNext");
+            }
+        }));
+        return test;
+    }
+
+    private static  UnsubscribeTester createOnNext(Observable observable)
+    {
+        final UnsubscribeTester test = new UnsubscribeTester();
+        test.setSubscription(observable.subscribe(new Observer()
+        {
+            @Override
+            public void onCompleted()
+            {
+                test.gotEvent("onCompleted");
+            }
+
+            @Override
+            public void onError(Exception e)
+            {
+                test.gotEvent("onError");
+            }
+
+            @Override
+            public void onNext(T args)
+            {
+                test.doUnsubscribe("onNext");
+            }
+        }));
+        return test;
+    }
+
+    private void setSubscription(Subscription subscription)
+    {
+        this.subscription = subscription;
+    }
+
+    private void gotEvent(String event)
+    {
+        assertFalse("received " + event + " after unsubscribe", isDone);
+    }
+
+    private void doUnsubscribe(String event)
+    {
+        gotEvent(event);
+        if (subscription != null) {
+            isDone = true;
+            subscription.unsubscribe();
+        }
+    }
+
+    private void assertPassed()
+    {
+        assertTrue("expected notification was received", isDone);
+    }
+}