From 051df13544fc97fccaac78a64313194f10eec63f Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Fri, 23 Sep 2016 14:16:39 +0200 Subject: [PATCH] 2.x: Report errors from onError to Plugin when done --- .../operators/flowable/FlowableAny.java | 10 ++++-- .../operators/flowable/FlowableAnySingle.java | 11 ++++--- .../operators/flowable/FlowableFlatMap.java | 2 ++ .../flowable/FlowableOnBackpressureDrop.java | 2 ++ .../operators/flowable/FlowableSingle.java | 2 ++ .../flowable/FlowableSingleMaybe.java | 1 + .../flowable/FlowableSingleSingle.java | 1 + .../operators/flowable/FlowableTakeWhile.java | 2 ++ .../operators/observable/ObservableAny.java | 10 ++++-- .../observable/ObservableAnySingle.java | 9 ++++-- .../observable/ObservableElementAt.java | 2 ++ .../observable/ObservableElementAtMaybe.java | 1 + .../observable/ObservableElementAtSingle.java | 2 ++ .../observable/ObservableFlatMap.java | 4 +-- .../observable/ObservableSingle.java | 2 ++ .../observable/ObservableSingleMaybe.java | 2 ++ .../observable/ObservableSingleSingle.java | 2 ++ .../operators/observable/ObservableTake.java | 12 ++++--- .../observable/ObservableTakeWhile.java | 2 ++ .../io/reactivex/observers/SafeObserver.java | 1 + .../io/reactivex/subjects/UnicastSubject.java | 2 ++ .../reactivex/subscribers/SafeSubscriber.java | 1 + .../observable/ObservableSingleTest.java | 31 ++++++++++++++++++- 23 files changed, 94 insertions(+), 20 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableAny.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableAny.java index a20206e26b..9e3ba4e096 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableAny.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableAny.java @@ -12,6 +12,7 @@ */ package io.reactivex.internal.operators.flowable; +import io.reactivex.plugins.RxJavaPlugins; import org.reactivestreams.*; import io.reactivex.exceptions.Exceptions; @@ -76,10 +77,13 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (!done) { - done = true; - actual.onError(t); + if (done) { + RxJavaPlugins.onError(t); + return; } + + done = true; + actual.onError(t); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableAnySingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableAnySingle.java index e732fd3964..da6a780e43 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableAnySingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableAnySingle.java @@ -90,11 +90,14 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (!done) { - done = true; - s = SubscriptionHelper.CANCELLED; - actual.onError(t); + if (done) { + RxJavaPlugins.onError(t); + return; } + + done = true; + s = SubscriptionHelper.CANCELLED; + actual.onError(t); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java index 61827ded2a..d005faf6ec 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.plugins.RxJavaPlugins; import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; @@ -309,6 +310,7 @@ void tryEmit(U value, InnerSubscriber inner) { public void onError(Throwable t) { // safeguard against misbehaving sources if (done) { + RxJavaPlugins.onError(t); return; } getErrorQueue().offer(t); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDrop.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDrop.java index 4878aa2fc3..064cd060cb 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDrop.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDrop.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.plugins.RxJavaPlugins; import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.*; @@ -97,6 +98,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingle.java index ecf6bf0a7e..08c4679181 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingle.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.plugins.RxJavaPlugins; import org.reactivestreams.*; import io.reactivex.internal.subscriptions.*; @@ -73,6 +74,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleMaybe.java index 0584e0371d..7a8957d5f2 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleMaybe.java @@ -81,6 +81,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java index b0b0ad0202..6a61e17ae0 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java @@ -87,6 +87,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeWhile.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeWhile.java index e56b5f808f..4765a08c86 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeWhile.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeWhile.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import io.reactivex.plugins.RxJavaPlugins; import org.reactivestreams.*; import io.reactivex.exceptions.Exceptions; @@ -80,6 +81,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableAny.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableAny.java index 3db708d5f5..3ed0f94480 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableAny.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableAny.java @@ -17,6 +17,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Predicate; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableAny extends AbstractObservableWithUpstream { final Predicate predicate; @@ -75,10 +76,13 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (!done) { - done = true; - actual.onError(t); + if (done) { + RxJavaPlugins.onError(t); + return; } + + done = true; + actual.onError(t); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableAnySingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableAnySingle.java index 337a573d76..3bc22e6b0c 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableAnySingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableAnySingle.java @@ -84,10 +84,13 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (!done) { - done = true; - actual.onError(t); + if (done) { + RxJavaPlugins.onError(t); + return; } + + done = true; + actual.onError(t); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java index fb7a1f5061..ac4152744d 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java @@ -16,6 +16,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableElementAt extends AbstractObservableWithUpstream { final long index; @@ -86,6 +87,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtMaybe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtMaybe.java index 81a50d6416..1eb6063445 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtMaybe.java @@ -89,6 +89,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java index 5edf4c343b..cf7770cc1c 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java @@ -16,6 +16,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableElementAtSingle extends Single { final ObservableSource source; @@ -86,6 +87,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java index 0832dd9391..2eb224bc51 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import io.reactivex.plugins.RxJavaPlugins; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.atomic.*; @@ -277,8 +278,8 @@ void tryEmit(U value, InnerObserver inner) { @Override public void onError(Throwable t) { - // safeguard against misbehaving sources if (done) { + RxJavaPlugins.onError(t); return; } getErrorQueue().offer(t); @@ -288,7 +289,6 @@ public void onError(Throwable t) { @Override public void onComplete() { - // safeguard against misbehaving sources if (done) { return; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSingle.java index dc855daaea..993051f6d5 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSingle.java @@ -16,6 +16,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableSingle extends AbstractObservableWithUpstream { @@ -82,6 +83,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleMaybe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleMaybe.java index 6373ff3c8c..e72e866a8c 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleMaybe.java @@ -16,6 +16,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableSingleMaybe extends Maybe { @@ -79,6 +80,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleSingle.java index 9dc3665059..c05b8354d6 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSingleSingle.java @@ -16,6 +16,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableSingleSingle extends Single { @@ -85,6 +86,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTake.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTake.java index 5a480119f8..ed4f5ebb51 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTake.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTake.java @@ -16,6 +16,7 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.*; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableTake extends AbstractObservableWithUpstream { final long limit; @@ -66,11 +67,14 @@ public void onNext(T t) { } @Override public void onError(Throwable t) { - if (!done) { - done = true; - subscription.dispose(); - actual.onError(t); + if (done) { + RxJavaPlugins.onError(t); + return; } + + done = true; + subscription.dispose(); + actual.onError(t); } @Override public void onComplete() { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeWhile.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeWhile.java index 336df31267..76a9b54c0f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeWhile.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeWhile.java @@ -18,6 +18,7 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Predicate; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableTakeWhile extends AbstractObservableWithUpstream { final Predicate predicate; @@ -92,6 +93,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/observers/SafeObserver.java b/src/main/java/io/reactivex/observers/SafeObserver.java index 9ba9b4172c..592b1f5e79 100644 --- a/src/main/java/io/reactivex/observers/SafeObserver.java +++ b/src/main/java/io/reactivex/observers/SafeObserver.java @@ -136,6 +136,7 @@ void onNextNoSubscription() { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/main/java/io/reactivex/subjects/UnicastSubject.java b/src/main/java/io/reactivex/subjects/UnicastSubject.java index 33e9e74a19..f5570772bb 100644 --- a/src/main/java/io/reactivex/subjects/UnicastSubject.java +++ b/src/main/java/io/reactivex/subjects/UnicastSubject.java @@ -13,6 +13,7 @@ package io.reactivex.subjects; +import io.reactivex.plugins.RxJavaPlugins; import java.util.concurrent.atomic.*; import io.reactivex.Observer; @@ -193,6 +194,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done || disposed) { + RxJavaPlugins.onError(t); return; } if (t == null) { diff --git a/src/main/java/io/reactivex/subscribers/SafeSubscriber.java b/src/main/java/io/reactivex/subscribers/SafeSubscriber.java index ae8fdf10c1..a08e39a911 100644 --- a/src/main/java/io/reactivex/subscribers/SafeSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/SafeSubscriber.java @@ -124,6 +124,7 @@ void onNextNoSubscription() { @Override public void onError(Throwable t) { if (done) { + RxJavaPlugins.onError(t); return; } done = true; diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java index 396ff37c95..ef4d561868 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.observable; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.*; @@ -20,8 +21,11 @@ import org.junit.Test; import org.mockito.InOrder; +import java.util.concurrent.atomic.AtomicReference; + import io.reactivex.*; import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; public class ObservableSingleTest { @@ -455,4 +459,29 @@ public Integer apply(Integer i1, Integer i2) { Integer r = reduced.blockingGet(); assertEquals(21, r.intValue()); } -} \ No newline at end of file + + @Test + public void singleElementOperatorDoNotSwallowExceptionWhenDone() { + final Throwable exception = new RuntimeException("some error"); + final AtomicReference error = new AtomicReference(); + + try { + RxJavaPlugins.setErrorHandler(new Consumer() { + @Override public void accept(final Throwable throwable) throws Exception { + error.set(throwable); + } + }); + + Observable.unsafeCreate(new ObservableSource() { + @Override public void subscribe(final Observer observer) { + observer.onComplete(); + observer.onError(exception); + } + }).singleElement().test().assertComplete(); + + assertSame(exception, error.get()); + } finally { + RxJavaPlugins.reset(); + } + } +}