Skip to content

Commit

Permalink
Merge pull request #2770 from davidmoten/onBackpressureDrop-request-o…
Browse files Browse the repository at this point in the history
…verflow

OperatorOnBackpressureDrop request overflow check
  • Loading branch information
akarnokd committed Feb 24, 2015
2 parents 3a6ce5a + 9edfdac commit 2605ede
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {

@Override
public void request(long n) {
requested.getAndAdd(n);
BackpressureUtils.getAndAddRequest(requested, n);
}

});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import static org.junit.Assert.assertEquals;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
Expand All @@ -27,8 +30,6 @@
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;

public class OperatorOnBackpressureDropTest {

@Test
Expand Down Expand Up @@ -87,6 +88,35 @@ public void onNext(Long t) {
ts.assertNoErrors();
assertEquals(0, ts.getOnNextEvents().get(0).intValue());
}

@Test
public void testRequestOverflow() throws InterruptedException {
final AtomicInteger count = new AtomicInteger();
int n = 10;
range(n).onBackpressureDrop().subscribe(new Subscriber<Long>() {

@Override
public void onStart() {
request(10);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
throw new RuntimeException(e);
}

@Override
public void onNext(Long t) {
count.incrementAndGet();
//cause overflow of requested if not handled properly in onBackpressureDrop operator
request(Long.MAX_VALUE-1);
}});
assertEquals(n, count.get());
}

static final Observable<Long> infinite = Observable.create(new OnSubscribe<Long>() {

Expand All @@ -99,4 +129,22 @@ public void call(Subscriber<? super Long> s) {
}

});

private static final Observable<Long> range(final long n) {
return Observable.create(new OnSubscribe<Long>() {

@Override
public void call(Subscriber<? super Long> s) {
for (long i=0;i < n;i++) {
if (s.isUnsubscribed()) {
break;
}
s.onNext(i);
}
s.onCompleted();
}

});
}

}

0 comments on commit 2605ede

Please sign in to comment.