From bf1a4f3c449afb1aac438563f523960fefedcc20 Mon Sep 17 00:00:00 2001 From: Alex Wenckus Date: Thu, 23 Oct 2014 15:51:47 -0500 Subject: [PATCH 1/2] Fix for #1791 - don't retry (subscribe) to source if child has unsubscribed. --- .../internal/operators/OnSubscribeRedo.java | 4 +++ .../internal/operators/OperatorRetryTest.java | 31 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index 21cd177323..3a795aafd0 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -206,6 +206,10 @@ public void call(final Subscriber child) { final Action0 subscribeToSource = new Action0() { @Override public void call() { + if (child.isUnsubscribed()) { + return; + } + Subscriber terminalDelegatingSubscriber = new Subscriber() { @Override public void onCompleted() { diff --git a/src/test/java/rx/internal/operators/OperatorRetryTest.java b/src/test/java/rx/internal/operators/OperatorRetryTest.java index f6ff65478a..01ad91c49e 100644 --- a/src/test/java/rx/internal/operators/OperatorRetryTest.java +++ b/src/test/java/rx/internal/operators/OperatorRetryTest.java @@ -244,6 +244,37 @@ public Observable call(Observable t1) { inOrder.verifyNoMoreInteractions(); } + @Test + public void testSingleSubscriptionOnFirst() throws Exception { + final AtomicInteger inc = new AtomicInteger(0); + Observable.OnSubscribe onSubscribe = new OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + final int emit = inc.incrementAndGet(); + subscriber.onNext(emit); + subscriber.onCompleted(); + } + }; + + int first = Observable.create(onSubscribe) + .retryWhen(new Func1, Observable>() { + @Override + public Observable call(Observable attempt) { + return attempt.zipWith(Observable.just(1), new Func2() { + @Override + public Void call(Throwable o, Integer integer) { + return null; + } + }); + } + }) + .toBlocking() + .first(); + + assertEquals("Observer did not receive the expected output", 1, first); + assertEquals("Subscribe was not called once", 1, inc.get()); + } + @Test public void testOriginFails() { @SuppressWarnings("unchecked") From d21720938426e6612ec026c2bd2d80395fbd3f07 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 23 Oct 2014 14:20:12 -0700 Subject: [PATCH 2/2] Take Unsubscribes Before OnNext To prevent issues such as https://github.com/ReactiveX/RxJava/issues/1791 --- src/main/java/rx/internal/operators/OperatorTake.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorTake.java b/src/main/java/rx/internal/operators/OperatorTake.java index 77d6ccf248..b2c62e3920 100644 --- a/src/main/java/rx/internal/operators/OperatorTake.java +++ b/src/main/java/rx/internal/operators/OperatorTake.java @@ -61,12 +61,16 @@ public void onError(Throwable e) { @Override public void onNext(T i) { if (!isUnsubscribed()) { - child.onNext(i); if (++count >= limit) { completed = true; - child.onCompleted(); + // unsubscribe before emitting onNext so shutdown happens before possible effects + // of onNext such as product.request(n) calls be sent upstream. unsubscribe(); } + child.onNext(i); + if (completed) { + child.onCompleted(); + } } }