Skip to content

Commit

Permalink
Merge pull request #3639 from davidmoten/fix-onBackpressureBuffer-err…
Browse files Browse the repository at this point in the history
…or-handling

fix error handling in onBackpressureBuffer
  • Loading branch information
akarnokd committed Jan 25, 2016
2 parents 05a4dee + 25e7b2c commit b7b71d1
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.internal.util.BackpressureDrainManager;
Expand Down Expand Up @@ -156,7 +157,15 @@ private boolean assertCapacity() {
"Overflowed buffer of "
+ baseCapacity));
if (onOverflow != null) {
onOverflow.call();
try {
onOverflow.call();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
manager.terminateAndDrain(e);
// this line not strictly necessary but nice for clarity
// and in case of future changes to code after this catch block
return false;
}
}
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

import rx.Observable;
import rx.Observable.OnSubscribe;
Expand All @@ -27,12 +34,10 @@
import rx.Subscription;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class OperatorOnBackpressureBufferTest {

@Test
Expand Down Expand Up @@ -147,5 +152,30 @@ public void call(Subscriber<? super Long> s) {
}

});

private static final Action0 THROWS_NON_FATAL = new Action0() {

@Override
public void call() {
throw new RuntimeException();
}};

@Test
public void testNonFatalExceptionThrownByOnOverflowIsNotReportedByUpstream() {
final AtomicBoolean errorOccurred = new AtomicBoolean(false);
TestSubscriber<Long> ts = TestSubscriber.create(0);
infinite
.subscribeOn(Schedulers.computation())
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable t) {
errorOccurred.set(true);
}
})
.onBackpressureBuffer(1, THROWS_NON_FATAL)
.subscribe(ts);
ts.awaitTerminalEvent();
assertFalse(errorOccurred.get());
}

}

0 comments on commit b7b71d1

Please sign in to comment.