Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error Handling Unsubscribe and Terminal State #1669

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.Func1;
import rx.plugins.RxJavaPlugins;

Expand Down Expand Up @@ -49,17 +50,29 @@ public OperatorOnErrorResumeNextViaFunction(Func1<Throwable, ? extends Observabl

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
return new Subscriber<T>(child) {
Subscriber<T> parent = new Subscriber<T>() {

private boolean done = false;

@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
child.onCompleted();
}

@Override
public void onError(Throwable e) {
if (done) {
Exceptions.throwIfFatal(e);
return;
}
done = true;
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
unsubscribe();
Observable<? extends T> resume = resumeFunction.call(e);
resume.unsafeSubscribe(child);
} catch (Throwable e2) {
Expand All @@ -69,10 +82,15 @@ public void onError(Throwable e) {

@Override
public void onNext(T t) {
if (done) {
return;
}
child.onNext(t);
}

};
child.add(parent);
return parent;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.plugins.RxJavaPlugins;

/**
Expand Down Expand Up @@ -51,20 +52,35 @@ public OperatorOnErrorResumeNextViaObservable(Observable<? extends T> resumeSequ
public Subscriber<? super T> call(final Subscriber<? super T> child) {
// shared subscription won't work here
Subscriber<T> s = new Subscriber<T>() {

private boolean done = false;

@Override
public void onNext(T t) {
if (done) {
return;
}
child.onNext(t);
}

@Override
public void onError(Throwable e) {
if (done) {
Exceptions.throwIfFatal(e);
return;
}
done = true;
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
unsubscribe();
resumeSequence.unsafeSubscribe(child);
}

@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
child.onCompleted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package rx.internal.operators;

import java.util.Arrays;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.functions.Func1;
import rx.plugins.RxJavaPlugins;

Expand Down Expand Up @@ -50,7 +52,7 @@ public OperatorOnErrorReturn(Func1<Throwable, ? extends T> resultFunction) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
return new Subscriber<T>(child) {
Subscriber<T> parent = new Subscriber<T>() {

private boolean done = false;

Expand All @@ -65,13 +67,14 @@ public void onNext(T t) {
@Override
public void onError(Throwable e) {
if (done) {
Exceptions.throwIfFatal(e);
return;
}
done = true;
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
unsubscribe();
T result = resultFunction.call(e);

child.onNext(result);
} catch (Throwable x) {
child.onError(new CompositeException(Arrays.asList(e, x)));
Expand All @@ -90,5 +93,7 @@ public void onCompleted() {
}

};
child.add(parent);
return parent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.plugins.RxJavaPlugins;

/**
Expand Down Expand Up @@ -56,13 +57,23 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
// needs to independently unsubscribe so child can continue with the resume
Subscriber<T> s = new Subscriber<T>() {

private boolean done = false;

@Override
public void onNext(T t) {
if (done) {
return;
}
child.onNext(t);
}

@Override
public void onError(Throwable e) {
if (done) {
Exceptions.throwIfFatal(e);
return;
}
done = true;
if (e instanceof Exception) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
unsubscribe();
Expand All @@ -74,6 +85,10 @@ public void onError(Throwable e) {

@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
child.onCompleted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import rx.Subscription;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OperatorOnErrorResumeNextViaFunctionTest {

Expand All @@ -47,6 +48,8 @@ public void testResumeNextWithSynchronousExecution() {
public void call(Subscriber<? super String> observer) {
observer.onNext("one");
observer.onError(new Throwable("injected failure"));
observer.onNext("two");
observer.onNext("three");
}
});

Expand Down Expand Up @@ -226,6 +229,47 @@ public Observable<String> call(Throwable t1) {
System.out.println(ts.getOnNextEvents());
ts.assertReceivedOnNext(Arrays.asList("success"));
}

@Test
public void testMapResumeAsyncNext() {
// Trigger multiple failures
Observable<String> w = Observable.just("one", "fail", "two", "three", "fail");

// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
// rx.operator incl onErrorResumeNextViaObservable)
w = w.map(new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s))
throw new RuntimeException("Forced Failure");
System.out.println("BadMapper:" + s);
return s;
}
});

Observable<String> observable = w.onErrorResumeNext(new Func1<Throwable, Observable<String>>() {

@Override
public Observable<String> call(Throwable t1) {
return Observable.just("twoResume", "threeResume").subscribeOn(Schedulers.computation());
}

});

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
observable.subscribe(ts);
ts.awaitTerminalEvent();

verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
verify(observer, times(1)).onNext("one");
verify(observer, Mockito.never()).onNext("two");
verify(observer, Mockito.never()).onNext("three");
verify(observer, times(1)).onNext("twoResume");
verify(observer, times(1)).onNext("threeResume");
}

private static class TestObservable implements Observable.OnSubscribe<String> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import rx.Observer;
import rx.Subscriber;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OperatorOnErrorReturnTest {

Expand Down Expand Up @@ -104,6 +106,46 @@ public String call(Throwable e) {
verify(observer, times(0)).onCompleted();
assertNotNull(capturedException.get());
}

@Test
public void testMapResumeAsyncNext() {
// Trigger multiple failures
Observable<String> w = Observable.just("one", "fail", "two", "three", "fail");

// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
// rx.operator incl onErrorResumeNextViaObservable)
w = w.map(new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s))
throw new RuntimeException("Forced Failure");
System.out.println("BadMapper:" + s);
return s;
}
});

Observable<String> observable = w.onErrorReturn(new Func1<Throwable, String>() {

@Override
public String call(Throwable t1) {
return "resume";
}

});

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
observable.subscribe(ts);
ts.awaitTerminalEvent();

verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
verify(observer, times(1)).onNext("one");
verify(observer, Mockito.never()).onNext("two");
verify(observer, Mockito.never()).onNext("three");
verify(observer, times(1)).onNext("resume");
}

private static class TestObservable implements Observable.OnSubscribe<String> {

Expand Down