Skip to content

Commit

Permalink
1.x: Add Single.onErrorResumeNext(Func)
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-zinnatullin committed Mar 14, 2016
1 parent a42d0bf commit afd6649
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 47 deletions.
33 changes: 32 additions & 1 deletion src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1432,7 +1432,38 @@ public final Single<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunctio
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
*/
public final Single<T> onErrorResumeNext(Single<? extends T> resumeSingleInCaseOfError) {
return new Single<T>(new SingleOperatorOnErrorResumeNextViaSingle<T>(this, resumeSingleInCaseOfError));
return new Single<T>(SingleOperatorOnErrorResumeNext.withOther(this, resumeSingleInCaseOfError));
}

/**
* Instructs a Single to pass control to another Single rather than invoking
* {@link Observer#onError(Throwable)} if it encounters an error.
* <p/>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorResumeNext.png" alt="">
* <p/>
* By default, when a Single encounters an error that prevents it from emitting the expected item to
* its {@link Observer}, the Single invokes its Observer's {@code onError} method, and then quits
* without invoking any more of its Observer's methods. The {@code onErrorResumeNext} method changes this
* behavior. If you pass a function that will return another Single ({@code resumeFunctionInCaseOfError}) to an Single's
* {@code onErrorResumeNext} method, if the original Single encounters an error, instead of invoking its
* Observer's {@code onError} method, it will instead relinquish control to {@code resumeSingleInCaseOfError} which
* will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case,
* because no Single necessarily invokes {@code onError}, the Observer may never know that an error
* happened.
* <p/>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param resumeFunctionInCaseOfError a function that returns a Single that will take control if source Single encounters an error.
* @return the original Single, with appropriately modified behavior.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
*/
public final Single<T> onErrorResumeNext(final Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError) {
return new Single<T>(SingleOperatorOnErrorResumeNext.withFunction(this, resumeFunctionInCaseOfError));
}

/**
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/rx/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.*;

import rx.Observer;
import rx.SingleSubscriber;
import rx.annotations.Experimental;

/**
Expand Down Expand Up @@ -188,6 +189,7 @@ public static void throwOrReport(Throwable t, Observer<?> o, Object value) {
Exceptions.throwIfFatal(t);
o.onError(OnErrorThrowable.addValueAsLastCause(t, value));
}

/**
* Forwards a fatal exception or reports it to the given Observer.
* @param t the exception
Expand All @@ -199,4 +201,17 @@ public static void throwOrReport(Throwable t, Observer<?> o) {
Exceptions.throwIfFatal(t);
o.onError(t);
}

/**
* Forwards a fatal exception or reports it to the given Observer.
*
* @param throwable the exception.
* @param subscriber the subscriber to report to.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number).
*/
@Experimental
public static void throwOrReport(Throwable throwable, SingleSubscriber<?> subscriber) {
Exceptions.throwIfFatal(throwable);
subscriber.onError(throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rx.internal.operators;

import rx.Single;
import rx.SingleSubscriber;
import rx.exceptions.Exceptions;
import rx.functions.Func1;
import rx.plugins.RxJavaPlugins;

public class SingleOperatorOnErrorResumeNext<T> implements Single.OnSubscribe<T> {

private final Single<? extends T> originalSingle;
private final Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError;

private SingleOperatorOnErrorResumeNext(Single<? extends T> originalSingle, Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError) {
if (originalSingle == null) {
throw new NullPointerException("originalSingle must not be null");
}

if (resumeFunctionInCaseOfError == null) {
throw new NullPointerException("resumeFunctionInCaseOfError must not be null");
}

this.originalSingle = originalSingle;
this.resumeFunctionInCaseOfError = resumeFunctionInCaseOfError;
}

public static <T> SingleOperatorOnErrorResumeNext<T> withFunction(Single<? extends T> originalSingle, Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError) {
return new SingleOperatorOnErrorResumeNext<T>(originalSingle, resumeFunctionInCaseOfError);
}

public static <T> SingleOperatorOnErrorResumeNext<T> withOther(Single<? extends T> originalSingle, final Single<? extends T> resumeSingleInCaseOfError) {
if (resumeSingleInCaseOfError == null) {
throw new NullPointerException("resumeSingleInCaseOfError must not be null");
}

return new SingleOperatorOnErrorResumeNext<T>(originalSingle, new Func1<Throwable, Single<? extends T>>() {
@Override
public Single<? extends T> call(Throwable throwable) {
return resumeSingleInCaseOfError;
}
});
}

@Override
public void call(final SingleSubscriber<? super T> child) {
final SingleSubscriber<? super T> parent = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
child.onSuccess(value);
}

@Override
public void onError(Throwable error) {
try {
resumeFunctionInCaseOfError.call(error).subscribe(child);
} catch (Throwable innerError) {
Exceptions.throwOrReport(innerError, child);
}
}
};

child.add(parent);
originalSingle.subscribe(parent);
}
}

This file was deleted.

48 changes: 47 additions & 1 deletion src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1220,13 +1220,59 @@ public void onErrorResumeNextViaSingleShouldPreventNullSingle() {
try {
Single
.just("value")
.onErrorResumeNext(null);
.onErrorResumeNext((Single<String>) null);
fail();
} catch (NullPointerException expected) {
assertEquals("resumeSingleInCaseOfError must not be null", expected.getMessage());
}
}

@Test
public void onErrorResumeNextViaFunctionShouldNotInterruptSuccesfulSingle() {
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.just("success")
.onErrorResumeNext(new Func1<Throwable, Single<? extends String>>() {
@Override
public Single<? extends String> call(Throwable throwable) {
return Single.just("fail");
}
})
.subscribe(testSubscriber);

testSubscriber.assertValue("success");
}

@Test
public void onErrorResumeNextViaFunctionShouldResumeWithPassedSingleInCaseOfError() {
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.<String> error(new RuntimeException("test exception"))
.onErrorResumeNext(new Func1<Throwable, Single<? extends String>>() {
@Override
public Single<? extends String> call(Throwable throwable) {
return Single.just("fallback");
}
})
.subscribe(testSubscriber);

testSubscriber.assertValue("fallback");
}

@Test
public void onErrorResumeNextViaFunctionShouldPreventNullFunction() {
try {
Single
.just("value")
.onErrorResumeNext((Func1<Throwable, ? extends Single<? extends String>>) null);
fail();
} catch (NullPointerException expected) {
assertEquals("resumeFunctionInCaseOfError must not be null", expected.getMessage());
}
}

@Test(expected = NullPointerException.class)
public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() {
Single.iterableToArray(null);
Expand Down

0 comments on commit afd6649

Please sign in to comment.