diff --git a/src/main/java/rx/internal/operators/OnSubscribeSingle.java b/src/main/java/rx/internal/operators/OnSubscribeSingle.java index 63d4d0a49a..27e976e30c 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeSingle.java +++ b/src/main/java/rx/internal/operators/OnSubscribeSingle.java @@ -80,7 +80,7 @@ public void onNext(T t) { } }; child.add(parent); - observable.subscribe(parent); + observable.unsafeSubscribe(parent); } public static OnSubscribeSingle create(Observable observable) { diff --git a/src/test/java/rx/internal/operators/OnSubscribeSingleTest.java b/src/test/java/rx/internal/operators/OnSubscribeSingleTest.java index 6bc24dbe75..8b3dbf910e 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeSingleTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeSingleTest.java @@ -15,14 +15,19 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertFalse; + +import java.util.Collections; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.Test; + import rx.Observable; import rx.Single; +import rx.functions.Action0; import rx.observers.TestSubscriber; -import java.util.Collections; -import java.util.NoSuchElementException; - public class OnSubscribeSingleTest { @Test @@ -70,4 +75,19 @@ public void testRepeatObservableThrowsError() { subscriber.assertError(IllegalArgumentException.class); } + + @Test + public void testShouldUseUnsafeSubscribeInternallyNotSubscribe() { + TestSubscriber subscriber = TestSubscriber.create(); + final AtomicBoolean unsubscribed = new AtomicBoolean(false); + Single single = Observable.just("Hello World!").doOnUnsubscribe(new Action0() { + + @Override + public void call() { + unsubscribed.set(true); + }}).toSingle(); + single.unsafeSubscribe(subscriber); + subscriber.assertCompleted(); + assertFalse(unsubscribed.get()); + } }