diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index ebefc09c65..c4335b71d8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -23,8 +23,10 @@ import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -82,6 +84,23 @@ private Take(Observable items, int num) { @Override public Subscription call(Observer observer) { if (num < 1) { + items.subscribe(new Observer() + { + @Override + public void onCompleted() + { + } + + @Override + public void onError(Exception e) + { + } + + @Override + public void onNext(T args) + { + } + }).unsubscribe(); observer.onCompleted(); return Subscriptions.empty(); } @@ -178,8 +197,28 @@ public Subscription call(Observer observer) @Test public void testTakeZeroDoesntLeakError() { - Observable source = Observable.error(new Exception("test failed")); + final AtomicBoolean subscribed = new AtomicBoolean(false); + final AtomicBoolean unSubscribed = new AtomicBoolean(false); + Observable source = Observable.create(new Func1, Subscription>() + { + @Override + public Subscription call(Observer observer) + { + subscribed.set(true); + observer.onError(new Exception("test failed")); + return new Subscription() + { + @Override + public void unsubscribe() + { + unSubscribed.set(true); + } + }; + } + }); Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok"); + assertTrue("source subscribed", subscribed.get()); + assertTrue("source unsubscribed", unSubscribed.get()); } @Test