Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-2764: add new operator onBackpressureDrop(Action1 onDrop) #2776

Merged
merged 1 commit into from
Mar 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5258,6 +5258,27 @@ public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflo
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
* rather than emit, those items that its observer is not prepared to observe.
* <p>
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.drop.png" alt="">
* <p>
* If the downstream request count hits 0 then the Observable will refrain from calling {@code onNext} until
* the observer invokes {@code request(n)} again to increase the request count.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureDrop} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onDrop the action to invoke for each item dropped. onDrop action should be fast and should never block.
* @return the source Observable modified to drop {@code onNext} notifications on overflow
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
public final Observable<T> onBackpressureDrop(Action1<? super T> onDrop) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @Experimental annotation to this method.

return lift(new OperatorOnBackpressureDrop<T>(onDrop));
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
* rather than emit, those items that its observer is not prepared to observe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,34 @@
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.functions.Action1;

public class OperatorOnBackpressureDrop<T> implements Operator<T, T> {

/** Lazy initialization via inner-class holder. */
private static final class Holder {
/** A singleton instance. */
static final OperatorOnBackpressureDrop<Object> INSTANCE = new OperatorOnBackpressureDrop<Object>();
}

/**
* @return a singleton instance of this stateless operator.
*/
@SuppressWarnings({ "unchecked" })
public static <T> OperatorOnBackpressureDrop<T> instance() {
return (OperatorOnBackpressureDrop<T>)Holder.INSTANCE;
}
private OperatorOnBackpressureDrop() { }

private final Action1<? super T> onDrop;

private OperatorOnBackpressureDrop() {
this(null);
}

public OperatorOnBackpressureDrop(Action1<? super T> onDrop) {
this.onDrop = onDrop;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final AtomicLong requested = new AtomicLong();
Expand Down Expand Up @@ -68,6 +81,11 @@ public void onNext(T t) {
if (requested.get() > 0) {
child.onNext(t);
requested.decrementAndGet();
} else {
// item dropped
if(onDrop != null) {
onDrop.call(t);
}
}
}

Expand Down
83 changes: 78 additions & 5 deletions src/test/java/rx/BackpressureTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.junit.*;

import org.junit.rules.TestName;
import rx.Observable.OnSubscribe;
import rx.exceptions.MissingBackpressureException;
import rx.functions.*;
Expand All @@ -33,6 +34,9 @@

public class BackpressureTests {

@Rule
public TestName testName = new TestName();

@After
public void doAfterTest() {
TestObstructionDetection.checkObstruction();
Expand Down Expand Up @@ -424,18 +428,56 @@ public void testOnBackpressureDrop() {
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();



List<Integer> onNextEvents = ts.getOnNextEvents();
assertEquals(NUM, onNextEvents.size());

Integer lastEvent = onNextEvents.get(NUM - 1);

System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent);
// it drop, so we should get some number far higher than what would have sequentially incremented
assertTrue(NUM - 1 <= lastEvent.intValue());
}
}

@Test(timeout = 10000)
public void testOnBackpressureDropWithAction() {
for (int i = 0; i < 100; i++) {
final AtomicInteger emitCount = new AtomicInteger();
final AtomicInteger dropCount = new AtomicInteger();
final AtomicInteger passCount = new AtomicInteger();
final int NUM = (int) (RxRingBuffer.SIZE * 1.5); // > 1 so that take doesn't prevent buffer overflow
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
firehose(emitCount).onBackpressureDrop(new Action1<Integer>() {
@Override
public void call(Integer i) {
dropCount.incrementAndGet();
}
})
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
passCount.incrementAndGet();
}
})
.observeOn(Schedulers.computation())
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();

List<Integer> onNextEvents = ts.getOnNextEvents();
Integer lastEvent = onNextEvents.get(NUM - 1);
System.out.println(testName.getMethodName() + " => Received: " + onNextEvents.size() + " Passed: " + passCount.get() + " Dropped: " + dropCount.get() + " Emitted: " + emitCount.get() + " Last value: " + lastEvent);
assertEquals(NUM, onNextEvents.size());
// in reality, NUM < passCount
assertTrue(NUM <= passCount.get());
// it drop, so we should get some number far higher than what would have sequentially incremented
assertTrue(NUM - 1 <= lastEvent.intValue());
assertTrue(0 < dropCount.get());
assertEquals(emitCount.get(), passCount.get() + dropCount.get());
}
}

@Test(timeout = 10000)
public void testOnBackpressureDropSynchronous() {
for (int i = 0; i < 100; i++) {
Expand All @@ -446,18 +488,49 @@ public void testOnBackpressureDropSynchronous() {
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();

List<Integer> onNextEvents = ts.getOnNextEvents();
assertEquals(NUM, onNextEvents.size());

Integer lastEvent = onNextEvents.get(NUM - 1);

System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Emitted: " + c.get() + " Last value: " + lastEvent);
// it drop, so we should get some number far higher than what would have sequentially incremented
assertTrue(NUM - 1 <= lastEvent.intValue());
}
}

@Test(timeout = 10000)
public void testOnBackpressureDropSynchronousWithAction() {
for (int i = 0; i < 100; i++) {
final AtomicInteger dropCount = new AtomicInteger();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes should be in a separate test method. Leave the testOnBackpressureDropSynchronous as is and introduce testOnBackpressureDropWithActionSynchronous.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
firehose(c).onBackpressureDrop(new Action1<Integer>() {
@Override
public void call(Integer i) {
dropCount.incrementAndGet();
}
})
.map(SLOW_PASS_THRU).take(NUM).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();

List<Integer> onNextEvents = ts.getOnNextEvents();
assertEquals(NUM, onNextEvents.size());

Integer lastEvent = onNextEvents.get(NUM - 1);

System.out.println("testOnBackpressureDrop => Received: " + onNextEvents.size() + " Dropped: " + dropCount.get() + " Emitted: " + c.get() + " Last value: " + lastEvent);
// it drop, so we should get some number far higher than what would have sequentially incremented
assertTrue(NUM - 1 <= lastEvent.intValue());
// no drop in synchronous mode
assertEquals(0, dropCount.get());
assertEquals(c.get(), onNextEvents.size());
}
}

@Test(timeout = 2000)
public void testOnBackpressureBuffer() {
int NUM = (int) (RxRingBuffer.SIZE * 1.1); // > 1 so that take doesn't prevent buffer overflow
Expand Down