diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 6ad27fa409..9027fcc147 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -73,6 +73,7 @@ import rx.operators.OperationOnExceptionResumeNextViaObservable; import rx.operators.OperationParallel; import rx.operators.OperationParallelMerge; +import rx.operators.OperationRepeat; import rx.operators.OperationReplay; import rx.operators.OperationRetry; import rx.operators.OperationSample; @@ -1097,6 +1098,28 @@ public static Observable range(int start, int count, Scheduler schedule return from(Range.createWithCount(start, count), scheduler); } + /** + * Repeats the observable sequence indefinitely. + *

+ * + * @return The observable sequence producing the elements of the given sequence repeatedly and sequentially. + * @see MSDN: Observable.Repeat + */ + public Observable repeat() { + return this.repeat(Schedulers.currentThread()); + } + + /** + * Repeats the observable sequence indefinitely. + *

+ * @param scheduler the scheduler to send the values on. + * @return The observable sequence producing the elements of the given sequence repeatedly and sequentially. + * @see MSDN: Observable.Repeat + */ + public Observable repeat(Scheduler scheduler) { + return create(OperationRepeat.repeat(this, scheduler)); + } + /** * Returns an Observable that calls an Observable factory to create its * Observable for each new Observer that subscribes. That is, for each diff --git a/rxjava-core/src/main/java/rx/operators/OperationRepeat.java b/rxjava-core/src/main/java/rx/operators/OperationRepeat.java new file mode 100644 index 0000000000..8bdff97776 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationRepeat.java @@ -0,0 +1,71 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.operators; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +public class OperationRepeat implements Observable.OnSubscribeFunc { + + private final Observable source; + private final Scheduler scheduler; + + public static Observable.OnSubscribeFunc repeat(Observable source, Scheduler scheduler) { + return new OperationRepeat(source, scheduler); + } + + private OperationRepeat(Observable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final CompositeSubscription compositeSubscription = new CompositeSubscription(); + final MultipleAssignmentSubscription innerSubscription = new MultipleAssignmentSubscription(); + compositeSubscription.add(innerSubscription); + compositeSubscription.add(scheduler.schedule(new Action1() { + @Override + public void call(final Action0 self) { + innerSubscription.set(source.subscribe(new Observer() { + + @Override + public void onCompleted() { + self.call(); + } + + @Override + public void onError(Throwable error) { + observer.onError(error); + } + + @Override + public void onNext(T value) { + observer.onNext(value); + } + })); + } + })); + return compositeSubscription; + } +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index b02afea164..cd5a985104 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -37,11 +37,11 @@ import org.mockito.stubbing.Answer; import rx.Observable.OnSubscribeFunc; -import rx.schedulers.TestScheduler; import rx.observables.ConnectableObservable; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; diff --git a/rxjava-core/src/test/java/rx/operators/OperationRepeatTest.java b/rxjava-core/src/test/java/rx/operators/OperationRepeatTest.java new file mode 100644 index 0000000000..da62c2744f --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationRepeatTest.java @@ -0,0 +1,62 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.schedulers.Schedulers; +import rx.subscriptions.Subscriptions; + +public class OperationRepeatTest { + + @Test + public void testRepetition() { + int NUM = 10; + final AtomicInteger count = new AtomicInteger(); + int value = Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(Observer o) { + o.onNext(count.incrementAndGet()); + o.onCompleted(); + return Subscriptions.empty(); + } + }).repeat(Schedulers.threadPoolForComputation()).take(NUM).toBlockingObservable().last(); + + assertEquals(NUM, value); + } + + @Test + public void testRepeatTake() { + Observable xs = Observable.from(1, 2); + Object[] ys = xs.repeat(Schedulers.newThread()).take(4).toList().toBlockingObservable().last().toArray(); + assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys); + } + + @Test + public void testNoStackOverFlow() { + Observable.from(1).repeat(Schedulers.newThread()).take(100000).toBlockingObservable().last(); + } + +}