Skip to content

Commit

Permalink
Merge pull request #179 from benjchristensen/groupBy-review
Browse files Browse the repository at this point in the history
Operator GroupBy Pull Request - Review and Refactor
  • Loading branch information
benjchristensen committed Mar 12, 2013
2 parents a5c2931 + 571a970 commit cf83270
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 1 deletion.
59 changes: 58 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import rx.observables.GroupedObservable;
import rx.operators.*;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -65,7 +66,7 @@ protected Observable() {
this(null, false);
}

private Observable(Func1<Observer<T>, Subscription> onSubscribe, boolean isTrusted) {
/*package*/ Observable(Func1<Observer<T>, Subscription> onSubscribe, boolean isTrusted) {
this.onSubscribe = onSubscribe;
this.isTrusted = isTrusted;
}
Expand Down Expand Up @@ -1031,6 +1032,34 @@ public static <T> Observable<T> concat(Observable<T>... source) {
return _create(OperationConcat.concat(source));
}

/**
* Groups the elements of an observable and selects the resulting elements by using a specified function.
*
* @param source an observable whose elements to group.
* @param keySelector a function to extract the key for each element.
* @param elementSelector a function to map each source element to an element in an observable group.
* @param <K> the key type.
* @param <T> the source type.
* @param <R> the resulting observable type.
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
*/
public static <K, T, R> Observable<GroupedObservable<K, R>> groupBy(Observable<T> source, final Func1<T, K> keySelector, final Func1<T, R> elementSelector) {
return _create(OperatorGroupBy.groupBy(source, keySelector, elementSelector));
}

/**
* Groups the elements of an observable according to a specified key selector function and
*
* @param source an observable whose elements to group.
* @param keySelector a function to extract the key for each element.
* @param <K> the key type.
* @param <T> the source type.
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
*/
public static <K, T> Observable<GroupedObservable<K, T>> groupBy(Observable<T> source, final Func1<T, K> keySelector) {
return _create(OperatorGroupBy.groupBy(source, keySelector));
}

/**
* Same functionality as <code>merge</code> except that errors received to onError will be held until all sequences have finished (onComplete/onError) before sending the error.
* <p>
Expand Down Expand Up @@ -2931,6 +2960,34 @@ public Iterable<T> toIterable() {
return toIterable(this);
}

public Observable<T> startWith(T... values) {
return concat(Observable.<T>from(values), this);
}

/**
* Groups the elements of an observable and selects the resulting elements by using a specified function.
*
* @param keySelector a function to extract the key for each element.
* @param elementSelector a function to map each source element to an element in an observable group.
* @param <K> the key type.
* @param <R> the resulting observable type.
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
*/
public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<T, K> keySelector, final Func1<T, R> elementSelector) {
return groupBy(this, keySelector, elementSelector);
}

/**
* Groups the elements of an observable according to a specified key selector function and
*
* @param keySelector a function to extract the key for each element.
* @param <K> the key type.
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
*/
public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<T, K> keySelector) {
return groupBy(this, keySelector);
}

/**
* Returns an iterator that iterates all values of the observable.
*
Expand Down
43 changes: 43 additions & 0 deletions rxjava-core/src/main/java/rx/observables/GroupedObservable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observables;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;

/**
* An {@link Observable} that has been grouped by a key whose value can be obtained using {@link #getKey()} <p>
*
* @see {@link Observable#groupBy(Observable, Func1)}
*
* @param <K>
* @param <T>
*/
public class GroupedObservable<K, T> extends Observable<T> {
private final K key;

public GroupedObservable(K key, Func1<Observer<T>, Subscription> onSubscribe) {
super(onSubscribe);
this.key = key;
}

public K getKey() {
return key;
}

}
175 changes: 175 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
import rx.util.functions.Func1;
import rx.util.functions.Functions;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.Assert.*;

