Skip to content

Commit

Permalink
Repeat with Count
Browse files Browse the repository at this point in the history
- merging changes from ReactiveX#807
  • Loading branch information
akarnokd authored and benjchristensen committed Feb 6, 2014
1 parent 85debff commit cb664ff
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 9 deletions.
33 changes: 33 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5547,6 +5547,39 @@ public final Observable<T> repeat(Scheduler scheduler) {
return nest().lift(new OperatorRepeat<T>(scheduler));
}

/**
* Returns an Observable that repeats the sequence of items emitted by the source
* Observable at most count times.
*
* @param count
* the number of times the source Observable items are repeated,
* a count of 0 will yield an empty sequence.
* @return an Observable that repeats the sequence of items emitted by the source
* Observable at most count times.
*/
public final Observable<T> repeat(long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 expected");
}
return nest().lift(new OperatorRepeat<T>(count));
}

/**
* Returns an Observable that repeats the sequence of items emitted by the source
* Observable at most count times on a particular scheduler.
*
* @param count
* the number of times the source Observable items are repeated,
* a count of 0 will yield an empty sequence.
* @param scheduler
* the scheduler to emit the items on
* @return an Observable that repeats the sequence of items emitted by the source
* Observable at most count times on a particular scheduler.
*/
public final Observable<T> repeat(long count, Scheduler scheduler) {
return nest().lift(new OperatorRepeat<T>(count, scheduler));
}

/**
* Returns a {@link ConnectableObservable} that shares a single subscription to the underlying
* Observable that will replay all of its items and notifications to any future {@link Observer}.
Expand Down
28 changes: 24 additions & 4 deletions rxjava-core/src/main/java/rx/operators/OperatorRepeat.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,42 @@
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
import rx.util.functions.Action1;

public class OperatorRepeat<T> implements Operator<T, Observable<T>> {

private final Scheduler scheduler;
private final long count;

public OperatorRepeat(Scheduler scheduler) {
public OperatorRepeat(long count, Scheduler scheduler) {
this.scheduler = scheduler;
this.count = count;
}

public OperatorRepeat(Scheduler scheduler) {
this(-1, scheduler);
}

public OperatorRepeat(long count) {
this(count, Schedulers.trampoline());
}

public OperatorRepeat() {
this(Schedulers.trampoline());
this(-1, Schedulers.trampoline());
}

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
if (count == 0) {
child.onCompleted();
return Subscribers.empty();
}
return new Subscriber<Observable<T>>(child) {

int executionCount = 0;

@Override
public void onCompleted() {
// ignore as we will keep repeating
Expand All @@ -58,12 +74,16 @@ public void onNext(final Observable<T> t) {

@Override
public void call(final Inner inner) {

executionCount++;
t.subscribe(new Subscriber<T>(child) {

@Override
public void onCompleted() {
inner.schedule(self);
if (count == -1 || executionCount < count) {
inner.schedule(self);
} else {
child.onCompleted();
}
}

@Override
Expand Down
58 changes: 53 additions & 5 deletions rxjava-core/src/test/java/rx/operators/OperatorRepeatTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -33,7 +35,7 @@

public class OperatorRepeatTest {

@Test
@Test(timeout = 2000)
public void testRepetition() {
int NUM = 10;
final AtomicInteger count = new AtomicInteger();
Expand All @@ -50,14 +52,14 @@ public Subscription onSubscribe(Observer<? super Integer> o) {
assertEquals(NUM, value);
}

@Test
@Test(timeout = 2000)
public void testRepeatTake() {
Observable<Integer> xs = Observable.from(1, 2);
Object[] ys = xs.repeat(Schedulers.newThread()).take(4).toList().toBlockingObservable().last().toArray();
assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

@Test
@Test(timeout = 20000)
public void testNoStackOverFlow() {
Observable.from(1).repeat(Schedulers.newThread()).take(100000).toBlockingObservable().last();
}
Expand All @@ -70,7 +72,6 @@ public void testRepeatTakeWithSubscribeOn() throws InterruptedException {

@Override
public void call(Subscriber<? super Integer> sub) {
System.out.println("invoked!");
counter.incrementAndGet();
sub.onNext(1);
sub.onNext(2);
Expand All @@ -82,7 +83,6 @@ public void call(Subscriber<? super Integer> sub) {

@Override
public Integer call(Integer t1) {
System.out.println("t1: " + t1);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Expand All @@ -97,4 +97,52 @@ public Integer call(Integer t1) {
assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

@Test(timeout = 2000)
public void testRepeatAndTake() {
@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);

Observable.from(1).repeat().take(10).subscribe(o);

verify(o, times(10)).onNext(1);
verify(o).onCompleted();
verify(o, never()).onError(any(Throwable.class));
}
@Test(timeout = 2000)
public void testRepeatLimited() {
@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);

Observable.from(1).repeat(10).subscribe(o);

verify(o, times(10)).onNext(1);
verify(o).onCompleted();
verify(o, never()).onError(any(Throwable.class));
}
@Test(timeout = 2000)
public void testRepeatError() {
@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);

Observable.error(new CustomException()).repeat(10).subscribe(o);

verify(o).onError(any(CustomException.class));
verify(o, never()).onNext(any());
verify(o, never()).onCompleted();

}
@Test(timeout = 2000)
public void testRepeatZero() {
@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);

Observable.from(1).repeat(0).subscribe(o);

verify(o).onCompleted();
verify(o, never()).onNext(any());
verify(o, never()).onError(any(Throwable.class));
}

private static class CustomException extends RuntimeException {
}
}

0 comments on commit cb664ff

Please sign in to comment.