From 19b954fde81d5529ed9d093b430bf938b2156a48 Mon Sep 17 00:00:00 2001 From: headinthebox Date: Fri, 22 Nov 2013 22:52:21 -0800 Subject: [PATCH 1/3] Added OperationRepeat & repeat operator --- rxjava-core/src/main/java/rx/Observable.java | 118 ++++-------------- .../java/rx/operators/OperationRepeat.java | 70 +++++++++++ 2 files changed, 95 insertions(+), 93 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationRepeat.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 56a4664654..b53754e697 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,83 +15,11 @@ */ package rx; -import static rx.util.functions.Functions.*; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - import rx.concurrency.Schedulers; import rx.observables.BlockingObservable; import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; -import rx.operators.OperationAll; -import rx.operators.OperationAmb; -import rx.operators.OperationAny; -import rx.operators.OperationAverage; -import rx.operators.OperationBuffer; -import rx.operators.OperationCache; -import rx.operators.OperationCast; -import rx.operators.OperationCombineLatest; -import rx.operators.OperationConcat; -import rx.operators.OperationDebounce; -import rx.operators.OperationDefaultIfEmpty; -import rx.operators.OperationDefer; -import rx.operators.OperationDematerialize; -import rx.operators.OperationDistinct; -import rx.operators.OperationDistinctUntilChanged; -import rx.operators.OperationDoOnEach; -import rx.operators.OperationElementAt; -import rx.operators.OperationFilter; -import rx.operators.OperationFinally; -import rx.operators.OperationFirstOrDefault; -import rx.operators.OperationGroupBy; -import rx.operators.OperationInterval; -import rx.operators.OperationLast; -import rx.operators.OperationMap; -import rx.operators.OperationMaterialize; -import rx.operators.OperationMerge; -import rx.operators.OperationMergeDelayError; -import rx.operators.OperationMinMax; -import rx.operators.OperationMulticast; -import rx.operators.OperationObserveOn; -import rx.operators.OperationOnErrorResumeNextViaFunction; -import rx.operators.OperationOnErrorResumeNextViaObservable; -import rx.operators.OperationOnErrorReturn; -import rx.operators.OperationOnExceptionResumeNextViaObservable; -import rx.operators.OperationParallel; -import rx.operators.OperationParallelMerge; -import rx.operators.OperationRetry; -import rx.operators.OperationSample; -import rx.operators.OperationScan; -import rx.operators.OperationSkip; -import rx.operators.OperationSkipLast; -import rx.operators.OperationSkipWhile; -import rx.operators.OperationSubscribeOn; -import rx.operators.OperationSum; -import rx.operators.OperationSwitch; -import rx.operators.OperationSynchronize; -import rx.operators.OperationTake; -import rx.operators.OperationTakeLast; -import rx.operators.OperationTakeUntil; -import rx.operators.OperationTakeWhile; -import rx.operators.OperationThrottleFirst; -import rx.operators.OperationTimeInterval; -import rx.operators.OperationTimeout; -import rx.operators.OperationTimestamp; -import rx.operators.OperationToObservableFuture; -import rx.operators.OperationToObservableIterable; -import rx.operators.OperationToObservableList; -import rx.operators.OperationToObservableSortedList; -import rx.operators.OperationUsing; -import rx.operators.OperationWindow; -import rx.operators.OperationZip; -import rx.operators.SafeObservableSubscription; -import rx.operators.SafeObserver; +import rx.operators.*; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; @@ -100,26 +28,19 @@ import rx.subjects.ReplaySubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; -import rx.util.Closing; -import rx.util.OnErrorNotImplementedException; -import rx.util.Opening; -import rx.util.Range; -import rx.util.TimeInterval; -import rx.util.Timestamped; -import rx.util.functions.Action0; -import rx.util.functions.Action1; -import rx.util.functions.Func0; -import rx.util.functions.Func1; -import rx.util.functions.Func2; -import rx.util.functions.Func3; -import rx.util.functions.Func4; -import rx.util.functions.Func5; -import rx.util.functions.Func6; -import rx.util.functions.Func7; -import rx.util.functions.Func8; -import rx.util.functions.Func9; -import rx.util.functions.FuncN; -import rx.util.functions.Function; +import rx.util.*; +import rx.util.functions.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static rx.util.functions.Functions.alwaysFalse; +import static rx.util.functions.Functions.not; /** * The Observable interface that implements the Reactive Pattern. @@ -1040,6 +961,17 @@ public static Observable range(int start, int count, Scheduler schedule return range(start, count).observeOn(scheduler); } + /** + * Repeats the observable sequence indefinitely. + *

+ * + * @return The observable sequence producing the elements of the given sequence repeatedly and sequentially. + * @see MSDN: Observable.Repeat + */ + public Observable repeat() { + return create(rx.operators.OperationRepeat.repeat(this)); + } + /** * Returns an Observable that calls an Observable factory to create its * Observable for each new Observer that subscribes. That is, for each diff --git a/rxjava-core/src/main/java/rx/operators/OperationRepeat.java b/rxjava-core/src/main/java/rx/operators/OperationRepeat.java new file mode 100644 index 0000000000..61f42be76c --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationRepeat.java @@ -0,0 +1,70 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.operators; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.subscriptions.MultipleAssignmentSubscription; +import rx.util.functions.Func2; + +public final class OperationRepeat { + + public static Observable.OnSubscribeFunc repeat(Observable source) { + return new RepeatObservable(source); + } + + static class RepeatObservable implements Observable.OnSubscribeFunc { + + RepeatObservable(Observable source) { + this.source = source; + } + + private Observable source; + private Observer observer; + private MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription(); + + @Override + public Subscription onSubscribe(Observer observer) { + this.observer = observer; + Loop(); + return subscription; + } + + void Loop() { + subscription.setSubscription(Schedulers.currentThread().schedule(0, new Func2() { + @Override + public Subscription call(Scheduler s, Integer n) { + return source.subscribe(new Observer() { + @Override + public void onCompleted() { Loop(); } + + @Override + public void onError(Throwable error) { observer.onError(error); } + + @Override + public void onNext(T value) { observer.onNext(value); } + }); + } + })); + } + } + + +} From 9a6f9cbb2cb7e54e698a01657d59f8ee0ed5f462 Mon Sep 17 00:00:00 2001 From: headinthebox Date: Sat, 23 Nov 2013 10:29:20 -0800 Subject: [PATCH 2/3] Added OperationRepeat & repeat operator --- .../java/rx/operators/OperationRepeat.java | 75 +++++++++---------- 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationRepeat.java b/rxjava-core/src/main/java/rx/operators/OperationRepeat.java index 61f42be76c..83c62272b1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationRepeat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationRepeat.java @@ -18,53 +18,50 @@ import rx.Observable; import rx.Observer; -import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; -import rx.subscriptions.MultipleAssignmentSubscription; -import rx.util.functions.Func2; +import rx.subscriptions.SerialSubscription; +import rx.util.functions.Action0; +import rx.util.functions.Action1; -public final class OperationRepeat { +public class OperationRepeat implements Observable.OnSubscribeFunc { - public static Observable.OnSubscribeFunc repeat(Observable source) { - return new RepeatObservable(source); - } - - static class RepeatObservable implements Observable.OnSubscribeFunc { + private final Observable source; - RepeatObservable(Observable source) { - this.source = source; - } + public static Observable.OnSubscribeFunc repeat(Observable seed) { + return new OperationRepeat(seed); + } - private Observable source; - private Observer observer; - private MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription(); + private OperationRepeat(Observable source) { + this.source = source; + } - @Override - public Subscription onSubscribe(Observer observer) { - this.observer = observer; - Loop(); - return subscription; - } + @Override + public Subscription onSubscribe(final Observer observer) { + final SerialSubscription subscription = new SerialSubscription(); + subscription.setSubscription(Schedulers.currentThread().schedule(new Action1() { + @Override + public void call(final Action0 self) { + subscription.setSubscription(source.subscribe(new Observer() { - void Loop() { - subscription.setSubscription(Schedulers.currentThread().schedule(0, new Func2() { - @Override - public Subscription call(Scheduler s, Integer n) { - return source.subscribe(new Observer() { - @Override - public void onCompleted() { Loop(); } + @Override + public void onCompleted() { + subscription.getSubscription().unsubscribe(); + self.call(); + } - @Override - public void onError(Throwable error) { observer.onError(error); } + @Override + public void onError(Throwable error) { + observer.onError(error); + } - @Override - public void onNext(T value) { observer.onNext(value); } - }); - } - })); - } + @Override + public void onNext(T value) { + observer.onNext(value); + } + })); + } + })); + return subscription; } - - -} +} \ No newline at end of file From 1a7e51f33a28f9299719252035d6bc218eeedbdd Mon Sep 17 00:00:00 2001 From: headinthebox Date: Sat, 23 Nov 2013 13:22:25 -0800 Subject: [PATCH 3/3] Added test --- rxjava-core/src/main/java/rx/Observable.java | 13 ++++++- .../java/rx/operators/OperationRepeat.java | 17 +++++---- .../src/test/java/rx/ObservableTests.java | 38 +++++++++++-------- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index b53754e697..ea975bb42e 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -969,7 +969,18 @@ public static Observable range(int start, int count, Scheduler schedule * @see MSDN: Observable.Repeat */ public Observable repeat() { - return create(rx.operators.OperationRepeat.repeat(this)); + return this.repeat(Schedulers.currentThread()); + } + + /** + * Repeats the observable sequence indefinitely. + *

+ * @param scheduler the scheduler to send the values on. + * @return The observable sequence producing the elements of the given sequence repeatedly and sequentially. + * @see MSDN: Observable.Repeat + */ + public Observable repeat(Scheduler scheduler) { + return create(OperationRepeat.repeat(this, scheduler)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationRepeat.java b/rxjava-core/src/main/java/rx/operators/OperationRepeat.java index 83c62272b1..6bf34efa78 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationRepeat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationRepeat.java @@ -18,35 +18,36 @@ import rx.Observable; import rx.Observer; +import rx.Scheduler; import rx.Subscription; -import rx.concurrency.Schedulers; -import rx.subscriptions.SerialSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.util.functions.Action0; import rx.util.functions.Action1; public class OperationRepeat implements Observable.OnSubscribeFunc { private final Observable source; + private final Scheduler scheduler; - public static Observable.OnSubscribeFunc repeat(Observable seed) { - return new OperationRepeat(seed); + public static Observable.OnSubscribeFunc repeat(Observable source, Scheduler scheduler) { + return new OperationRepeat(source, scheduler); } - private OperationRepeat(Observable source) { + private OperationRepeat(Observable source, Scheduler scheduler) { this.source = source; + this.scheduler = scheduler; } @Override public Subscription onSubscribe(final Observer observer) { - final SerialSubscription subscription = new SerialSubscription(); - subscription.setSubscription(Schedulers.currentThread().schedule(new Action1() { + final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription(); + subscription.setSubscription(scheduler.schedule(new Action1() { @Override public void call(final Action0 self) { subscription.setSubscription(source.subscribe(new Observer() { @Override public void onCompleted() { - subscription.getSubscription().unsubscribe(); self.call(); } diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index 3d718210a1..dcf0fe678b 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -15,35 +15,36 @@ */ package rx; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; - import rx.Observable.OnSubscribeFunc; +import rx.concurrency.Schedulers; import rx.concurrency.TestScheduler; import rx.observables.ConnectableObservable; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func1; import rx.util.functions.Func2; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.*; + public class ObservableTests { @Mock @@ -958,4 +959,11 @@ public void testRangeWithScheduler() { inOrder.verify(aObserver, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } + + @Test + public void testRepeatTake() { + Observable xs = Observable.from(1,2); + Object[] ys = xs.repeat(Schedulers.newThread()).take(4).toList().toBlockingObservable().last().toArray(); + assertArrayEquals(new Object[]{ 1, 2, 1, 2}, ys); + } } \ No newline at end of file