From d8f9e8620cd4c52dce037bb205840a1c9853f5b6 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Thu, 28 May 2015 19:53:07 +1000 Subject: [PATCH] fix skip race conditions and request overflow --- .../rx/internal/operators/OperatorSkip.java | 17 ++------ .../internal/operators/OperatorSkipTest.java | 40 ++++++++++++++++++- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorSkip.java b/src/main/java/rx/internal/operators/OperatorSkip.java index 2598145e84..878898aaba 100644 --- a/src/main/java/rx/internal/operators/OperatorSkip.java +++ b/src/main/java/rx/internal/operators/OperatorSkip.java @@ -15,6 +15,8 @@ */ package rx.internal.operators; +import java.util.concurrent.atomic.AtomicBoolean; + import rx.Observable; import rx.Producer; import rx.Subscriber; @@ -63,19 +65,8 @@ public void onNext(T t) { @Override public void setProducer(final Producer producer) { - child.setProducer(new Producer() { - - @Override - public void request(long n) { - if (n == Long.MAX_VALUE) { - // infinite so leave it alone - producer.request(n); - } else if (n > 0) { - // add the skip num to the requested amount, since we'll skip everything and then emit to the buffer downstream - producer.request(n + (toSkip - skipped)); - } - } - }); + child.setProducer(producer); + producer.request(toSkip); } }; diff --git a/src/test/java/rx/internal/operators/OperatorSkipTest.java b/src/test/java/rx/internal/operators/OperatorSkipTest.java index 9f4b75d5a7..0e9ca9367e 100644 --- a/src/test/java/rx/internal/operators/OperatorSkipTest.java +++ b/src/test/java/rx/internal/operators/OperatorSkipTest.java @@ -15,17 +15,23 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + import org.junit.Test; import rx.Observable; import rx.Observer; -import rx.internal.operators.OperatorSkip; +import rx.functions.Action1; +import rx.observers.TestSubscriber; public class OperatorSkipTest { @@ -144,4 +150,36 @@ public void testSkipError() { verify(observer, never()).onCompleted(); } + + @Test + public void testBackpressureMultipleSmallAsyncRequests() throws InterruptedException { + final AtomicLong requests = new AtomicLong(0); + TestSubscriber ts = new TestSubscriber(0); + Observable.interval(100, TimeUnit.MILLISECONDS) + .doOnRequest(new Action1() { + @Override + public void call(Long n) { + requests.addAndGet(n); + } + }).skip(4).subscribe(ts); + Thread.sleep(100); + ts.requestMore(1); + ts.requestMore(1); + Thread.sleep(100); + ts.unsubscribe(); + ts.assertUnsubscribed(); + ts.assertNoErrors(); + assertEquals(6, requests.get()); + } + + @Test + public void testRequestOverflowDoesNotOccur() { + TestSubscriber ts = new TestSubscriber(Long.MAX_VALUE-1); + Observable.range(1, 10).skip(5).subscribe(ts); + ts.assertTerminalEvent(); + ts.assertCompleted(); + ts.assertNoErrors(); + assertEquals(Arrays.asList(6,7,8,9,10), ts.getOnNextEvents()); + } + }