From 6152588227e35365ec19c241a048bc7596f7e175 Mon Sep 17 00:00:00 2001 From: prabirshrestha Date: Wed, 13 Mar 2013 18:52:10 -0400 Subject: [PATCH 1/2] implemented OperationWhere --- .../java/rx/operators/OperationWhere.java | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationWhere.java diff --git a/rxjava-core/src/main/java/rx/operators/OperationWhere.java b/rxjava-core/src/main/java/rx/operators/OperationWhere.java new file mode 100644 index 0000000000..8fd7d2b36b --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationWhere.java @@ -0,0 +1,61 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import org.mockito.Mockito; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.util.functions.Func1; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public final class OperationWhere { + + public static Func1, Subscription> where(Observable that, Func1 predicate) { + return OperationFilter.filter(that, predicate); + } + + public static class UnitTest { + + @Test + public void testWhere() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable observable = Observable.create(where(w, new Func1() { + + @Override + public Boolean call(String t1) { + return t1.equals("two"); + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, Mockito.never()).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, Mockito.never()).onNext("three"); + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + } + +} From 141a9f8108f2a7249734cf2611e6ad71e1503c39 Mon Sep 17 00:00:00 2001 From: prabirshrestha Date: Wed, 13 Mar 2013 19:01:15 -0400 Subject: [PATCH 2/2] added where operation to Observable --- rxjava-core/src/main/java/rx/Observable.java | 31 ++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c125682bd0..5d11c7b9ed 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -40,6 +40,7 @@ import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; +import rx.operators.OperationWhere; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; @@ -722,6 +723,21 @@ public Boolean call(T t1) { }); } + /** + * Filters an Observable by discarding any of its emissions that do not meet some test. + *

+ * + * + * @param that + * the Observable to filter + * @param predicate + * a function that evaluates the items emitted by the source Observable, returning true if they pass the filter + * @return an Observable that emits only those items in the original Observable that the filter evaluates as true + */ + public static Observable where(Observable that, Func1 predicate) { + return _create(OperationWhere.where(that, predicate)); + } + /** * Converts an {@link Iterable} sequence to an Observable sequence. * @@ -2419,6 +2435,21 @@ public Boolean call(T t1) { }); } + /** + * Filters an Observable by discarding any of its emissions that do not meet some test. + *

+ * + * + * @param predicate + * a function that evaluates the items emitted by the source Observable, returning + * true if they pass the filter + * @return an Observable that emits only those items in the original Observable that the filter + * evaluates as true + */ + public Observable where(Func1 predicate) { + return where(this, predicate); + } + /** * Returns the last element of an observable sequence with a specified source. *