Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SafeSubscriber - report onCompleted unsubscribe error to RxJavaPlugin #3155

Merged
merged 1 commit into from
Aug 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/main/java/rx/exceptions/OnCompletedFailedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.exceptions;

public final class OnCompletedFailedException extends RuntimeException {

private static final long serialVersionUID = 8622579378868820554L;

public OnCompletedFailedException(Throwable throwable) {
super(throwable);
}

public OnCompletedFailedException(String message, Throwable throwable) {
super(message, throwable);
}
}
30 changes: 30 additions & 0 deletions src/main/java/rx/exceptions/UnsubscribeFailedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.exceptions;

public final class UnsubscribeFailedException extends RuntimeException {

private static final long serialVersionUID = 4594672310593167598L;

public UnsubscribeFailedException(Throwable throwable) {
super(throwable);
}

public UnsubscribeFailedException(String message, Throwable throwable) {
super(message, throwable);
}

}
40 changes: 40 additions & 0 deletions src/main/java/rx/internal/util/RxJavaPluginUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.util;

import rx.plugins.RxJavaPlugins;

public final class RxJavaPluginUtils {

public static void handleException(Throwable e) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
}

private static void handlePluginException(Throwable pluginException) {
/*
* We don't want errors from the plugin to affect normal flow.
* Since the plugin should never throw this is a safety net
* and will complain loudly to System.err so it gets fixed.
*/
System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage());
pluginException.printStackTrace();
}

}
58 changes: 18 additions & 40 deletions src/main/java/rx/observers/SafeSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.exceptions.OnCompletedFailedException;
import rx.exceptions.OnErrorFailedException;
import rx.exceptions.OnErrorNotImplementedException;
import rx.plugins.RxJavaPlugins;
import rx.exceptions.UnsubscribeFailedException;
import rx.internal.util.RxJavaPluginUtils;

/**
* {@code SafeSubscriber} is a wrapper around {@code Subscriber} that ensures that the {@code Subscriber}
Expand Down Expand Up @@ -83,11 +85,17 @@ public void onCompleted() {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
// handle errors if the onCompleted implementation fails, not just if the Observable fails
_onError(e);
RxJavaPluginUtils.handleException(e);
throw new OnCompletedFailedException(e.getMessage(), e);
} finally {
// auto-unsubscribe
unsubscribe();
try {
// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
// and we throw an UnsubscribeFailureException.
unsubscribe();
} catch (Throwable e) {
RxJavaPluginUtils.handleException(e);
throw new UnsubscribeFailedException(e.getMessage(), e);
}
}
}
}
Expand Down Expand Up @@ -145,11 +153,7 @@ public void onNext(T args) {
* @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
*/
protected void _onError(Throwable e) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
RxJavaPluginUtils.handleException(e);
try {
actual.onError(e);
} catch (Throwable e2) {
Expand All @@ -168,11 +172,7 @@ protected void _onError(Throwable e) {
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
RxJavaPluginUtils.handleException(unsubscribeException);
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
}
throw (OnErrorNotImplementedException) e2;
Expand All @@ -182,19 +182,11 @@ protected void _onError(Throwable e) {
*
* https://github.com/ReactiveX/RxJava/issues/198
*/
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
RxJavaPluginUtils.handleException(e2);
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
RxJavaPluginUtils.handleException(unsubscribeException);
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}

Expand All @@ -205,25 +197,11 @@ protected void _onError(Throwable e) {
try {
unsubscribe();
} catch (RuntimeException unsubscribeException) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
RxJavaPluginUtils.handleException(unsubscribeException);
throw new OnErrorFailedException(unsubscribeException);
}
}

private void handlePluginException(Throwable pluginException) {
/*
* We don't want errors from the plugin to affect normal flow.
* Since the plugin should never throw this is a safety net
* and will complain loudly to System.err so it gets fixed.
*/
System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage());
pluginException.printStackTrace();
}

/**
* Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
*
Expand Down
27 changes: 9 additions & 18 deletions src/test/java/rx/observers/SafeObserverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.junit.Test;

import junit.framework.Assert;
import rx.Subscriber;
import rx.exceptions.*;
import rx.functions.Action0;
Expand Down Expand Up @@ -68,19 +69,6 @@ public void onCompletedFailure() {
}
}

@Test
public void onCompletedFailureSafe() {
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
try {
new SafeSubscriber<String>(OBSERVER_ONCOMPLETED_FAIL(onError)).onCompleted();
assertNotNull(onError.get());
assertTrue(onError.get() instanceof SafeObserverTestException);
assertEquals("onCompletedFail", onError.get().getMessage());
} catch (Exception e) {
fail("expects exception to be passed to onError");
}
}

@Test
public void onErrorFailure() {
try {
Expand Down Expand Up @@ -184,8 +172,8 @@ public void call() {
e.printStackTrace();

assertTrue(o.isUnsubscribed());

assertTrue(e instanceof SafeObserverTestException);
assertTrue(e instanceof UnsubscribeFailedException);
assertTrue(e.getCause() instanceof SafeObserverTestException);
assertEquals("failure from unsubscribe", e.getMessage());
// expected since onError fails so SafeObserver can't help
}
Expand Down Expand Up @@ -475,9 +463,12 @@ public void onCompleted() {
}
});

s.onCompleted();

assertTrue("Error not received", error.get() instanceof TestException);
try {
s.onCompleted();
Assert.fail();
} catch (OnCompletedFailedException e) {
assertNull(error.get());
}
}

@Test
Expand Down
Loading