Skip to content

Commit

Permalink
Add doAfterTerminate callback to the Single type. (#5093)
Browse files Browse the repository at this point in the history
* Add doAfterTerminate callback to the Single type.

* Mark the doAfterTerminate() experimental since 2.0.6
  • Loading branch information
VeskoI authored and akarnokd committed Feb 13, 2017
1 parent 28d1352 commit 7bfeccc
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 0 deletions.
26 changes: 26 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1798,6 +1798,32 @@ public final Single<T> doAfterSuccess(Consumer<? super T> onAfterSuccess) {
return RxJavaPlugins.onAssembly(new SingleDoAfterSuccess<T>(this, onAfterSuccess));
}

/**
* Registers an {@link Action} to be called after this Single invokes either onSuccess or onError.
* * <p>Note that the {@code doAfterSuccess} action is shared between subscriptions and as such
* should be thread-safe.</p>
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doAfterTerminate.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onAfterTerminate
* an {@link Action} to be invoked when the source Single finishes
* @return a Single that emits the same items as the source Single, then invokes the
* {@link Action}
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since 2.0.6 - experimental
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Single<T> doAfterTerminate(Action onAfterTerminate) {
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
return RxJavaPlugins.onAssembly(new SingleDoAfterTerminate<T>(this, onAfterTerminate));
}

/**
* Calls the specified action after this Single signals onSuccess or onError or gets disposed by
* the downstream.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.single;

import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Action;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Calls an action after pushing the current item or an error to the downstream.
* @param <T> the value type
* @since 2.0.6 - experimental
*/
public final class SingleDoAfterTerminate<T> extends Single<T> {

final SingleSource<T> source;

final Action onAfterTerminate;

public SingleDoAfterTerminate(SingleSource<T> source, Action onAfterTerminate) {
this.source = source;
this.onAfterTerminate = onAfterTerminate;
}

@Override
protected void subscribeActual(SingleObserver<? super T> s) {
source.subscribe(new DoAfterTerminateObserver<T>(s, onAfterTerminate));
}

static final class DoAfterTerminateObserver<T> implements SingleObserver<T>, Disposable {

final SingleObserver<? super T> actual;

final Action onAfterTerminate;

Disposable d;

DoAfterTerminateObserver(SingleObserver<? super T> actual, Action onAfterTerminate) {
this.actual = actual;
this.onAfterTerminate = onAfterTerminate;
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(this);
}
}

@Override
public void onSuccess(T t) {
actual.onSuccess(t);

onAfterTerminate();
}

@Override
public void onError(Throwable e) {
actual.onError(e);

onAfterTerminate();
}

@Override
public void dispose() {
d.dispose();
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}

private void onAfterTerminate() {
try {
onAfterTerminate.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.single;

import io.reactivex.Single;
import io.reactivex.SingleSource;
import io.reactivex.TestHelper;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Action;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.Functions;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;

public class SingleDoAfterTerminateTest {

private final int[] call = { 0 };

private final Action afterTerminate = new Action() {
@Override
public void run() throws Exception {
call[0]++;
}
};

private final TestObserver<Integer> ts = new TestObserver<Integer>();

@Test
public void just() {
Single.just(1)
.doAfterTerminate(afterTerminate)
.subscribeWith(ts)
.assertResult(1);

assertAfterTerminateCalledOnce();
}

@Test
public void error() {
Single.<Integer>error(new TestException())
.doAfterTerminate(afterTerminate)
.subscribeWith(ts)
.assertFailure(TestException.class);

assertAfterTerminateCalledOnce();
}

@Test(expected = NullPointerException.class)
public void afterTerminateActionNull() {
Single.just(1).doAfterTerminate(null);
}

@Test
public void justConditional() {
Single.just(1)
.doAfterTerminate(afterTerminate)
.filter(Functions.alwaysTrue())
.subscribeWith(ts)
.assertResult(1);

assertAfterTerminateCalledOnce();
}

@Test
public void errorConditional() {
Single.<Integer>error(new TestException())
.doAfterTerminate(afterTerminate)
.filter(Functions.alwaysTrue())
.subscribeWith(ts)
.assertFailure(TestException.class);

assertAfterTerminateCalledOnce();
}

@Test
public void actionThrows() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Single.just(1)
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
throw new TestException();
}
})
.test()
.assertResult(1);

TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}

@Test
public void dispose() {
TestHelper.checkDisposed(PublishSubject.<Integer>create().singleOrError().doAfterTerminate(afterTerminate));
}

@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeSingle(new Function<Single<Integer>, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Single<Integer> m) throws Exception {
return m.doAfterTerminate(afterTerminate);
}
});
}

private void assertAfterTerminateCalledOnce() {
assertEquals(1, call[0]);
}
}

0 comments on commit 7bfeccc

Please sign in to comment.