Skip to content

Commit

Permalink
Merge pull request #2904 from davidmoten/take-last-queue-producer-req…
Browse files Browse the repository at this point in the history
…uest-overflow

TakeLast - add request overflow check
  • Loading branch information
akarnokd committed Apr 22, 2015
2 parents 7147a8d + 01ceedc commit 51e03cc
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void request(long n) {
if (n == Long.MAX_VALUE) {
_c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE);
} else {
_c = REQUESTED_UPDATER.getAndAdd(this, n);
_c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n);
}
if (!emittingStarted) {
// we haven't started yet, so record what was requested and return
Expand Down
30 changes: 30 additions & 0 deletions src/test/java/rx/internal/operators/OperatorTakeLastTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
Expand Down Expand Up @@ -293,4 +295,32 @@ public void onNext(Integer integer) {
});
assertEquals(1,count.get());
}

@Test(timeout=10000)
public void testRequestOverflow() {
final List<Integer> list = new ArrayList<Integer>();
Observable.range(1, 100).takeLast(50).subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(2);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
list.add(t);
request(Long.MAX_VALUE-1);
}});
assertEquals(50, list.size());
}
}

0 comments on commit 51e03cc

Please sign in to comment.