Skip to content

Commit

Permalink
Take operator was breaking the unsubscribe chain
Browse files Browse the repository at this point in the history
Fixes issue ReactiveX#830
  • Loading branch information
benjchristensen committed Feb 7, 2014
1 parent d93dc37 commit ff394f7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
18 changes: 11 additions & 7 deletions rxjava-core/src/main/java/rx/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,22 @@ public OperatorTake(int limit) {
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> o) {
CompositeSubscription parent = new CompositeSubscription();
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final CompositeSubscription parent = new CompositeSubscription();
if (limit == 0) {
o.onCompleted();
child.onCompleted();
parent.unsubscribe();
}

/*
* We decouple the parent and child subscription so there can be multiple take() in a chain
* such as for the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
*
* Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
*
* However, if we receive an unsubscribe from the child we still want to propagate it upwards so we register 'parent' with 'child'
*/
child.add(parent);
return new Subscriber<T>(parent) {

int count = 0;
Expand All @@ -58,24 +62,24 @@ public Subscriber<? super T> call(final Subscriber<? super T> o) {
@Override
public void onCompleted() {
if (!completed) {
o.onCompleted();
child.onCompleted();
}
}

@Override
public void onError(Throwable e) {
if (!completed) {
o.onError(e);
child.onError(e);
}
}

@Override
public void onNext(T i) {
if (!isUnsubscribed()) {
o.onNext(i);
child.onNext(i);
if (++count >= limit) {
completed = true;
o.onCompleted();
child.onCompleted();
unsubscribe();
}
}
Expand Down
28 changes: 28 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Test;
Expand Down Expand Up @@ -210,6 +211,33 @@ public void call(Long l) {
assertEquals(10, count.get());
}

@Test(timeout = 2000)
public void testMultiTake() {
final AtomicInteger count = new AtomicInteger();
Observable.create(new OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
for (int i = 0; !s.isUnsubscribed(); i++) {
System.out.println("Emit: " + i);
count.incrementAndGet();
s.onNext(i);
}
}

}).take(100).take(1).toBlockingObservable().forEach(new Action1<Integer>() {

@Override
public void call(Integer t1) {
System.out.println("Receive: " + t1);

}

});

assertEquals(1, count.get());
}

private static class TestObservableFunc implements Observable.OnSubscribeFunc<String> {

final Subscription s;
Expand Down

0 comments on commit ff394f7

Please sign in to comment.