public final class OperatorGroupBy {

public static <K, T, R> Func1<Observer<GroupedObservable<K, R>>, Subscription> groupBy(Observable<T> source, final Func1<T, K> keySelector, final Func1<T, R> elementSelector) {

final Observable<KeyValue<K, R>> keyval = source.map(new Func1<T, KeyValue<K, R>>() {
@Override
public KeyValue<K, R> call(T t) {
K key = keySelector.call(t);
R value = elementSelector.call(t);

return new KeyValue<K, R>(key, value);
}
});

return new GroupBy<K, R>(keyval);
}

public static <K, T> Func1<Observer<GroupedObservable<K, T>>, Subscription> groupBy(Observable<T> source, final Func1<T, K> keySelector) {
return groupBy(source, keySelector, Functions.<T>identity());
}

private static class GroupBy<K, V> implements Func1<Observer<GroupedObservable<K, V>>, Subscription> {
private final Observable<KeyValue<K, V>> source;
private final ConcurrentHashMap<K, Boolean> keys = new ConcurrentHashMap<K, Boolean>();

private GroupBy(Observable<KeyValue<K, V>> source) {
this.source = source;
}


@Override
public Subscription call(final Observer<GroupedObservable<K, V>> observer) {

return source.subscribe(new Observer<KeyValue<K, V>>() {

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Exception e) {
observer.onError(e);
}

@Override
public void onNext(final KeyValue<K, V> args) {
K key = args.key;
boolean newGroup = keys.putIfAbsent(key, true) == null;
if (newGroup) {
observer.onNext(buildObservableFor(source, key));
}
}

});
}
}

private static <K, R> GroupedObservable<K, R> buildObservableFor(Observable<KeyValue<K, R>> source, final K key) {
final Observable<R> observable = source.filter(new Func1<KeyValue<K, R>, Boolean>() {
@Override
public Boolean call(KeyValue<K, R> pair) {
return key.equals(pair.key);
}
}).map(new Func1<KeyValue<K, R>, R>() {
@Override
public R call(KeyValue<K, R> pair) {
return pair.value;
}
});
return new GroupedObservable<K, R>(key, new Func1<Observer<R>, Subscription>() {

@Override
public Subscription call(Observer<R> observer) {
return observable.subscribe(observer);
}

});
}

private static class KeyValue<K, V> {
private final K key;
private final V value;

private KeyValue(K key, V value) {
this.key = key;
this.value = value;
}
}

public static class UnitTest {
final Func1<String, Integer> length = new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return s.length();
}
};

@Test
public void testGroupBy() {
Observable<String> source = Observable.from("one", "two", "three", "four", "five", "six");
Observable<GroupedObservable<Integer, String>> grouped = Observable.create(groupBy(source, length));

Map<Integer, List<String>> map = toMap(grouped);

assertEquals(3, map.size());
assertEquals(Arrays.asList("one", "two", "six"), map.get(3));
assertEquals(Arrays.asList("four", "five"), map.get(4));
assertEquals(Arrays.asList("three"), map.get(5));

}

@Test
public void testEmpty() {
Observable<String> source = Observable.from();
Observable<GroupedObservable<Integer, String>> grouped = Observable.create(groupBy(source, length));

Map<Integer, List<String>> map = toMap(grouped);

assertTrue(map.isEmpty());
}

private static <K, V> Map<K, List<V>> toMap(Observable<GroupedObservable<K, V>> observable) {
Map<K, List<V>> result = new HashMap<K, List<V>>();
for (GroupedObservable<K, V> g : observable.toIterable()) {
K key = g.getKey();

for (V value : g.toIterable()) {
List<V> values = result.get(key);
if (values == null) {
values = new ArrayList<V>();
result.put(key, values);
}

values.add(value);
}

}

return result;
}

}

}
14 changes: 14 additions & 0 deletions rxjava-core/src/main/java/rx/util/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,11 @@ public static <T> Func1<T, Boolean> alwaysTrue() {
return (Func1<T, Boolean>) AlwaysTrue.INSTANCE;
}

@SuppressWarnings("unchecked")
public static <T> Func1<T, T> identity() {
return (Func1<T, T>) Identity.INSTANCE;
}

private enum AlwaysTrue implements Func1<Object, Boolean> {
INSTANCE;

Expand All @@ -564,4 +569,13 @@ public Boolean call(Object o) {
}
}

private enum Identity implements Func1<Object, Object> {
INSTANCE;

@Override
public Object call(Object o) {
return o;
}
}

}

0 comments on commit cf83270

Please sign in to comment.