From 6c58557f50eac745fd2de954d3c08dea93fe1d9f Mon Sep 17 00:00:00 2001 From: David Karnok Date: Thu, 20 Apr 2017 19:20:52 +0200 Subject: [PATCH] 2.x: add Single.unsubscribeOn() (#5302) * 2.x: add Single.unsubscribeOn() * Fix experimental marker location --- src/main/java/io/reactivex/Single.java | 23 +++- .../operators/single/SingleUnsubscribeOn.java | 95 +++++++++++++ .../single/SingleUnsubscribeOnTest.java | 130 ++++++++++++++++++ 3 files changed, 247 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleUnsubscribeOn.java create mode 100644 src/test/java/io/reactivex/internal/operators/single/SingleUnsubscribeOnTest.java diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 2944c741bd..32f3082355 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -16,6 +16,8 @@ import java.util.NoSuchElementException; import java.util.concurrent.*; +import org.reactivestreams.Publisher; + import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; @@ -32,7 +34,6 @@ import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; -import org.reactivestreams.Publisher; /** * The Single class implements the Reactive Pattern for a single value response. @@ -3067,6 +3068,26 @@ public final Observable toObservable() { return RxJavaPlugins.onAssembly(new SingleToObservable(this)); } + /** + * Returns a Single which makes sure when a SingleObserver disposes the Disposable, + * that call is propagated up on the specified scheduler + *
+ *
Scheduler:
+ *
{@code unsubscribeOn} calls dispose() of the upstream on the {@link Scheduler} you specify.
+ *
+ * @param scheduler the target scheduler where to execute the cancellation + * @return the new Single instance + * @throws NullPointerException if scheduler is null + * @since 2.0.9 - experimental + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + @Experimental + public final Single unsubscribeOn(final Scheduler scheduler) { + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new SingleUnsubscribeOn(this, scheduler)); + } + /** * Returns a Single that emits the result of applying a specified function to the pair of items emitted by * the source Single and another specified Single. diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleUnsubscribeOn.java b/src/main/java/io/reactivex/internal/operators/single/SingleUnsubscribeOn.java new file mode 100644 index 0000000000..27c4200d8a --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleUnsubscribeOn.java @@ -0,0 +1,95 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +/** + * Makes sure a dispose() call from downstream happens on the specified scheduler. + * + * @param the value type + */ +public final class SingleUnsubscribeOn extends Single { + + final SingleSource source; + + final Scheduler scheduler; + + public SingleUnsubscribeOn(SingleSource source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + source.subscribe(new UnsubscribeOnSingleObserver(observer, scheduler)); + } + + static final class UnsubscribeOnSingleObserver extends AtomicReference + implements SingleObserver, Disposable, Runnable { + + private static final long serialVersionUID = 3256698449646456986L; + + final SingleObserver actual; + + final Scheduler scheduler; + + Disposable ds; + + UnsubscribeOnSingleObserver(SingleObserver actual, Scheduler scheduler) { + this.actual = actual; + this.scheduler = scheduler; + } + + @Override + public void dispose() { + Disposable d = getAndSet(DisposableHelper.DISPOSED); + if (d != DisposableHelper.DISPOSED) { + this.ds = d; + scheduler.scheduleDirect(this); + } + } + + @Override + public void run() { + ds.dispose(); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + actual.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleUnsubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleUnsubscribeOnTest.java new file mode 100644 index 0000000000..733d7d0ccc --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleUnsubscribeOnTest.java @@ -0,0 +1,130 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import static org.junit.Assert.*; + +import java.util.concurrent.*; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.*; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; + +public class SingleUnsubscribeOnTest { + + @Test + public void normal() throws Exception { + PublishProcessor pp = PublishProcessor.create(); + + final String[] name = { null }; + + final CountDownLatch cdl = new CountDownLatch(1); + + pp.doOnCancel(new Action() { + @Override + public void run() throws Exception { + name[0] = Thread.currentThread().getName(); + cdl.countDown(); + } + }) + .single(-99) + .unsubscribeOn(Schedulers.single()) + .test(true) + ; + + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + + int times = 10; + + while (times-- > 0 && pp.hasSubscribers()) { + Thread.sleep(100); + } + + assertFalse(pp.hasSubscribers()); + + assertNotEquals(Thread.currentThread().getName(), name[0]); + } + + @Test + public void just() { + Single.just(1) + .unsubscribeOn(Schedulers.single()) + .test() + .assertResult(1); + } + + @Test + public void error() { + Single.error(new TestException()) + .unsubscribeOn(Schedulers.single()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.just(1) + .unsubscribeOn(Schedulers.single())); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Single v) throws Exception { + return v.unsubscribeOn(Schedulers.single()); + } + }); + } + + @Test + public void disposeRace() { + for (int i = 0; i < 500; i++) { + PublishProcessor pp = PublishProcessor.create(); + + final Disposable[] ds = { null }; + pp.single(-99).unsubscribeOn(Schedulers.computation()) + .subscribe(new SingleObserver() { + @Override + public void onSubscribe(Disposable d) { + ds[0] = d; + } + + @Override + public void onSuccess(Integer value) { + + } + + @Override + public void onError(Throwable e) { + + } + }); + + Runnable r = new Runnable() { + @Override + public void run() { + ds[0].dispose(); + } + }; + + TestHelper.race(r, r, Schedulers.single()); + } + } +}