diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java index a9a8def2d4..dee334bb4d 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java @@ -20,6 +20,7 @@ import rx.Observable.Operator; import rx.Producer; import rx.Subscriber; +import rx.exceptions.Exceptions; import rx.functions.Action1; public class OperatorOnBackpressureDrop implements Operator { @@ -84,7 +85,12 @@ public void onNext(T t) { } else { // item dropped if(onDrop != null) { - onDrop.call(t); + try { + onDrop.call(t); + } catch (Throwable e) { + Exceptions.throwOrReport(e, child, t); + return; + } } } } diff --git a/src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java b/src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java index b61f000704..1489e0c5ae 100644 --- a/src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java +++ b/src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java @@ -16,8 +16,10 @@ package rx.internal.operators; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -26,6 +28,8 @@ import rx.Observable.OnSubscribe; import rx.Observer; import rx.Subscriber; +import rx.functions.Action0; +import rx.functions.Action1; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -117,6 +121,33 @@ public void onNext(Long t) { }}); assertEquals(n, count.get()); } + + @Test + public void testNonFatalExceptionFromOverflowActionIsNotReportedFromUpstreamOperator() { + final AtomicBoolean errorOccurred = new AtomicBoolean(false); + //request 0 + TestSubscriber ts = TestSubscriber.create(0); + //range method emits regardless of requests so should trigger onBackpressureDrop action + range(2) + // if haven't caught exception in onBackpressureDrop operator then would incorrectly + // be picked up by this call to doOnError + .doOnError(new Action1() { + @Override + public void call(Throwable t) { + errorOccurred.set(true); + } + }) + .onBackpressureDrop(THROW_NON_FATAL) + .subscribe(ts); + assertFalse(errorOccurred.get()); + } + + private static final Action1 THROW_NON_FATAL = new Action1() { + @Override + public void call(Long n) { + throw new RuntimeException(); + } + }; static final Observable infinite = Observable.create(new OnSubscribe() {