diff --git a/rxjava-core/src/main/java/rx/GroupedObservable.java b/rxjava-core/src/main/java/rx/GroupedObservable.java index c51aca92b4..a173394059 100644 --- a/rxjava-core/src/main/java/rx/GroupedObservable.java +++ b/rxjava-core/src/main/java/rx/GroupedObservable.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,264 +15,26 @@ */ package rx; -import rx.util.functions.*; - -import java.util.List; -import java.util.Map; +import rx.util.functions.Func1; +/** + * An {@link Observable} that has been grouped by a key whose value can be obtained using {@link #getKey()}

+ * + * @see {@link Observable#groupBy(Observable, Func1)} + * + * @param + * @param + */ public class GroupedObservable extends Observable { private final K key; - private final Observable delegate; - public GroupedObservable(K key, Observable delegate) { + public GroupedObservable(K key, Func1, Subscription> onSubscribe) { + super(onSubscribe, true); this.key = key; - this.delegate = delegate; } public K getKey() { return key; } - public Subscription subscribe(Observer observer) { - return delegate.subscribe(observer); - } - - @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Subscription subscribe(Map callbacks) { - return delegate.subscribe(callbacks); - } - - @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Subscription subscribe(Object o) { - return delegate.subscribe(o); - } - - public Subscription subscribe(Action1 onNext) { - return delegate.subscribe(onNext); - } - - @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Subscription subscribe(Object onNext, Object onError) { - return delegate.subscribe(onNext, onError); - } - - public Subscription subscribe(Action1 onNext, Action1 onError) { - return delegate.subscribe(onNext, onError); - } - - @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) - public Subscription subscribe(Object onNext, Object onError, Object onComplete) { - return delegate.subscribe(onNext, onError, onComplete); - } - - public Subscription subscribe(Action1 onNext, Action1 onError, Action0 onComplete) { - return delegate.subscribe(onNext, onError, onComplete); - } - - public void forEach(Action1 onNext) { - delegate.forEach(onNext); - } - - @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void forEach(Object o) { - delegate.forEach(o); - } - - @Override - public T single() { - return delegate.single(); - } - - public T single(Func1 predicate) { - return delegate.single(predicate); - } - - @Override - public T single(Object predicate) { - return delegate.single(predicate); - } - - public T singleOrDefault(T defaultValue) { - return delegate.singleOrDefault(defaultValue); - } - - public T singleOrDefault(T defaultValue, Func1 predicate) { - return delegate.singleOrDefault(defaultValue, predicate); - } - - public T singleOrDefault(T defaultValue, Object predicate) { - return delegate.singleOrDefault(defaultValue, predicate); - } - - public Observable filter(Func1 predicate) { - return delegate.filter(predicate); - } - - @Override - public Observable filter(Object callback) { - return delegate.filter(callback); - } - - @Override - public Observable last() { - return delegate.last(); - } - - public T lastOrDefault(T defaultValue) { - return delegate.lastOrDefault(defaultValue); - } - - public T lastOrDefault(T defaultValue, Func1 predicate) { - return delegate.lastOrDefault(defaultValue, predicate); - } - - public T lastOrDefault(T defaultValue, Object predicate) { - return delegate.lastOrDefault(defaultValue, predicate); - } - - public Observable map(Func1 func) { - return delegate.map(func); - } - - @Override - public Observable map(Object callback) { - return delegate.map(callback); - } - - public Observable mapMany(Func1> func) { - return delegate.mapMany(func); - } - - @Override - public Observable mapMany(Object callback) { - return delegate.mapMany(callback); - } - - @Override - public Observable> materialize() { - return delegate.materialize(); - } - - public Observable onErrorResumeNext(Func1> resumeFunction) { - return delegate.onErrorResumeNext(resumeFunction); - } - - @Override - public Observable onErrorResumeNext(Object resumeFunction) { - return delegate.onErrorResumeNext(resumeFunction); - } - - public Observable onErrorResumeNext(Observable resumeSequence) { - return delegate.onErrorResumeNext(resumeSequence); - } - - public Observable onErrorReturn(Func1 resumeFunction) { - return delegate.onErrorReturn(resumeFunction); - } - - @Override - public Observable onErrorReturn(Object resumeFunction) { - return delegate.onErrorReturn(resumeFunction); - } - - public Observable reduce(Func2 accumulator) { - return delegate.reduce(accumulator); - } - - @Override - public Observable reduce(Object accumulator) { - return delegate.reduce(accumulator); - } - - public Observable reduce(T initialValue, Func2 accumulator) { - return delegate.reduce(initialValue, accumulator); - } - - public Observable reduce(T initialValue, Object accumulator) { - return delegate.reduce(initialValue, accumulator); - } - - public Observable scan(Func2 accumulator) { - return delegate.scan(accumulator); - } - - @Override - public Observable scan(Object accumulator) { - return delegate.scan(accumulator); - } - - public Observable scan(T initialValue, Func2 accumulator) { - return delegate.scan(initialValue, accumulator); - } - - public Observable scan(T initialValue, Object accumulator) { - return delegate.scan(initialValue, accumulator); - } - - @Override - public Observable skip(int num) { - return delegate.skip(num); - } - - @Override - public Observable take(int num) { - return delegate.take(num); - } - - public Observable takeWhile(Func1 predicate) { - return delegate.takeWhile(predicate); - } - - @Override - public Observable takeWhile(Object predicate) { - return delegate.takeWhile(predicate); - } - - public Observable takeWhileWithIndex(Func2 predicate) { - return delegate.takeWhileWithIndex(predicate); - } - - @Override - public Observable takeWhileWithIndex(Object predicate) { - return delegate.takeWhileWithIndex(predicate); - } - - @Override - public Observable takeLast(int count) { - return delegate.takeLast(count); - } - - @Override - public Observable> toList() { - return delegate.toList(); - } - - @Override - public Observable> toSortedList() { - return delegate.toSortedList(); - } - - public Observable> toSortedList(Func2 sortFunction) { - return delegate.toSortedList(sortFunction); - } - - @Override - public Observable> toSortedList(Object sortFunction) { - return delegate.toSortedList(sortFunction); - } - - @Override - public Iterable toIterable() { - return delegate.toIterable(); - } - - @Override - public Iterable next() { - return delegate.next(); - } } diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 35475273a2..321d67281b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -65,7 +65,7 @@ protected Observable() { this(null, false); } - private Observable(Func1, Subscription> onSubscribe, boolean isTrusted) { + /*package*/ Observable(Func1, Subscription> onSubscribe, boolean isTrusted) { this.onSubscribe = onSubscribe; this.isTrusted = isTrusted; } diff --git a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java index 9b6f92a3cf..46b278ae5e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorGroupBy.java @@ -98,9 +98,15 @@ public R call(KeyValue pair) { return pair.value; } }); - return new GroupedObservable(key, observable); - } + return new GroupedObservable(key, new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + return observable.subscribe(observer); + } + }); + } private static class KeyValue { private final K key;