diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c6f17c3149..b7fba3ca30 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -29,6 +29,7 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import rx.observables.GroupedObservable; import rx.operators.*; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaPlugins; @@ -65,7 +66,7 @@ protected Observable() { this(null, false); } - private Observable(Func1, Subscription> onSubscribe, boolean isTrusted) { + /*package*/ Observable(Func1, Subscription> onSubscribe, boolean isTrusted) { this.onSubscribe = onSubscribe; this.isTrusted = isTrusted; } @@ -1031,6 +1032,34 @@ public static Observable concat(Observable... source) { return _create(OperationConcat.concat(source)); } + /** + * Groups the elements of an observable and selects the resulting elements by using a specified function. + * + * @param source an observable whose elements to group. + * @param keySelector a function to extract the key for each element. + * @param elementSelector a function to map each source element to an element in an observable group. + * @param the key type. + * @param the source type. + * @param the resulting observable type. + * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. + */ + public static Observable> groupBy(Observable source, final Func1 keySelector, final Func1 elementSelector) { + return _create(OperatorGroupBy.groupBy(source, keySelector, elementSelector)); + } + + /** + * Groups the elements of an observable according to a specified key selector function and + * + * @param source an observable whose elements to group. + * @param keySelector a function to extract the key for each element. + * @param the key type. + * @param the source type. + * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. + */ + public static Observable> groupBy(Observable source, final Func1 keySelector) { + return _create(OperatorGroupBy.groupBy(source, keySelector)); + } + /** * Same functionality as merge except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error. *

@@ -2931,6 +2960,34 @@ public Iterable toIterable() { return toIterable(this); } + public Observable startWith(T... values) { + return concat(Observable.from(values), this); + } + + /** + * Groups the elements of an observable and selects the resulting elements by using a specified function. + * + * @param keySelector a function to extract the key for each element. + * @param elementSelector a function to map each source element to an element in an observable group. + * @param the key type. + * @param the resulting observable type. + * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. + */ + public Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) { + return groupBy(this, keySelector, elementSelector); + } + + /** + * Groups the elements of an observable according to a specified key selector function and + * + * @param keySelector a function to extract the key for each element. + * @param the key type. + * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. + */ + public Observable> groupBy(final Func1 keySelector) { + return groupBy(this, keySelector); + } + /** * Returns an iterator that iterates all values of the observable. * diff --git a/rxjava-core/src/main/java/rx/observables/GroupedObservable.java b/rxjava-core/src/main/java/rx/observables/GroupedObservable.java new file mode 100644 index 0000000000..28ee67de49 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observables/GroupedObservable.java @@ -0,0 +1,43 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observables; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.util.functions.Func1; + +/** + * An {@link Observable} that has been grouped by a key whose value can be obtained using {@link #getKey()}

+ * + * @see {@link Observable#groupBy(Observable, Func1)} + * + * @param + * @param + */ +public class GroupedObservable extends Observable { + private final K key; + + public GroupedObservable(K key, Func1, Subscription> onSubscribe) { + super(onSubscribe); + this.key = key; + } + + public K getKey() { + return key; + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java new file mode 100644 index 0000000000..2ebe77e652 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java @@ -0,0 +1,175 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.observables.GroupedObservable; +import rx.util.functions.Func1; +import rx.util.functions.Functions; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.Assert.*; + +public final class OperatorGroupBy { + + public static Func1>, Subscription> groupBy(Observable source, final Func1 keySelector, final Func1 elementSelector) { + + final Observable> keyval = source.map(new Func1>() { + @Override + public KeyValue call(T t) { + K key = keySelector.call(t); + R value = elementSelector.call(t); + + return new KeyValue(key, value); + } + }); + + return new GroupBy(keyval); + } + + public static Func1>, Subscription> groupBy(Observable source, final Func1 keySelector) { + return groupBy(source, keySelector, Functions.identity()); + } + + private static class GroupBy implements Func1>, Subscription> { + private final Observable> source; + private final ConcurrentHashMap keys = new ConcurrentHashMap(); + + private GroupBy(Observable> source) { + this.source = source; + } + + + @Override + public Subscription call(final Observer> observer) { + + return source.subscribe(new Observer>() { + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onNext(final KeyValue args) { + K key = args.key; + boolean newGroup = keys.putIfAbsent(key, true) == null; + if (newGroup) { + observer.onNext(buildObservableFor(source, key)); + } + } + + }); + } + } + + private static GroupedObservable buildObservableFor(Observable> source, final K key) { + final Observable observable = source.filter(new Func1, Boolean>() { + @Override + public Boolean call(KeyValue pair) { + return key.equals(pair.key); + } + }).map(new Func1, R>() { + @Override + public R call(KeyValue pair) { + return pair.value; + } + }); + return new GroupedObservable(key, new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + return observable.subscribe(observer); + } + + }); + } + + private static class KeyValue { + private final K key; + private final V value; + + private KeyValue(K key, V value) { + this.key = key; + this.value = value; + } + } + + public static class UnitTest { + final Func1 length = new Func1() { + @Override + public Integer call(String s) { + return s.length(); + } + }; + + @Test + public void testGroupBy() { + Observable source = Observable.from("one", "two", "three", "four", "five", "six"); + Observable> grouped = Observable.create(groupBy(source, length)); + + Map> map = toMap(grouped); + + assertEquals(3, map.size()); + assertEquals(Arrays.asList("one", "two", "six"), map.get(3)); + assertEquals(Arrays.asList("four", "five"), map.get(4)); + assertEquals(Arrays.asList("three"), map.get(5)); + + } + + @Test + public void testEmpty() { + Observable source = Observable.from(); + Observable> grouped = Observable.create(groupBy(source, length)); + + Map> map = toMap(grouped); + + assertTrue(map.isEmpty()); + } + + private static Map> toMap(Observable> observable) { + Map> result = new HashMap>(); + for (GroupedObservable g : observable.toIterable()) { + K key = g.getKey(); + + for (V value : g.toIterable()) { + List values = result.get(key); + if (values == null) { + values = new ArrayList(); + result.put(key, values); + } + + values.add(value); + } + + } + + return result; + } + + } + +} diff --git a/rxjava-core/src/main/java/rx/util/functions/Functions.java b/rxjava-core/src/main/java/rx/util/functions/Functions.java index 3267e0f540..93a4049c08 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Functions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Functions.java @@ -555,6 +555,11 @@ public static Func1 alwaysTrue() { return (Func1) AlwaysTrue.INSTANCE; } + @SuppressWarnings("unchecked") + public static Func1 identity() { + return (Func1) Identity.INSTANCE; + } + private enum AlwaysTrue implements Func1 { INSTANCE; @@ -564,4 +569,13 @@ public Boolean call(Object o) { } } + private enum Identity implements Func1 { + INSTANCE; + + @Override + public Object call(Object o) { + return o; + } + } + }