From 5fec06fdfec66a6a6422b18a65fd9d934d4fd7fd Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Fri, 14 Aug 2015 21:07:33 +1000 Subject: [PATCH] catch onCompleted unsubscribe error and report to RxJavaPlugin error handler --- .../OnCompletedFailedException.java | 29 +++++ .../UnsubscribeFailedException.java | 30 +++++ .../rx/internal/util/RxJavaPluginUtils.java | 40 +++++++ .../java/rx/observers/SafeSubscriber.java | 58 +++------- .../java/rx/observers/SafeObserverTest.java | 27 ++--- .../java/rx/observers/SafeSubscriberTest.java | 105 ++++++++++++++++-- 6 files changed, 223 insertions(+), 66 deletions(-) create mode 100644 src/main/java/rx/exceptions/OnCompletedFailedException.java create mode 100644 src/main/java/rx/exceptions/UnsubscribeFailedException.java create mode 100644 src/main/java/rx/internal/util/RxJavaPluginUtils.java diff --git a/src/main/java/rx/exceptions/OnCompletedFailedException.java b/src/main/java/rx/exceptions/OnCompletedFailedException.java new file mode 100644 index 0000000000..37632d86c6 --- /dev/null +++ b/src/main/java/rx/exceptions/OnCompletedFailedException.java @@ -0,0 +1,29 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.exceptions; + +public final class OnCompletedFailedException extends RuntimeException { + + private static final long serialVersionUID = 8622579378868820554L; + + public OnCompletedFailedException(Throwable throwable) { + super(throwable); + } + + public OnCompletedFailedException(String message, Throwable throwable) { + super(message, throwable); + } +} diff --git a/src/main/java/rx/exceptions/UnsubscribeFailedException.java b/src/main/java/rx/exceptions/UnsubscribeFailedException.java new file mode 100644 index 0000000000..8b01df8aa3 --- /dev/null +++ b/src/main/java/rx/exceptions/UnsubscribeFailedException.java @@ -0,0 +1,30 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.exceptions; + +public final class UnsubscribeFailedException extends RuntimeException { + + private static final long serialVersionUID = 4594672310593167598L; + + public UnsubscribeFailedException(Throwable throwable) { + super(throwable); + } + + public UnsubscribeFailedException(String message, Throwable throwable) { + super(message, throwable); + } + +} diff --git a/src/main/java/rx/internal/util/RxJavaPluginUtils.java b/src/main/java/rx/internal/util/RxJavaPluginUtils.java new file mode 100644 index 0000000000..b6b462412c --- /dev/null +++ b/src/main/java/rx/internal/util/RxJavaPluginUtils.java @@ -0,0 +1,40 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.util; + +import rx.plugins.RxJavaPlugins; + +public final class RxJavaPluginUtils { + + public static void handleException(Throwable e) { + try { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + } catch (Throwable pluginException) { + handlePluginException(pluginException); + } + } + + private static void handlePluginException(Throwable pluginException) { + /* + * We don't want errors from the plugin to affect normal flow. + * Since the plugin should never throw this is a safety net + * and will complain loudly to System.err so it gets fixed. + */ + System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage()); + pluginException.printStackTrace(); + } + +} diff --git a/src/main/java/rx/observers/SafeSubscriber.java b/src/main/java/rx/observers/SafeSubscriber.java index 0181887c34..8a9aad5179 100644 --- a/src/main/java/rx/observers/SafeSubscriber.java +++ b/src/main/java/rx/observers/SafeSubscriber.java @@ -20,9 +20,11 @@ import rx.Subscriber; import rx.exceptions.CompositeException; import rx.exceptions.Exceptions; +import rx.exceptions.OnCompletedFailedException; import rx.exceptions.OnErrorFailedException; import rx.exceptions.OnErrorNotImplementedException; -import rx.plugins.RxJavaPlugins; +import rx.exceptions.UnsubscribeFailedException; +import rx.internal.util.RxJavaPluginUtils; /** * {@code SafeSubscriber} is a wrapper around {@code Subscriber} that ensures that the {@code Subscriber} @@ -83,11 +85,17 @@ public void onCompleted() { // we handle here instead of another method so we don't add stacks to the frame // which can prevent it from being able to handle StackOverflow Exceptions.throwIfFatal(e); - // handle errors if the onCompleted implementation fails, not just if the Observable fails - _onError(e); + RxJavaPluginUtils.handleException(e); + throw new OnCompletedFailedException(e.getMessage(), e); } finally { - // auto-unsubscribe - unsubscribe(); + try { + // Similarly to onError if failure occurs in unsubscribe then Rx contract is broken + // and we throw an UnsubscribeFailureException. + unsubscribe(); + } catch (Throwable e) { + RxJavaPluginUtils.handleException(e); + throw new UnsubscribeFailedException(e.getMessage(), e); + } } } } @@ -145,11 +153,7 @@ public void onNext(T args) { * @see the report of this bug */ protected void _onError(Throwable e) { - try { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); - } catch (Throwable pluginException) { - handlePluginException(pluginException); - } + RxJavaPluginUtils.handleException(e); try { actual.onError(e); } catch (Throwable e2) { @@ -168,11 +172,7 @@ protected void _onError(Throwable e) { try { unsubscribe(); } catch (Throwable unsubscribeException) { - try { - RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException); - } catch (Throwable pluginException) { - handlePluginException(pluginException); - } + RxJavaPluginUtils.handleException(unsubscribeException); throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); } throw (OnErrorNotImplementedException) e2; @@ -182,19 +182,11 @@ protected void _onError(Throwable e) { * * https://github.com/ReactiveX/RxJava/issues/198 */ - try { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e2); - } catch (Throwable pluginException) { - handlePluginException(pluginException); - } + RxJavaPluginUtils.handleException(e2); try { unsubscribe(); } catch (Throwable unsubscribeException) { - try { - RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException); - } catch (Throwable pluginException) { - handlePluginException(pluginException); - } + RxJavaPluginUtils.handleException(unsubscribeException); throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException))); } @@ -205,25 +197,11 @@ protected void _onError(Throwable e) { try { unsubscribe(); } catch (RuntimeException unsubscribeException) { - try { - RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException); - } catch (Throwable pluginException) { - handlePluginException(pluginException); - } + RxJavaPluginUtils.handleException(unsubscribeException); throw new OnErrorFailedException(unsubscribeException); } } - private void handlePluginException(Throwable pluginException) { - /* - * We don't want errors from the plugin to affect normal flow. - * Since the plugin should never throw this is a safety net - * and will complain loudly to System.err so it gets fixed. - */ - System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage()); - pluginException.printStackTrace(); - } - /** * Returns the {@link Subscriber} underlying this {@code SafeSubscriber}. * diff --git a/src/test/java/rx/observers/SafeObserverTest.java b/src/test/java/rx/observers/SafeObserverTest.java index 1083e995c7..7924bb4026 100644 --- a/src/test/java/rx/observers/SafeObserverTest.java +++ b/src/test/java/rx/observers/SafeObserverTest.java @@ -22,6 +22,7 @@ import org.junit.Test; +import junit.framework.Assert; import rx.Subscriber; import rx.exceptions.*; import rx.functions.Action0; @@ -68,19 +69,6 @@ public void onCompletedFailure() { } } - @Test - public void onCompletedFailureSafe() { - AtomicReference onError = new AtomicReference(); - try { - new SafeSubscriber(OBSERVER_ONCOMPLETED_FAIL(onError)).onCompleted(); - assertNotNull(onError.get()); - assertTrue(onError.get() instanceof SafeObserverTestException); - assertEquals("onCompletedFail", onError.get().getMessage()); - } catch (Exception e) { - fail("expects exception to be passed to onError"); - } - } - @Test public void onErrorFailure() { try { @@ -184,8 +172,8 @@ public void call() { e.printStackTrace(); assertTrue(o.isUnsubscribed()); - - assertTrue(e instanceof SafeObserverTestException); + assertTrue(e instanceof UnsubscribeFailedException); + assertTrue(e.getCause() instanceof SafeObserverTestException); assertEquals("failure from unsubscribe", e.getMessage()); // expected since onError fails so SafeObserver can't help } @@ -475,9 +463,12 @@ public void onCompleted() { } }); - s.onCompleted(); - - assertTrue("Error not received", error.get() instanceof TestException); + try { + s.onCompleted(); + Assert.fail(); + } catch (OnCompletedFailedException e) { + assertNull(error.get()); + } } @Test diff --git a/src/test/java/rx/observers/SafeSubscriberTest.java b/src/test/java/rx/observers/SafeSubscriberTest.java index 85c2d7b07f..5ce37cdea4 100644 --- a/src/test/java/rx/observers/SafeSubscriberTest.java +++ b/src/test/java/rx/observers/SafeSubscriberTest.java @@ -15,15 +15,25 @@ */ package rx.observers; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicInteger; -import org.junit.*; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; -import rx.exceptions.*; +import rx.exceptions.OnCompletedFailedException; +import rx.exceptions.OnErrorFailedException; +import rx.exceptions.OnErrorNotImplementedException; +import rx.exceptions.TestException; +import rx.exceptions.UnsubscribeFailedException; import rx.functions.Action0; -import rx.plugins.*; +import rx.plugins.RxJavaErrorHandler; +import rx.plugins.RxJavaPlugins; import rx.subscriptions.Subscriptions; public class SafeSubscriberTest { @@ -51,10 +61,12 @@ public void onCompleted() { } }; SafeSubscriber safe = new SafeSubscriber(ts); - - safe.onCompleted(); - - assertTrue(safe.isUnsubscribed()); + try { + safe.onCompleted(); + Assert.fail(); + } catch (OnCompletedFailedException e) { + assertTrue(safe.isUnsubscribed()); + } } @Test @@ -76,7 +88,7 @@ public void onCompleted() { assertTrue(safe.isUnsubscribed()); } - @Test + @Test(expected=OnCompletedFailedException.class) public void testPluginException() { RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { @Override @@ -227,4 +239,81 @@ public void call() { safe.onError(new TestException()); } + + @Test + public void testPluginErrorHandlerReceivesExceptionWhenUnsubscribeAfterCompletionThrows() { + final AtomicInteger calls = new AtomicInteger(); + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + @Override + public void handleError(Throwable e) { + calls.incrementAndGet(); + } + }); + + final AtomicInteger errors = new AtomicInteger(); + TestSubscriber ts = new TestSubscriber() { + @Override + public void onError(Throwable e) { + errors.incrementAndGet(); + } + }; + final RuntimeException ex = new RuntimeException(); + SafeSubscriber safe = new SafeSubscriber(ts); + safe.add(Subscriptions.create(new Action0() { + @Override + public void call() { + throw ex; + } + })); + + try { + safe.onCompleted(); + Assert.fail(); + } catch(UnsubscribeFailedException e) { + assertEquals(1, (int) calls.get()); + assertEquals(0, (int) errors.get()); + } + } + + @Test + public void testPluginErrorHandlerReceivesExceptionFromFailingUnsubscribeAfterCompletionThrows() { + final AtomicInteger calls = new AtomicInteger(); + RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { + @Override + public void handleError(Throwable e) { + calls.incrementAndGet(); + } + }); + + final AtomicInteger errors = new AtomicInteger(); + TestSubscriber ts = new TestSubscriber() { + + @Override + public void onCompleted() { + throw new RuntimeException(); + } + + @Override + public void onError(Throwable e) { + errors.incrementAndGet(); + } + }; + SafeSubscriber safe = new SafeSubscriber(ts); + safe.add(Subscriptions.create(new Action0() { + @Override + public void call() { + throw new RuntimeException(); + } + })); + + try { + safe.onCompleted(); + Assert.fail(); + } catch(UnsubscribeFailedException e) { + assertEquals(2, (int) calls.get()); + assertEquals(0, (int) errors.get()); + } + } + + }