Skip to content

Commit

Permalink
take(0) subscribes to its source
Browse files Browse the repository at this point in the history
  • Loading branch information
johngmyers committed Mar 30, 2013
1 parent 550005c commit 6c1a1ab
Showing 1 changed file with 40 additions and 1 deletion.
41 changes: 40 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperationTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -82,6 +84,23 @@ private Take(Observable<T> items, int num) {
@Override
public Subscription call(Observer<T> observer) {
if (num < 1) {
items.subscribe(new Observer<T>()
{
@Override
public void onCompleted()
{
}

@Override
public void onError(Exception e)
{
}

@Override
public void onNext(T args)
{
}
}).unsubscribe();
observer.onCompleted();
return Subscriptions.empty();
}
Expand Down Expand Up @@ -178,8 +197,28 @@ public Subscription call(Observer<String> observer)

@Test
public void testTakeZeroDoesntLeakError() {
Observable<String> source = Observable.error(new Exception("test failed"));
final AtomicBoolean subscribed = new AtomicBoolean(false);
final AtomicBoolean unSubscribed = new AtomicBoolean(false);
Observable<String> source = Observable.create(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
subscribed.set(true);
observer.onError(new Exception("test failed"));
return new Subscription()
{
@Override
public void unsubscribe()
{
unSubscribed.set(true);
}
};
}
});
Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok");
assertTrue("source subscribed", subscribed.get());
assertTrue("source unsubscribed", unSubscribed.get());
}

@Test
Expand Down

0 comments on commit 6c1a1ab

Please sign in to comment.