Skip to content

Commit

Permalink
Operations Aggregate, Average and Sum with selector
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Dec 23, 2013
1 parent 1586113 commit 14b701d
Show file tree
Hide file tree
Showing 7 changed files with 1,304 additions and 1 deletion.
140 changes: 140 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import rx.observables.BlockingObservable;
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.operators.OperationAggregate;
import rx.operators.OperationAll;
import rx.operators.OperationAmb;
import rx.operators.OperationAny;
Expand Down Expand Up @@ -4118,6 +4119,54 @@ public static Observable<Double> sumDoubles(Observable<Double> source) {
return OperationSum.sumDoubles(source);
}

/**
* Create an Observable that extracts integer values from this Observable via
* the provided function and computes the integer sum of the value sequence.
*
* @param valueExtractor the function to extract an integer from this Observable
* @return an Observable that extracts integer values from this Observable via
* the provided function and computes the integer sum of the value sequence.
*/
public Observable<Integer> sumInteger(Func1<? super T, Integer> valueExtractor) {
return create(new OperationSum.SumIntegerExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts long values from this Observable via
* the provided function and computes the long sum of the value sequence.
*
* @param valueExtractor the function to extract an long from this Observable
* @return an Observable that extracts long values from this Observable via
* the provided function and computes the long sum of the value sequence.
*/
public Observable<Long> sumLong(Func1<? super T, Long> valueExtractor) {
return create(new OperationSum.SumLongExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts float values from this Observable via
* the provided function and computes the float sum of the value sequence.
*
* @param valueExtractor the function to extract an float from this Observable
* @return an Observable that extracts float values from this Observable via
* the provided function and computes the float sum of the value sequence.
*/
public Observable<Float> sumFloat(Func1<? super T, Float> valueExtractor) {
return create(new OperationSum.SumFloatExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts double values from this Observable via
* the provided function and computes the double sum of the value sequence.
*
* @param valueExtractor the function to extract an double from this Observable
* @return an Observable that extracts double values from this Observable via
* the provided function and computes the double sum of the value sequence.
*/
public Observable<Double> sumDouble(Func1<? super T, Double> valueExtractor) {
return create(new OperationSum.SumDoubleExtractor<T>(this, valueExtractor));
}

/**
* Returns an Observable that computes the average of the Integers emitted
* by the source Observable.
Expand Down Expand Up @@ -4183,6 +4232,54 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
return OperationAverage.averageDoubles(source);
}

/**
* Create an Observable that extracts integer values from this Observable via
* the provided function and computes the integer average of the value sequence.
*
* @param valueExtractor the function to extract an integer from this Observable
* @return an Observable that extracts integer values from this Observable via
* the provided function and computes the integer average of the value sequence.
*/
public Observable<Integer> averageInteger(Func1<? super T, Integer> valueExtractor) {
return create(new OperationAverage.AverageIntegerExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts long values from this Observable via
* the provided function and computes the long average of the value sequence.
*
* @param valueExtractor the function to extract an long from this Observable
* @return an Observable that extracts long values from this Observable via
* the provided function and computes the long average of the value sequence.
*/
public Observable<Long> averageLong(Func1<? super T, Long> valueExtractor) {
return create(new OperationAverage.AverageLongExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts float values from this Observable via
* the provided function and computes the float average of the value sequence.
*
* @param valueExtractor the function to extract an float from this Observable
* @return an Observable that extracts float values from this Observable via
* the provided function and computes the float average of the value sequence.
*/
public Observable<Float> averageFloat(Func1<? super T, Float> valueExtractor) {
return create(new OperationAverage.AverageFloatExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts double values from this Observable via
* the provided function and computes the double average of the value sequence.
*
* @param valueExtractor the function to extract an double from this Observable
* @return an Observable that extracts double values from this Observable via
* the provided function and computes the double average of the value sequence.
*/
public Observable<Double> averageDouble(Func1<? super T, Double> valueExtractor) {
return create(new OperationAverage.AverageDoubleExtractor<T>(this, valueExtractor));
}

/**
* Returns an Observable that emits the minimum item emitted by the source
* Observable. If there is more than one such item, it returns the
Expand Down Expand Up @@ -4954,6 +5051,49 @@ public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulat
public <R> Observable<R> aggregate(R initialValue, Func2<R, ? super T, R> accumulator) {
return reduce(initialValue, accumulator);
}

/**
* Create an Observable that aggregates the source values with the given accumulator
* function and projects the final result via the resultselector.
* <p>
* Works like the {@link #aggregate(java.lang.Object, rx.util.functions.Func2)} projected
* with {@link #map(rx.util.functions.Func1)} without the overhead of some helper
* operators.
* @param <U> the intermediate (accumulator) type
* @param <V> the result type
* @param seed the initial value of the accumulator
* @param accumulator the function that takes the current accumulator value,
* the current emitted value and returns a (new) accumulated value.
* @param resultSelector the selector to project the final value of the accumulator
* @return an Observable that aggregates the source values with the given accumulator
* function and projects the final result via the resultselector
*/
public <U, V> Observable<V> aggregate(
U seed, Func2<U, ? super T, U> accumulator,
Func1<? super U, ? extends V> resultSelector) {
return create(new OperationAggregate.AggregateSelector<T, U, V>(this, seed, accumulator, resultSelector));
}

/**
* Create an Observable that aggregates the source values with the given indexed accumulator
* function and projects the final result via the indexed resultselector.
*
* @param <U> the intermediate (accumulator) type
* @param <V> the result type
* @param seed the initial value of the accumulator
* @param accumulator the function that takes the current accumulator value,
* the current emitted value and returns a (new) accumulated value.
* @param resultSelector the selector to project the final value of the accumulator, where
* the second argument is the total number of elements accumulated
* @return an Observable that aggregates the source values with the given indexed accumulator
* function and projects the final result via the indexed resultselector.
*/
public <U, V> Observable<V> aggregateIndexed(
U seed, Func3<U, ? super T, ? super Integer, U> accumulator,
Func2<? super U, ? super Integer, ? extends V> resultSelector
) {
return create(new OperationAggregate.AggregateIndexedSelector<T, U, V>(this, seed, accumulator, resultSelector));
}

/**
* Returns an Observable that applies a function of your choosing to the
Expand Down
158 changes: 158 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationAggregate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.operators;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
import rx.util.functions.Func3;

/**
* Aggregate overloads with index and selector functions.
*/
public final class OperationAggregate {
/** Utility class. */
private OperationAggregate() { throw new IllegalStateException("No instances!"); }

/**
* Aggregate and emit a value after running it through a selector.
* @param <T> the input value type
* @param <U> the intermediate value type
* @param <V> the result value type
*/
public static final class AggregateSelector<T, U, V> implements OnSubscribeFunc<V> {
final Observable<? extends T> source;
final U seed;
final Func2<U, ? super T, U> aggregator;
final Func1<? super U, ? extends V> resultSelector;

public AggregateSelector(
Observable<? extends T> source, U seed,
Func2<U, ? super T, U> aggregator,
Func1<? super U, ? extends V> resultSelector) {
this.source = source;
this.seed = seed;
this.aggregator = aggregator;
this.resultSelector = resultSelector;
}

@Override
public Subscription onSubscribe(Observer<? super V> t1) {
return source.subscribe(new AggregatorObserver(t1, seed));
}
/** The aggregator observer of the source. */
private final class AggregatorObserver implements Observer<T> {
final Observer<? super V> observer;
U accumulator;
public AggregatorObserver(Observer<? super V> observer, U seed) {
this.observer = observer;
this.accumulator = seed;
}

@Override
public void onNext(T args) {
accumulator = aggregator.call(accumulator, args);
}

@Override
public void onError(Throwable e) {
accumulator = null;
observer.onError(e);
}

@Override
public void onCompleted() {
U a = accumulator;
accumulator = null;
try {
observer.onNext(resultSelector.call(a));
} catch (Throwable t) {
observer.onError(t);
return;
}
observer.onCompleted();
}
}
}
/**
* Indexed aggregate and emit a value after running it through an indexed selector.
* @param <T> the input value type
* @param <U> the intermediate value type
* @param <V> the result value type
*/
public static final class AggregateIndexedSelector<T, U, V> implements OnSubscribeFunc<V> {
final Observable<? extends T> source;
final U seed;
final Func3<U, ? super T, ? super Integer, U> aggregator;
final Func2<? super U, ? super Integer, ? extends V> resultSelector;

public AggregateIndexedSelector(
Observable<? extends T> source,
U seed,
Func3<U, ? super T, ? super Integer, U> aggregator,
Func2<? super U, ? super Integer, ? extends V> resultSelector) {
this.source = source;
this.seed = seed;
this.aggregator = aggregator;
this.resultSelector = resultSelector;
}



@Override
public Subscription onSubscribe(Observer<? super V> t1) {
return source.subscribe(new AggregatorObserver(t1, seed));
}
/** The aggregator observer of the source. */
private final class AggregatorObserver implements Observer<T> {
final Observer<? super V> observer;
U accumulator;
int index;
public AggregatorObserver(Observer<? super V> observer, U seed) {
this.observer = observer;
this.accumulator = seed;
}

@Override
public void onNext(T args) {
accumulator = aggregator.call(accumulator, args, index++);
}

@Override
public void onError(Throwable e) {
accumulator = null;
observer.onError(e);
}

@Override
public void onCompleted() {
U a = accumulator;
accumulator = null;
try {
observer.onNext(resultSelector.call(a, index));
} catch (Throwable t) {
observer.onError(t);
return;
}
observer.onCompleted();
}
}
}
}
Loading

0 comments on commit 14b701d

Please sign in to comment.