Skip to content

Commit

Permalink
Merge pull request #2987 from davidmoten/skip-bug
Browse files Browse the repository at this point in the history
fix skip() race condition and request overflow
  • Loading branch information
akarnokd committed May 28, 2015
2 parents 6bb5539 + d8f9e86 commit ae09b86
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 14 deletions.
17 changes: 4 additions & 13 deletions src/main/java/rx/internal/operators/OperatorSkip.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable;
import rx.Producer;
import rx.Subscriber;
Expand Down Expand Up @@ -63,19 +65,8 @@ public void onNext(T t) {

@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {

@Override
public void request(long n) {
if (n == Long.MAX_VALUE) {
// infinite so leave it alone
producer.request(n);
} else if (n > 0) {
// add the skip num to the requested amount, since we'll skip everything and then emit to the buffer downstream
producer.request(n + (toSkip - skipped));
}
}
});
child.setProducer(producer);
producer.request(toSkip);
}

};
Expand Down
40 changes: 39 additions & 1 deletion src/test/java/rx/internal/operators/OperatorSkipTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,23 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.internal.operators.OperatorSkip;
import rx.functions.Action1;
import rx.observers.TestSubscriber;

public class OperatorSkipTest {

Expand Down Expand Up @@ -144,4 +150,36 @@ public void testSkipError() {
verify(observer, never()).onCompleted();

}

@Test
public void testBackpressureMultipleSmallAsyncRequests() throws InterruptedException {
final AtomicLong requests = new AtomicLong(0);
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnRequest(new Action1<Long>() {
@Override
public void call(Long n) {
requests.addAndGet(n);
}
}).skip(4).subscribe(ts);
Thread.sleep(100);
ts.requestMore(1);
ts.requestMore(1);
Thread.sleep(100);
ts.unsubscribe();
ts.assertUnsubscribed();
ts.assertNoErrors();
assertEquals(6, requests.get());
}

@Test
public void testRequestOverflowDoesNotOccur() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(Long.MAX_VALUE-1);
Observable.range(1, 10).skip(5).subscribe(ts);
ts.assertTerminalEvent();
ts.assertCompleted();
ts.assertNoErrors();
assertEquals(Arrays.asList(6,7,8,9,10), ts.getOnNextEvents());
}

}

0 comments on commit ae09b86

Please sign in to comment.