diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index a09da42012..01e14e0e12 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5263,6 +5263,27 @@ public final Observable onBackpressureBuffer(long capacity, Action0 onOverflo return lift(new OperatorOnBackpressureBuffer(capacity, onOverflow)); } + /** + * Instructs an Observable that is emitting items faster than its observer can consume them to discard, + * rather than emit, those items that its observer is not prepared to observe. + *

+ * + *

+ * If the downstream request count hits 0 then the Observable will refrain from calling {@code onNext} until + * the observer invokes {@code request(n)} again to increase the request count. + *

+ *
Scheduler:
+ *
{@code onBackpressureDrop} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onDrop the action to invoke for each item dropped. onDrop action should be fast and should never block. + * @return the source Observable modified to drop {@code onNext} notifications on overflow + * @see ReactiveX operators documentation: backpressure operators + */ + public final Observable onBackpressureDrop(Action1 onDrop) { + return lift(new OperatorOnBackpressureDrop(onDrop)); + } + /** * Instructs an Observable that is emitting items faster than its observer can consume them to discard, * rather than emit, those items that its observer is not prepared to observe. diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java index 5e81162821..fee6289a4b 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java @@ -20,13 +20,16 @@ import rx.Observable.Operator; import rx.Producer; import rx.Subscriber; +import rx.functions.Action1; public class OperatorOnBackpressureDrop implements Operator { + /** Lazy initialization via inner-class holder. */ private static final class Holder { /** A singleton instance. */ static final OperatorOnBackpressureDrop INSTANCE = new OperatorOnBackpressureDrop(); } + /** * @return a singleton instance of this stateless operator. */ @@ -34,7 +37,17 @@ private static final class Holder { public static OperatorOnBackpressureDrop instance() { return (OperatorOnBackpressureDrop)Holder.INSTANCE; } - private OperatorOnBackpressureDrop() { } + + private final Action1 onDrop; + + private OperatorOnBackpressureDrop() { + this(null); + } + + public OperatorOnBackpressureDrop(Action1 onDrop) { + this.onDrop = onDrop; + } + @Override public Subscriber call(final Subscriber child) { final AtomicLong requested = new AtomicLong(); @@ -68,6 +81,11 @@ public void onNext(T t) { if (requested.get() > 0) { child.onNext(t); requested.decrementAndGet(); + } else { + // item dropped + if(onDrop != null) { + onDrop.call(t); + } } } diff --git a/src/test/java/rx/BackpressureTests.java b/src/test/java/rx/BackpressureTests.java index f54b8b67d5..799564d50d 100644 --- a/src/test/java/rx/BackpressureTests.java +++ b/src/test/java/rx/BackpressureTests.java @@ -23,6 +23,7 @@ import org.junit.*; +import org.junit.rules.TestName; import rx.Observable.OnSubscribe; import rx.exceptions.MissingBackpressureException; import rx.functions.*; @@ -33,6 +34,9 @@ public class BackpressureTests { + @Rule + public TestName testName = new TestName(); + @After public void doAfterTest() { TestObstructionDetection.checkObstruction(); @@ -424,18 +428,56 @@ public void testOnBackpressureDrop() { .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); - - + List onNextEvents = ts.getOnNextEvents(); assertEquals(NUM, onNextEvents.size()); Integer lastEvent = onNextEvents.get(NUM - 1); - + System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent); // it drop, so we should get some number far higher than what would have sequentially incremented assertTrue(NUM - 1 <= lastEvent.intValue()); } } + + @Test(timeout = 10000) + public void testOnBackpressureDropWithAction() { + for (int i = 0; i < 100; i++) { + final AtomicInteger emitCount = new AtomicInteger(); + final AtomicInteger dropCount = new AtomicInteger(); + final AtomicInteger passCount = new AtomicInteger(); + final int NUM = (int) (RxRingBuffer.SIZE * 1.5); // > 1 so that take doesn't prevent buffer overflow + TestSubscriber ts = new TestSubscriber(); + firehose(emitCount).onBackpressureDrop(new Action1() { + @Override + public void call(Integer i) { + dropCount.incrementAndGet(); + } + }) + .doOnNext(new Action1() { + @Override + public void call(Integer integer) { + passCount.incrementAndGet(); + } + }) + .observeOn(Schedulers.computation()) + .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + + List onNextEvents = ts.getOnNextEvents(); + Integer lastEvent = onNextEvents.get(NUM - 1); + System.out.println(testName.getMethodName() + " => Received: " + onNextEvents.size() + " Passed: " + passCount.get() + " Dropped: " + dropCount.get() + " Emitted: " + emitCount.get() + " Last value: " + lastEvent); + assertEquals(NUM, onNextEvents.size()); + // in reality, NUM < passCount + assertTrue(NUM <= passCount.get()); + // it drop, so we should get some number far higher than what would have sequentially incremented + assertTrue(NUM - 1 <= lastEvent.intValue()); + assertTrue(0 < dropCount.get()); + assertEquals(emitCount.get(), passCount.get() + dropCount.get()); + } + } + @Test(timeout = 10000) public void testOnBackpressureDropSynchronous() { for (int i = 0; i < 100; i++) { @@ -446,18 +488,49 @@ public void testOnBackpressureDropSynchronous() { .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); ts.awaitTerminalEvent(); ts.assertNoErrors(); - + List onNextEvents = ts.getOnNextEvents(); assertEquals(NUM, onNextEvents.size()); Integer lastEvent = onNextEvents.get(NUM - 1); - + System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent); // it drop, so we should get some number far higher than what would have sequentially incremented assertTrue(NUM - 1 <= lastEvent.intValue()); } } + @Test(timeout = 10000) + public void testOnBackpressureDropSynchronousWithAction() { + for (int i = 0; i < 100; i++) { + final AtomicInteger dropCount = new AtomicInteger(); + int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow + AtomicInteger c = new AtomicInteger(); + TestSubscriber ts = new TestSubscriber(); + firehose(c).onBackpressureDrop(new Action1() { + @Override + public void call(Integer i) { + dropCount.incrementAndGet(); + } + }) + .map(SLOW_PASS_THRU).take(NUM).subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + + List onNextEvents = ts.getOnNextEvents(); + assertEquals(NUM, onNextEvents.size()); + + Integer lastEvent = onNextEvents.get(NUM - 1); + + System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Dropped: " + dropCount.get() + " Emitted: " + c.get() + " Last value: " + lastEvent); + // it drop, so we should get some number far higher than what would have sequentially incremented + assertTrue(NUM - 1 <= lastEvent.intValue()); + // no drop in synchronous mode + assertEquals(0, dropCount.get()); + assertEquals(c.get(), onNextEvents.size()); + } + } + @Test(timeout = 2000) public void testOnBackpressureBuffer() { int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow