Skip to content

Commit

Permalink
Merge pull request #2589 from akarnokd/OperatorDistinctFix
Browse files Browse the repository at this point in the history
Repeat/retry: fixed unbounded downstream requesting above Long.MAX_VALUE
  • Loading branch information
benjchristensen committed Feb 3, 2015
2 parents 43a912f + a280480 commit e901ffa
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OnSubscribeRedo.java
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public void setProducer(Producer producer) {

@Override
public void request(final long n) {
long c = consumerCapacity.getAndAdd(n);
long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
Producer producer = currentProducer.get();
if (producer != null) {
producer.request(n);
Expand Down
24 changes: 20 additions & 4 deletions src/test/java/rx/internal/operators/OperatorRepeatTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
Expand All @@ -33,6 +31,7 @@
import rx.Subscriber;
import rx.exceptions.TestException;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OperatorRepeatTest {
Expand Down Expand Up @@ -158,4 +157,21 @@ public void testRepeatOne() {
verify(o, times(1)).onNext(any());
verify(o, never()).onError(any(Throwable.class));
}

/** Issue #2587. */
@Test
public void testRepeatAndDistinctUnbounded() {
Observable<Integer> src = Observable.from(Arrays.asList(1, 2, 3, 4, 5))
.take(3)
.repeat(3)
.distinct();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

src.subscribe(ts);

ts.assertNoErrors();
ts.assertTerminalEvent();
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3));
}
}

0 comments on commit e901ffa

Please sign in to comment.