From 550005cf260188a06660a3652c65fda594ee8f85 Mon Sep 17 00:00:00 2001 From: John Myers Date: Thu, 28 Mar 2013 23:11:36 -0700 Subject: [PATCH] Fix violations of the Observer contract. --- .../main/java/rx/operators/OperationTake.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 1b853d5bba..ebefc09c65 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static rx.testing.TrustedObservableTester.assertTrustedObservable; /** * Returns a specified number of contiguous values from the start of an observable sequence. @@ -97,12 +98,16 @@ public ItemObserver(Observer observer) { @Override public void onCompleted() { - observer.onCompleted(); + if (counter.getAndSet(num) < num) { + observer.onCompleted(); + } } @Override public void onError(Exception e) { - observer.onError(e); + if (counter.getAndSet(num) < num) { + observer.onError(e); + } } @Override @@ -129,7 +134,7 @@ public static class UnitTest { @Test public void testTake1() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(take(w, 2)); + Observable take = Observable.create(assertTrustedObservable(take(w, 2))); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -144,7 +149,7 @@ public void testTake1() { @Test public void testTake2() { Observable w = Observable.toObservable("one", "two", "three"); - Observable take = Observable.create(take(w, 1)); + Observable take = Observable.create(assertTrustedObservable(take(w, 1))); @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); @@ -158,14 +163,23 @@ public void testTake2() { @Test public void testTakeDoesntLeakErrors() { - Observable source = Observable.concat(Observable.from("one"), Observable.error(new Exception("test failed"))); - Observable.create(take(source, 1)).last(); + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + observer.onNext("one"); + observer.onError(new Exception("test failed")); + return Subscriptions.empty(); + } + }); + Observable.create(assertTrustedObservable(take(source, 1))).last(); } @Test public void testTakeZeroDoesntLeakError() { - Observable source = Observable.error(new Exception("test failed")); - Observable.create(take(source, 0)).lastOrDefault("ok"); + Observable source = Observable.error(new Exception("test failed")); + Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok"); } @Test @@ -175,7 +189,7 @@ public void testUnsubscribeAfterTake() { @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); - Observable take = Observable.create(take(w, 1)); + Observable take = Observable.create(assertTrustedObservable(take(w, 1))); take.subscribe(aObserver); // wait for the Observable to complete