diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index ee869e23a8..06fd2b53f2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -54,6 +55,7 @@ import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; import rx.operators.OperationMergeDelayError; +import rx.operators.OperationMinMax; import rx.operators.OperationMulticast; import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; @@ -3631,6 +3633,126 @@ public static Observable averageDoubles(Observable source) { return OperationAverage.averageDoubles(source); } + /** + * Returns the minimum element in an observable sequence. + * If there are more than one minimum elements, returns the last one. + * For an empty source, it causes an {@link IllegalArgumentException}. + * + * @param source + * an observable sequence to determine the minimum element of. + * @return an observable emitting the minimum element. + * @throws IllegalArgumentException + * if the source is empty + * @see MSDN: Observable.Min + */ + public static > Observable min(Observable source) { + return OperationMinMax.min(source); + } + + /** + * Returns the minimum element in an observable sequence according to the specified comparator. + * If there are more than one minimum elements, returns the last one. + * For an empty source, it causes an {@link IllegalArgumentException}. + * + * @param comparator + * the comparer used to compare elements. + * @return an observable emitting the minimum value according to the specified comparator. + * @throws IllegalArgumentException + * if the source is empty + * @see MSDN: Observable.Min + */ + public Observable min(Comparator comparator) { + return OperationMinMax.min(this, comparator); + } + + /** + * Returns the elements in an observable sequence with the minimum key value. + * For an empty source, it returns an observable emitting an empty List. + * + * @param selector + * the key selector function. + * @return an observable emitting a List of the elements with the minimum key value. + * @see MSDN: Observable.MinBy + */ + public > Observable> minBy(Func1 selector) { + return OperationMinMax.minBy(this, selector); + } + + /** + * Returns the elements in an observable sequence with the minimum key value according to the specified comparator. + * For an empty source, it returns an observable emitting an empty List. + * + * @param selector + * the key selector function. + * @param comparator + * the comparator used to compare key values. + * @return an observable emitting a List of the elements with the minimum key value according to the specified comparator. + * @see MSDN: Observable.MinBy + */ + public Observable> minBy(Func1 selector, Comparator comparator) { + return OperationMinMax.minBy(this, selector, comparator); + } + + /** + * Returns the maximum element in an observable sequence. + * If there are more than one maximum elements, returns the last one. + * For an empty source, it causes an {@link IllegalArgumentException}. + * + * @param source + * an observable sequence to determine the maximum element of. + * @return an observable emitting the maximum element. + * @throws IllegalArgumentException + * if the source is empty. + * @see MSDN: Observable.Max + */ + public static > Observable max(Observable source) { + return OperationMinMax.max(source); + } + + /** + * Returns the maximum element in an observable sequence according to the specified comparator. + * If there are more than one maximum elements, returns the last one. + * For an empty source, it causes an {@link IllegalArgumentException}. + * + * @param comparator + * the comparer used to compare elements. + * @return an observable emitting the maximum value according to the specified comparator. + * @throws IllegalArgumentException + * if the source is empty. + * @see MSDN: Observable.Max + */ + public Observable max(Comparator comparator) { + return OperationMinMax.max(this, comparator); + } + + /** + * Returns the elements in an observable sequence with the maximum key value. + * For an empty source, it returns an observable emitting an empty List. + * + * @param selector + * the key selector function. + * @return an observable emitting a List of the elements with the maximum key value. + * @see MSDN: Observable.MaxBy + */ + public > Observable> maxBy(Func1 selector) { + return OperationMinMax.maxBy(this, selector); + } + + /** + * Returns the elements in an observable sequence with the maximum key value according to the specified comparator. + * For an empty source, it returns an observable emitting an empty List. + * + * @param selector + * the key selector function. + * @param comparator + * the comparator used to compare key values. + * @return an observable emitting a List of the elements with the maximum key value according to the specified comparator. + * @see MSDN: Observable.MaxBy + */ + public Observable> maxBy(Func1 selector, Comparator comparator) { + return OperationMinMax.maxBy(this, selector, comparator); + } + /** * Returns a {@link ConnectableObservable} that shares a single subscription to the underlying * Observable that will replay all of its items and notifications to any future {@link Observer}. diff --git a/rxjava-core/src/main/java/rx/operators/OperationMinMax.java b/rxjava-core/src/main/java/rx/operators/OperationMinMax.java new file mode 100644 index 0000000000..ba587751e8 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationMinMax.java @@ -0,0 +1,147 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import rx.Observable; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Returns the minimum element in an observable sequence. + */ +public class OperationMinMax { + + public static > Observable min( + Observable source) { + return minMax(source, -1L); + } + + public static Observable min(Observable source, + final Comparator comparator) { + return minMax(source, comparator, -1L); + } + + public static > Observable> minBy( + Observable source, final Func1 selector) { + return minMaxBy(source, selector, -1L); + } + + public static Observable> minBy(Observable source, + final Func1 selector, final Comparator comparator) { + return minMaxBy(source, selector, comparator, -1L); + } + + public static > Observable max( + Observable source) { + return minMax(source, 1L); + } + + public static Observable max(Observable source, + final Comparator comparator) { + return minMax(source, comparator, 1L); + } + + public static > Observable> maxBy( + Observable source, final Func1 selector) { + return minMaxBy(source, selector, 1L); + } + + public static Observable> maxBy(Observable source, + final Func1 selector, final Comparator comparator) { + return minMaxBy(source, selector, comparator, 1L); + } + + private static > Observable minMax( + Observable source, final long flag) { + return source.reduce(new Func2() { + @Override + public T call(T acc, T value) { + if (flag * acc.compareTo(value) > 0) { + return acc; + } + return value; + } + }); + } + + private static Observable minMax(Observable source, + final Comparator comparator, final long flag) { + return source.reduce(new Func2() { + @Override + public T call(T acc, T value) { + if (flag * comparator.compare(acc, value) > 0) { + return acc; + } + return value; + } + }); + } + + private static > Observable> minMaxBy( + Observable source, final Func1 selector, final long flag) { + return source.reduce(new ArrayList(), + new Func2, T, List>() { + + @Override + public List call(List acc, T value) { + if (acc.isEmpty()) { + acc.add(value); + } else { + int compareResult = selector.call(acc.get(0)) + .compareTo(selector.call(value)); + if (compareResult == 0) { + acc.add(value); + } else if (flag * compareResult < 0) { + acc.clear(); + acc.add(value); + } + } + return acc; + } + }); + } + + private static Observable> minMaxBy(Observable source, + final Func1 selector, final Comparator comparator, + final long flag) { + return source.reduce(new ArrayList(), + new Func2, T, List>() { + + @Override + public List call(List acc, T value) { + if (acc.isEmpty()) { + acc.add(value); + } else { + int compareResult = comparator.compare( + selector.call(acc.get(0)), + selector.call(value)); + if (compareResult == 0) { + acc.add(value); + } else if (flag * compareResult < 0) { + acc.clear(); + acc.add(value); + } + } + return acc; + } + }); + } + +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationMinMaxTest.java b/rxjava-core/src/test/java/rx/operators/OperationMinMaxTest.java new file mode 100644 index 0000000000..f2deff9e57 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationMinMaxTest.java @@ -0,0 +1,359 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static rx.operators.OperationMinMax.max; +import static rx.operators.OperationMinMax.maxBy; +import static rx.operators.OperationMinMax.min; +import static rx.operators.OperationMinMax.minBy; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observer; +import rx.util.functions.Func1; + +public class OperationMinMaxTest { + @Test + public void testMin() { + Observable observable = min(Observable.from(2, 3, 1, 4)); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinWithEmpty() { + Observable observable = min(Observable. empty()); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinWithComparator() { + Observable observable = min(Observable.from(2, 3, 1, 4), + new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(4); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinWithComparatorAndEmpty() { + Observable observable = min(Observable. empty(), + new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinBy() { + Observable> observable = minBy( + Observable.from("1", "2", "3", "4", "5", "6"), + new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(Arrays.asList("2", "4", "6")); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinByWithEmpty() { + Observable> observable = minBy( + Observable. empty(), new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(new ArrayList()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinByWithComparator() { + Observable> observable = minBy( + Observable.from("1", "2", "3", "4", "5", "6"), + new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }, new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(Arrays.asList("1", "3", "5")); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMinByWithComparatorAndEmpty() { + Observable> observable = minBy( + Observable. empty(), new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }, new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(new ArrayList()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMax() { + Observable observable = max(Observable.from(2, 3, 1, 4)); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(4); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxWithEmpty() { + Observable observable = max(Observable. empty()); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxWithComparator() { + Observable observable = max(Observable.from(2, 3, 1, 4), + new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxWithComparatorAndEmpty() { + Observable observable = max(Observable. empty(), + new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxBy() { + Observable> observable = maxBy( + Observable.from("1", "2", "3", "4", "5", "6"), + new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(Arrays.asList("1", "3", "5")); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxByWithEmpty() { + Observable> observable = maxBy( + Observable. empty(), new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(new ArrayList()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxByWithComparator() { + Observable> observable = maxBy( + Observable.from("1", "2", "3", "4", "5", "6"), + new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }, new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(Arrays.asList("2", "4", "6")); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testMaxByWithComparatorAndEmpty() { + Observable> observable = maxBy( + Observable. empty(), new Func1() { + @Override + public Integer call(String t1) { + return Integer.parseInt(t1) % 2; + } + }, new Comparator() { + @Override + public int compare(Integer o1, Integer o2) { + return o2 - o1; + } + }); + + @SuppressWarnings("unchecked") + Observer> observer = (Observer>) mock(Observer.class); + + observable.subscribe(observer); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(new ArrayList()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } +}