From caa5d2d62b3d0f395bc2e002676070d86edf6df8 Mon Sep 17 00:00:00 2001 From: John Myers Date: Wed, 27 Mar 2013 21:48:41 -0700 Subject: [PATCH 1/6] Add tests to demonstrate bugs --- .../src/main/java/rx/operators/OperationTake.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index e86263e5626..fc4372d5ae4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -260,6 +260,18 @@ public void testTake2() { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testTakeDoesntLeakErrors() { + Observable source = Observable.concat(Observable.from("one"), Observable.error(new Exception("test failed"))); + Observable.create(take(source, 1)).last(); + } + + @Test + public void testTakeZeroDoesntLeakError() { + Observable source = Observable.error(new Exception("test failed")); + Observable.create(take(source, 0)).lastOrDefault("ok"); + } + @Test public void testUnsubscribeAfterTake() { Subscription s = mock(Subscription.class); From 50d79f130c65613902e7374b9d3c9c3bf06b7075 Mon Sep 17 00:00:00 2001 From: John Myers Date: Thu, 28 Mar 2013 21:24:26 -0700 Subject: [PATCH 2/6] Split Take and TakeWhile --- rxjava-core/src/main/java/rx/Observable.java | 13 +- .../main/java/rx/operators/OperationTake.java | 160 ++------- .../java/rx/operators/OperationTakeWhile.java | 313 ++++++++++++++++++ 3 files changed, 349 insertions(+), 137 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d24e48106c8..e332098ba2e 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -41,6 +41,8 @@ import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; +import rx.operators.OperationTake; +import rx.operators.OperationTakeWhile; import rx.operators.OperationWhere; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -54,7 +56,6 @@ import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSynchronize; -import rx.operators.OperationTake; import rx.operators.OperationTakeLast; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -1779,7 +1780,7 @@ public static Observable takeLast(final Observable items, final int co * @return */ public static Observable takeWhile(final Observable items, Func1 predicate) { - return create(OperationTake.takeWhile(items, predicate)); + return create(OperationTakeWhile.takeWhile(items, predicate)); } /** @@ -1811,16 +1812,18 @@ public Boolean call(T t) { * @return */ public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { - return create(OperationTake.takeWhileWithIndex(items, predicate)); + return create(OperationTakeWhile.takeWhileWithIndex(items, predicate)); } public static Observable takeWhileWithIndex(final Observable items, Object predicate) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(predicate); - return create(OperationTake.takeWhileWithIndex(items, new Func2() { + return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2() + { @Override - public Boolean call(T t, Integer integer) { + public Boolean call(T t, Integer integer) + { return (Boolean) _f.call(t, integer); } })); diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index fc4372d5ae4..1b853d5bbac 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,21 +15,23 @@ */ package rx.operators; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Test; - import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; -import rx.util.functions.Func2; -import rx.subjects.Subject; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + /** * Returns a specified number of contiguous values from the start of an observable sequence. */ @@ -43,61 +45,17 @@ public final class OperationTake { * @return */ public static Func1, Subscription> take(final Observable items, final int num) { - return takeWhileWithIndex(items, OperationTake. numPredicate(num)); - } - - /** - * Returns a specified number of contiguous values from the start of an observable sequence. - * - * @param items - * @param predicate - * a function to test each source element for a condition - * @return - */ - public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { - return takeWhileWithIndex(items, OperationTake. skipIndex(predicate)); - } - - /** - * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. - * - * @param items - * @param predicate - * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return - */ - public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) { // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. return new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - return new TakeWhile(items, predicate).call(observer); - } - - }; - } - - private static Func2 numPredicate(final int num) { - return new Func2() { - - @Override - public Boolean call(T input, Integer index) { - return index < num; + return new Take(items, num).call(observer); } }; } - private static Func2 skipIndex(final Func1 underlying) { - return new Func2() { - @Override - public Boolean call(T input, Integer index) { - return underlying.call(input); - } - }; - } - /** * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. *

@@ -109,19 +67,24 @@ public Boolean call(T input, Integer index) { * * @param */ - private static class TakeWhile implements Func1, Subscription> { + private static class Take implements Func1, Subscription> { private final AtomicInteger counter = new AtomicInteger(); private final Observable items; - private final Func2 predicate; + private final int num; private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - private TakeWhile(Observable items, Func2 predicate) { + private Take(Observable items, int num) { this.items = items; - this.predicate = predicate; + this.num = num; } @Override public Subscription call(Observer observer) { + if (num < 1) { + observer.onCompleted(); + return Subscriptions.empty(); + } + return subscription.wrap(items.subscribe(new ItemObserver(observer))); } @@ -144,10 +107,14 @@ public void onError(Exception e) { @Override public void onNext(T args) { - if (predicate.call(args, counter.getAndIncrement())) { + final int count = counter.incrementAndGet(); + if (count <= num) { observer.onNext(args); - } else { - observer.onCompleted(); + if (count == num) { + observer.onCompleted(); + } + } + if (count >= num) { // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable subscription.unsubscribe(); } @@ -159,77 +126,6 @@ public void onNext(T args) { public static class UnitTest { - @Test - public void testTakeWhile1() { - Observable w = Observable.toObservable(1, 2, 3); - Observable take = Observable.create(takeWhile(w, new Func1() { - @Override - public Boolean call(Integer input) { - return input < 3; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - verify(aObserver, times(1)).onNext(1); - verify(aObserver, times(1)).onNext(2); - verify(aObserver, never()).onNext(3); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - - @Test - public void testTakeWhileOnSubject1() { - Subject s = Subject.create(); - Observable w = (Observable)s; - Observable take = Observable.create(takeWhile(w, new Func1() { - @Override - public Boolean call(Integer input) { - return input < 3; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - - s.onNext(1); - s.onNext(2); - s.onNext(3); - s.onNext(4); - s.onNext(5); - s.onCompleted(); - - verify(aObserver, times(1)).onNext(1); - verify(aObserver, times(1)).onNext(2); - verify(aObserver, never()).onNext(3); - verify(aObserver, never()).onNext(4); - verify(aObserver, never()).onNext(5); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - - @Test - public void testTakeWhile2() { - Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(takeWhileWithIndex(w, new Func2() { - @Override - public Boolean call(String input, Integer index) { - return index < 2; - } - })); - - @SuppressWarnings("unchecked") - Observer aObserver = mock(Observer.class); - take.subscribe(aObserver); - verify(aObserver, times(1)).onNext("one"); - verify(aObserver, times(1)).onNext("two"); - verify(aObserver, never()).onNext("three"); - verify(aObserver, never()).onError(any(Exception.class)); - verify(aObserver, times(1)).onCompleted(); - } - @Test public void testTake1() { Observable w = Observable.toObservable("one", "two", "three"); diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java new file mode 100644 index 00000000000..f45efabc923 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -0,0 +1,313 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.subjects.Subject; +/** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + */ +public final class OperationTakeWhile { + + /** + * Returns a specified number of contiguous values from the start of an observable sequence. + * + * @param items + * @param predicate + * a function to test each source element for a condition + * @return + */ + public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { + return takeWhileWithIndex(items, OperationTakeWhile.skipIndex(predicate)); + } + + /** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + * + * @param items + * @param predicate + * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. + * @return + */ + public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) { + // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. + return new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + return new TakeWhile(items, predicate).call(observer); + } + + }; + } + + private static Func2 skipIndex(final Func1 underlying) { + return new Func2() { + @Override + public Boolean call(T input, Integer index) { + return underlying.call(input); + } + }; + } + + /** + * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. + *

+ * It IS thread-safe from within it while receiving onNext events from multiple threads. + *

+ * This should all be fine as long as it's kept as a private class and a new instance created from static factory method above. + *

+ * Note how the takeWhileWithIndex() factory method above protects us from a single instance being exposed with the Observable wrapper handling the subscribe flow. + * + * @param + */ + private static class TakeWhile implements Func1, Subscription> { + private final AtomicInteger counter = new AtomicInteger(); + private final Observable items; + private final Func2 predicate; + private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + private TakeWhile(Observable items, Func2 predicate) { + this.items = items; + this.predicate = predicate; + } + + @Override + public Subscription call(Observer observer) { + return subscription.wrap(items.subscribe(new ItemObserver(observer))); + } + + private class ItemObserver implements Observer { + private final Observer observer; + + public ItemObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onNext(T args) { + if (predicate.call(args, counter.getAndIncrement())) { + observer.onNext(args); + } else { + observer.onCompleted(); + // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable + subscription.unsubscribe(); + } + } + + } + + } + + public static class UnitTest { + + @Test + public void testTakeWhile1() { + Observable w = Observable.toObservable(1, 2, 3); + Observable take = Observable.create(takeWhile(w, new Func1() + { + @Override + public Boolean call(Integer input) + { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhileOnSubject1() { + Subject s = Subject.create(); + Observable w = (Observable)s; + Observable take = Observable.create(takeWhile(w, new Func1() + { + @Override + public Boolean call(Integer input) + { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + + s.onNext(1); + s.onNext(2); + s.onNext(3); + s.onNext(4); + s.onNext(5); + s.onCompleted(); + + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onNext(4); + verify(aObserver, never()).onNext(5); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhile2() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable take = Observable.create(takeWhileWithIndex(w, new Func2() + { + @Override + public Boolean call(String input, Integer index) + { + return index < 2; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, never()).onNext("three"); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhileDoesntLeakErrors() { + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onError(new Exception("test failed")); + return Subscriptions.empty(); + } + }); + + Observable.create(takeWhile(source, new Func1() + { + @Override + public Boolean call(String s) + { + return false; + } + })).last(); + } + + @Test + public void testUnsubscribeAfterTake() { + Subscription s = mock(Subscription.class); + TestObservable w = new TestObservable(s, "one", "two", "three"); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + Observable take = Observable.create(takeWhileWithIndex(w, new Func2() + { + @Override + public Boolean call(String s, Integer index) + { + return index < 1; + } + })); + take.subscribe(aObserver); + + // wait for the Observable to complete + try { + w.t.join(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + System.out.println("TestObservable thread finished"); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, never()).onNext("two"); + verify(aObserver, never()).onNext("three"); + verify(s, times(1)).unsubscribe(); + } + + private static class TestObservable extends Observable { + + final Subscription s; + final String[] values; + Thread t = null; + + public TestObservable(Subscription s, String... values) { + this.s = s; + this.values = values; + } + + @Override + public Subscription subscribe(final Observer observer) { + System.out.println("TestObservable subscribed to ..."); + t = new Thread(new Runnable() { + + @Override + public void run() { + try { + System.out.println("running TestObservable thread"); + for (String s : values) { + System.out.println("TestObservable onNext: " + s); + observer.onNext(s); + } + observer.onCompleted(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + }); + System.out.println("starting TestObservable thread"); + t.start(); + System.out.println("done starting TestObservable thread"); + return s; + } + + } + } + +} From 83e81ab5db1e7aaa7dd4c392ee6b665df2915c73 Mon Sep 17 00:00:00 2001 From: John Myers Date: Thu, 28 Mar 2013 23:08:24 -0700 Subject: [PATCH 3/6] Implement TrustedObservableTester.assertTrustedObservable() --- .../rx/testing/TrustedObservableTester.java | 253 ++++++++++++++++++ 1 file changed, 253 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java diff --git a/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java b/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java new file mode 100644 index 00000000000..f48be8d3f58 --- /dev/null +++ b/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java @@ -0,0 +1,253 @@ +package rx.testing; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +public class TrustedObservableTester +{ + private TrustedObservableTester() {} + + public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source) + { + return new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + return source.call(new TestingObserver(observer)); + } + }; + } + + public static class TestingObserver implements Observer { + + private final Observer actual; + private final AtomicBoolean isFinished = new AtomicBoolean(false); + private final AtomicBoolean isInCallback = new AtomicBoolean(false); + + public TestingObserver(Observer actual) { + this.actual = actual; + } + + @Override + public void onCompleted() { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onCompleted(); + isInCallback.set(false); + } + + @Override + public void onError(Exception e) { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onError(e); + isInCallback.set(false); + } + + @Override + public void onNext(T args) { + assertFalse("previous call to onCompleted() or onError()", isFinished.get()); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onNext(args); + isInCallback.set(false); + } + + } + + public static class UnitTest { + @Test(expected = AssertionError.class) + public void testDoubleCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + + } + + @Test(expected = AssertionError.class) + public void testCompletedError() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onError(new Exception()); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testCompletedNext() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onCompleted(); + observer.onNext("one"); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testErrorCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test(expected = AssertionError.class) + public void testDoubleError() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onError(new Exception()); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + + @Test(expected = AssertionError.class) + public void testErrorNext() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onError(new Exception()); + observer.onNext("one"); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test + public void testNextCompleted() { + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onCompleted(); + return Subscriptions.empty(); + } + })).lastOrDefault("end"); + } + + @Test + public void testConcurrentNextNext() { + final List threads = new ArrayList(); + final AtomicReference threadFailure = new AtomicReference(); + Observable.create(assertTrustedObservable(new Func1, Subscription>() + { + @Override + public Subscription call(final Observer observer) + { + threads.add(new Thread(new Runnable() + { + @Override + public void run() + { + observer.onNext("one"); + } + })); + threads.add(new Thread(new Runnable() + { + @Override + public void run() + { + observer.onNext("two"); + } + })); + return Subscriptions.empty(); + } + })).subscribe(new SlowObserver()); + for (Thread thread : threads) { + thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() + { + @Override + public void uncaughtException(Thread thread, Throwable throwable) + { + threadFailure.set(throwable); + } + }); + thread.start(); + } + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException ignored) { + } + } + // Junit seems pretty bad about exposing test failures inside of created threads. + assertNotNull("exception thrown by thread", threadFailure.get()); + assertEquals("class of exception thrown by thread", AssertionError.class, threadFailure.get().getClass()); + } + + private static class SlowObserver implements Observer + { + @Override + public void onCompleted() + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + + @Override + public void onError(Exception e) + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + + @Override + public void onNext(String args) + { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + } + } +} From 6e98591dc4084978705e21ea094e14ca2205131f Mon Sep 17 00:00:00 2001 From: John Myers Date: Thu, 28 Mar 2013 23:11:36 -0700 Subject: [PATCH 4/6] Fix violations of the Observer contract. --- .../main/java/rx/operators/OperationTake.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 1b853d5bbac..ebefc09c659 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static rx.testing.TrustedObservableTester.assertTrustedObservable; /** * Returns a specified number of contiguous values from the start of an observable sequence. @@ -97,12 +98,16 @@ public ItemObserver(Observer observer) { @Override public void onCompleted() { - observer.onCompleted(); + if (counter.getAndSet(num) < num) { + observer.onCompleted(); + } } @Override public void onError(Exception e) { - observer.onError(e); + if (counter.getAndSet(num) < num) { + observer.onError(e); + } } @Override @@ -129,7 +134,7 @@ public static class UnitTest { @Test public void testTake1() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(take(w, 2)); + Observable take = Observable.create(assertTrustedObservable(take(w, 2))); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -144,7 +149,7 @@ public void testTake1() { @Test public void testTake2() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(take(w, 1)); + Observable take = Observable.create(assertTrustedObservable(take(w, 1))); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -158,14 +163,23 @@ public void testTake2() { @Test public void testTakeDoesntLeakErrors() { - Observable source = Observable.concat(Observable.from("one"), Observable.error(new Exception("test failed"))); - Observable.create(take(source, 1)).last(); + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onError(new Exception("test failed")); + return Subscriptions.empty(); + } + }); + Observable.create(assertTrustedObservable(take(source, 1))).last(); } @Test public void testTakeZeroDoesntLeakError() { - Observable source = Observable.error(new Exception("test failed")); - Observable.create(take(source, 0)).lastOrDefault("ok"); + Observable source = Observable.error(new Exception("test failed")); + Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok"); } @Test @@ -175,7 +189,7 @@ public void testUnsubscribeAfterTake() { @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); - Observable take = Observable.create(take(w, 1)); + Observable take = Observable.create(assertTrustedObservable(take(w, 1))); take.subscribe(aObserver); // wait for the Observable to complete From 983e142c8a3a2c0b7135b3e0d0d7abdb44680580 Mon Sep 17 00:00:00 2001 From: John Myers Date: Fri, 29 Mar 2013 20:49:31 -0700 Subject: [PATCH 5/6] take(0) subscribes to its source --- .../main/java/rx/operators/OperationTake.java | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index ebefc09c659..c4335b71d8f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -23,8 +23,10 @@ import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -82,6 +84,23 @@ private Take(Observable items, int num) { @Override public Subscription call(Observer observer) { if (num < 1) { + items.subscribe(new Observer() + { + @Override + public void onCompleted() + { + } + + @Override + public void onError(Exception e) + { + } + + @Override + public void onNext(T args) + { + } + }).unsubscribe(); observer.onCompleted(); return Subscriptions.empty(); } @@ -178,8 +197,28 @@ public Subscription call(Observer observer) @Test public void testTakeZeroDoesntLeakError() { - Observable source = Observable.error(new Exception("test failed")); + final AtomicBoolean subscribed = new AtomicBoolean(false); + final AtomicBoolean unSubscribed = new AtomicBoolean(false); + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + subscribed.set(true); + observer.onError(new Exception("test failed")); + return new Subscription() + { + @Override + public void unsubscribe() + { + unSubscribed.set(true); + } + }; + } + }); Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok"); + assertTrue("source subscribed", subscribed.get()); + assertTrue("source unsubscribed", unSubscribed.get()); } @Test From 5f852fd8a855c0f1acc4fd312e2db50f97a620ee Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 30 Mar 2013 19:54:40 -0700 Subject: [PATCH 6/6] Small reorganization of code for OperationTake and TrustedObservableTester - removed rx.testing package (if that's going to exist that means it's bleeding into something that should live in /src/test and beyond what works well for inner class testing) - made TrustedObservableTester part of rx.operation package and an inner UnitTest class so it doesn't become part of the public API --- .../AbstractOperation.java} | 109 +++++++++--------- .../main/java/rx/operators/OperationTake.java | 21 ++-- 2 files changed, 65 insertions(+), 65 deletions(-) rename rxjava-core/src/main/java/rx/{testing/TrustedObservableTester.java => operators/AbstractOperation.java} (77%) diff --git a/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java b/rxjava-core/src/main/java/rx/operators/AbstractOperation.java similarity index 77% rename from rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java rename to rxjava-core/src/main/java/rx/operators/AbstractOperation.java index f48be8d3f58..dd71bd837a5 100644 --- a/rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java +++ b/rxjava-core/src/main/java/rx/operators/AbstractOperation.java @@ -1,11 +1,6 @@ -package rx.testing; +package rx.operators; -import org.junit.Test; -import rx.Observable; -import rx.Observer; -import rx.Subscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Func1; +import static org.junit.Assert.*; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; @@ -13,63 +8,72 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; +import org.junit.Test; -public class TrustedObservableTester +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +/** + * Common utility functions for operator implementations and tests. + */ +/* package */class AbstractOperation { - private TrustedObservableTester() {} + private AbstractOperation() { + } - public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source) - { - return new Func1, Subscription>() + public static class UnitTest { + + public static Func1, Subscription> assertTrustedObservable(final Func1, Subscription> source) { - @Override - public Subscription call(Observer observer) + return new Func1, Subscription>() { - return source.call(new TestingObserver(observer)); - } - }; - } + @Override + public Subscription call(Observer observer) + { + return source.call(new TestingObserver(observer)); + } + }; + } - public static class TestingObserver implements Observer { + public static class TestingObserver implements Observer { - private final Observer actual; - private final AtomicBoolean isFinished = new AtomicBoolean(false); - private final AtomicBoolean isInCallback = new AtomicBoolean(false); + private final Observer actual; + private final AtomicBoolean isFinished = new AtomicBoolean(false); + private final AtomicBoolean isInCallback = new AtomicBoolean(false); - public TestingObserver(Observer actual) { - this.actual = actual; - } + public TestingObserver(Observer actual) { + this.actual = actual; + } - @Override - public void onCompleted() { - assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); - assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); - actual.onCompleted(); - isInCallback.set(false); - } + @Override + public void onCompleted() { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onCompleted(); + isInCallback.set(false); + } - @Override - public void onError(Exception e) { - assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); - assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); - actual.onError(e); - isInCallback.set(false); - } + @Override + public void onError(Exception e) { + assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true)); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onError(e); + isInCallback.set(false); + } - @Override - public void onNext(T args) { - assertFalse("previous call to onCompleted() or onError()", isFinished.get()); - assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); - actual.onNext(args); - isInCallback.set(false); - } + @Override + public void onNext(T args) { + assertFalse("previous call to onCompleted() or onError()", isFinished.get()); + assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true)); + actual.onNext(args); + isInCallback.set(false); + } - } + } - public static class UnitTest { @Test(expected = AssertionError.class) public void testDoubleCompleted() { Observable.create(assertTrustedObservable(new Func1, Subscription>() @@ -141,7 +145,6 @@ public Subscription call(Observer observer) })).lastOrDefault("end"); } - @Test(expected = AssertionError.class) public void testErrorNext() { Observable.create(assertTrustedObservable(new Func1, Subscription>() @@ -250,4 +253,4 @@ public void onNext(String args) } } } -} +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index c4335b71d8f..5ea6b627e4b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -15,7 +15,16 @@ */ package rx.operators; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.operators.AbstractOperation.UnitTest.*; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; + import rx.Observable; import rx.Observer; import rx.Subscription; @@ -23,18 +32,6 @@ import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static rx.testing.TrustedObservableTester.assertTrustedObservable; - /** * Returns a specified number of contiguous values from the start of an observable sequence. */