From ff394f7df04031438490b5ea22680acc58453d4f Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 6 Feb 2014 19:51:29 -0800 Subject: [PATCH] Take operator was breaking the unsubscribe chain Fixes issue https://github.com/Netflix/RxJava/issues/830 --- .../main/java/rx/operators/OperatorTake.java | 18 +++++++----- .../java/rx/operators/OperatorTakeTest.java | 28 +++++++++++++++++++ 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperatorTake.java b/rxjava-core/src/main/java/rx/operators/OperatorTake.java index c1463608cc..e58e120386 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorTake.java @@ -38,18 +38,22 @@ public OperatorTake(int limit) { } @Override - public Subscriber call(final Subscriber o) { - CompositeSubscription parent = new CompositeSubscription(); + public Subscriber call(final Subscriber child) { + final CompositeSubscription parent = new CompositeSubscription(); if (limit == 0) { - o.onCompleted(); + child.onCompleted(); parent.unsubscribe(); } + /* * We decouple the parent and child subscription so there can be multiple take() in a chain * such as for the groupBy Observer use case where you may take(1) on groups and take(20) on the children. * * Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM. + * + * However, if we receive an unsubscribe from the child we still want to propagate it upwards so we register 'parent' with 'child' */ + child.add(parent); return new Subscriber(parent) { int count = 0; @@ -58,24 +62,24 @@ public Subscriber call(final Subscriber o) { @Override public void onCompleted() { if (!completed) { - o.onCompleted(); + child.onCompleted(); } } @Override public void onError(Throwable e) { if (!completed) { - o.onError(e); + child.onError(e); } } @Override public void onNext(T i) { if (!isUnsubscribed()) { - o.onNext(i); + child.onNext(i); if (++count >= limit) { completed = true; - o.onCompleted(); + child.onCompleted(); unsubscribe(); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java b/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java index 5331bfa90a..9eb07b20ef 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorTakeTest.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.junit.Test; @@ -210,6 +211,33 @@ public void call(Long l) { assertEquals(10, count.get()); } + @Test(timeout = 2000) + public void testMultiTake() { + final AtomicInteger count = new AtomicInteger(); + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber s) { + for (int i = 0; !s.isUnsubscribed(); i++) { + System.out.println("Emit: " + i); + count.incrementAndGet(); + s.onNext(i); + } + } + + }).take(100).take(1).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + System.out.println("Receive: " + t1); + + } + + }); + + assertEquals(1, count.get()); + } + private static class TestObservableFunc implements Observable.OnSubscribeFunc { final Subscription s;