Skip to content

Commit

Permalink
Merge pull request #2238 from zsxwing/issue2191
Browse files Browse the repository at this point in the history
Fix the bug that cache doesn't unsubscribe the source Observable when th...
  • Loading branch information
zsxwing committed Jan 21, 2015
2 parents 681163e + e26dd2b commit 2783fa2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
5 changes: 3 additions & 2 deletions src/main/java/rx/internal/operators/OnSubscribeCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ public OnSubscribeCache(Observable<? extends T> source, int capacity) {
@Override
public void call(Subscriber<? super T> s) {
if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
source.unsafeSubscribe(Subscribers.from(cache));
source.subscribe(cache);
/*
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
* Note that we will never unsubscribe from 'source' unless we receive `onCompleted` or `onError`,
* as we want to receive and cache all of its values.
*
* This means this should never be used on an infinite or very large sequence, similar to toList().
*/
Expand Down
15 changes: 14 additions & 1 deletion src/test/java/rx/internal/operators/OnSubscribeCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
Expand All @@ -27,10 +30,10 @@

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.operators.OnSubscribeCache;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
Expand Down Expand Up @@ -148,4 +151,14 @@ public void testWithPublishSubjectAndRepeat() {
public void testWithReplaySubjectAndRepeat() {
testWithCustomSubjectAndRepeat(ReplaySubject.<Integer> create(), 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3);
}

@Test
public void testUnsubscribeSource() {
Action0 unsubscribe = mock(Action0.class);
Observable<Integer> o = Observable.just(1).doOnUnsubscribe(unsubscribe).cache();
o.subscribe();
o.subscribe();
o.subscribe();
verify(unsubscribe, times(1)).call();
}
}

0 comments on commit 2783fa2

Please sign in to comment.