Skip to content

Commit

Permalink
2.x: add Single.unsubscribeOn() (#5302)
Browse files Browse the repository at this point in the history
* 2.x: add Single.unsubscribeOn()

* Fix experimental marker location
  • Loading branch information
akarnokd authored Apr 20, 2017
1 parent db62772 commit 6c58557
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 1 deletion.
23 changes: 22 additions & 1 deletion src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.NoSuchElementException;
import java.util.concurrent.*;

import org.reactivestreams.Publisher;

import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
Expand All @@ -32,7 +34,6 @@
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Publisher;

/**
* The Single class implements the Reactive Pattern for a single value response.
Expand Down Expand Up @@ -3067,6 +3068,26 @@ public final Observable<T> toObservable() {
return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
}

/**
* Returns a Single which makes sure when a SingleObserver disposes the Disposable,
* that call is propagated up on the specified scheduler
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code unsubscribeOn} calls dispose() of the upstream on the {@link Scheduler} you specify.</dd>
* </dl>
* @param scheduler the target scheduler where to execute the cancellation
* @return the new Single instance
* @throws NullPointerException if scheduler is null
* @since 2.0.9 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@Experimental
public final Single<T> unsubscribeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleUnsubscribeOn<T>(this, scheduler));
}

/**
* Returns a Single that emits the result of applying a specified function to the pair of items emitted by
* the source Single and another specified Single.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.single;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;

/**
* Makes sure a dispose() call from downstream happens on the specified scheduler.
*
* @param <T> the value type
*/
public final class SingleUnsubscribeOn<T> extends Single<T> {

final SingleSource<T> source;

final Scheduler scheduler;

public SingleUnsubscribeOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
source.subscribe(new UnsubscribeOnSingleObserver<T>(observer, scheduler));
}

static final class UnsubscribeOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {

private static final long serialVersionUID = 3256698449646456986L;

final SingleObserver<? super T> actual;

final Scheduler scheduler;

Disposable ds;

UnsubscribeOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.actual = actual;
this.scheduler = scheduler;
}

@Override
public void dispose() {
Disposable d = getAndSet(DisposableHelper.DISPOSED);
if (d != DisposableHelper.DISPOSED) {
this.ds = d;
scheduler.scheduleDirect(this);
}
}

@Override
public void run() {
ds.dispose();
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
actual.onSubscribe(this);
}
}

@Override
public void onSuccess(T value) {
actual.onSuccess(value);
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.single;

import static org.junit.Assert.*;

import java.util.concurrent.*;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;

public class SingleUnsubscribeOnTest {

@Test
public void normal() throws Exception {
PublishProcessor<Integer> pp = PublishProcessor.create();

final String[] name = { null };

final CountDownLatch cdl = new CountDownLatch(1);

pp.doOnCancel(new Action() {
@Override
public void run() throws Exception {
name[0] = Thread.currentThread().getName();
cdl.countDown();
}
})
.single(-99)
.unsubscribeOn(Schedulers.single())
.test(true)
;

assertTrue(cdl.await(5, TimeUnit.SECONDS));

int times = 10;

while (times-- > 0 && pp.hasSubscribers()) {
Thread.sleep(100);
}

assertFalse(pp.hasSubscribers());

assertNotEquals(Thread.currentThread().getName(), name[0]);
}

@Test
public void just() {
Single.just(1)
.unsubscribeOn(Schedulers.single())
.test()
.assertResult(1);
}

@Test
public void error() {
Single.<Integer>error(new TestException())
.unsubscribeOn(Schedulers.single())
.test()
.assertFailure(TestException.class);
}

@Test
public void dispose() {
TestHelper.checkDisposed(Single.just(1)
.unsubscribeOn(Schedulers.single()));
}

@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeSingle(new Function<Single<Object>, SingleSource<Object>>() {
@Override
public SingleSource<Object> apply(Single<Object> v) throws Exception {
return v.unsubscribeOn(Schedulers.single());
}
});
}

@Test
public void disposeRace() {
for (int i = 0; i < 500; i++) {
PublishProcessor<Integer> pp = PublishProcessor.create();

final Disposable[] ds = { null };
pp.single(-99).unsubscribeOn(Schedulers.computation())
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
ds[0] = d;
}

@Override
public void onSuccess(Integer value) {

}

@Override
public void onError(Throwable e) {

}
});

Runnable r = new Runnable() {
@Override
public void run() {
ds[0].dispose();
}
};

TestHelper.race(r, r, Schedulers.single());
}
}
}

0 comments on commit 6c58557

Please sign in to comment.