diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/observable_tests.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/observable_tests.clj index 3daa4727895..f0521c2127f 100644 --- a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/observable_tests.clj +++ b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/observable_tests.clj @@ -2,4 +2,6 @@ (import rx.Observable)) ;; still need to get this wired up in build.gradle to run as tests -; (-> (rx.Observable/toObservable [\"one\" \"two\" \"three\"]) (.take 2) (.subscribe (fn [arg] (println arg)))) \ No newline at end of file +; (-> (rx.Observable/toObservable ["one" "two" "three"]) (.take 2) (.subscribe (fn [arg] (println arg)))) + +; (-> (rx.Observable/toObservable [1 2 3]) (.takeWhile (fn [x i] (< x 2))) (.subscribe (fn [arg] (println arg)))) diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 8f50ec7c9ec..2bb4c4c6bfa 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -174,6 +174,22 @@ def class ObservableTests { verify(a, times(0)).received(3); } + @Test + public void testTakeWhileViaGroovy() { + Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)}); + verify(a, times(1)).received(1); + verify(a, times(1)).received(2); + verify(a, times(0)).received(3); + } + + @Test + public void testTakeWhileWithIndexViaGroovy() { + Observable.takeWhileWithIndex(Observable.toObservable(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)}); + verify(a, times(1)).received(1); + verify(a, times(1)).received(2); + verify(a, times(0)).received(3); + } + @Test public void testToSortedList() { new TestFactory().getNumbers().toSortedList().subscribe({ result -> a.received(result)}); diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c305f19c9a7..3dc60b8d0d9 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1370,6 +1370,57 @@ public static Observable takeLast(final Observable items, final int co return _create(OperationTakeLast.takeLast(items, count)); } + /** + * Returns a specified number of contiguous values from the start of an observable sequence. + * + * @param items + * @param predicate a function to test each source element for a condition + * @return + */ + public static Observable takeWhile(final Observable items, Func1 predicate) { + return create(OperationTake.takeWhile(items, predicate)); + } + + /** + * Returns a specified number of contiguous values from the start of an observable sequence. + * + * @param items + * @param predicate a function to test each source element for a condition + * @return + */ + public static Observable takeWhile(final Observable items, Object predicate) { + final FuncN _f = Functions.from(predicate); + + return takeWhile(items, new Func1() { + @Override + public Boolean call(T t) { + return (Boolean) _f.call(t); + } + }); + } + + /** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + * + * @param items + * @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. + * @return + */ + public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { + return create(OperationTake.takeWhileWithIndex(items, predicate)); + } + + public static Observable takeWhileWithIndex(final Observable items, Object predicate) { + final FuncN _f = Functions.from(predicate); + + return create(OperationTake.takeWhileWithIndex(items, new Func2() { + @Override + public Boolean call(T t, Integer integer) { + return (Boolean) _f.call(t, integer); + } + })); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. @@ -2301,6 +2352,47 @@ public Observable take(final int num) { return take(this, num); } + + /** + * Returns an Observable that items emitted by the source Observable as long as a specified condition is true. + * + * @param predicate a function to test each source element for a condition + * @return + */ + public Observable takeWhile(final Func1 predicate) { + return takeWhile(this, predicate); + } + + /** + * Returns a specified number of contiguous values from the start of an observable sequence. + * + * @param predicate a function to test each source element for a condition + * @return + */ + public Observable takeWhile(final Object predicate) { + return takeWhile(this, predicate); + } + + /** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + * + * @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. + * @return + */ + public Observable takeWhileWithIndex(final Func2 predicate) { + return takeWhileWithIndex(this, predicate); + } + + /** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + * + * @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. + * @return + */ + public Observable takeWhileWithIndex(final Object predicate) { + return takeWhileWithIndex(this, predicate); + } + /** * Returns an Observable that emits the last count items emitted by the source * Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 3601de490f7..326420b270b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,33 +28,75 @@ import rx.Subscription; import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; +import rx.util.functions.Func2; /** * Returns a specified number of contiguous values from the start of an observable sequence. - * - * @param */ public final class OperationTake { /** * Returns a specified number of contiguous values from the start of an observable sequence. - * + * * @param items * @param num * @return */ public static Func1, Subscription> take(final Observable items, final int num) { - // wrap in a Watchbable so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. + return takeWhileWithIndex(items, OperationTake.numPredicate(num)); + } + + /** + * Returns a specified number of contiguous values from the start of an observable sequence. + * + * @param items + * @param predicate a function to test each source element for a condition + * @return + */ + public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { + return takeWhileWithIndex(items, OperationTake.skipIndex(predicate)); + } + + /** + * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. + * + * @param items + * @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. + * @return + */ + public static Func1, Subscription> takeWhileWithIndex(final Observable items, final Func2 predicate) { + // wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. return new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - return new Take(items, num).call(observer); + return new TakeWhile(items, predicate).call(observer); } }; } + private static Func2 numPredicate(final int num) { + return new Func2() { + + @Override + public Boolean call(T input, Integer index) { + return index < num; + } + + }; + } + + private static Func2 skipIndex(final Func1 underlying) { + return new Func2() { + @Override + public Boolean call(T input, Integer index) { + return underlying.call(input); + } + }; + } + + /** * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. *

@@ -63,29 +105,27 @@ public Subscription call(Observer observer) { * This should all be fine as long as it's kept as a private class and a new instance created from static factory method above. *

* Note how the take() factory method above protects us from a single instance being exposed with the Observable wrapper handling the subscribe flow. - * + * * @param */ - private static class Take implements Func1, Subscription> { - private final int num; + private static class TakeWhile implements Func1, Subscription> { + private final AtomicInteger counter = new AtomicInteger(); private final Observable items; + private final Func2 predicate; private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - Take(final Observable items, final int num) { - this.num = num; + private TakeWhile(Observable items, Func2 predicate) { this.items = items; + this.predicate = predicate; } + + @Override public Subscription call(Observer observer) { return subscription.wrap(items.subscribe(new ItemObserver(observer))); } - /** - * Used to subscribe to the 'items' Observable sequence and forward to the actualObserver up to 'num' count. - */ private class ItemObserver implements Observer { - - private AtomicInteger counter = new AtomicInteger(); private final Observer observer; public ItemObserver(Observer observer) { @@ -104,7 +144,7 @@ public void onError(Exception e) { @Override public void onNext(T args) { - if (counter.getAndIncrement() < num) { + if (predicate.call(args, counter.getAndIncrement())) { observer.onNext(args); } else { // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable @@ -118,6 +158,48 @@ public void onNext(T args) { public static class UnitTest { + + + @Test + public void testTakeWhile1() { + Observable w = Observable.toObservable(1, 2, 3); + Observable take = Observable.create(takeWhile(w, new Func1() { + @Override + public Boolean call(Integer input) { + return input < 3; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onNext(3); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeWhile2() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable take = Observable.create(takeWhileWithIndex(w, new Func2() { + @Override + public Boolean call(String input, Integer index) { + return index < 2; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, never()).onNext("three"); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + @Test public void testTake1() { Observable w = Observable.toObservable("one", "two", "three"); diff --git a/rxjava-core/src/main/java/rx/util/functions/Functions.java b/rxjava-core/src/main/java/rx/util/functions/Functions.java index 22b935296b6..6f6d03bcd6c 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Functions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Functions.java @@ -26,7 +26,7 @@ *

* Language support is provided via implementations of {@link FunctionLanguageAdaptor}. *

- * This class will dynamically look for known language adaptors on the classpath at startup or new ones can be registered using {@link #registerLanguageAdaptor(Class, FunctionLanguageAdaptor)}. + * This class will dynamically look for known language adaptors on the classpath at startup or new ones can be registered using {@link #registerLanguageAdaptor(Class[], FunctionLanguageAdaptor)}. */ public class Functions { @@ -81,7 +81,6 @@ public static Collection getRegisteredLanguageAdaptors( * Utility method for determining the type of closure/function and executing it. * * @param function - * @param args */ @SuppressWarnings({ "rawtypes" }) public static FuncN from(final Object function) {