Skip to content

Commit

Permalink
Merge pull request #698 from benjchristensen/pull-657-merge
Browse files Browse the repository at this point in the history
Merge of Pull 657: Average and Sum
  • Loading branch information
benjchristensen committed Dec 27, 2013
2 parents d39d9cf + c451ce9 commit 92ba6e7
Show file tree
Hide file tree
Showing 6 changed files with 1,078 additions and 3 deletions.
100 changes: 98 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4405,6 +4405,54 @@ public static Observable<Double> sumDoubles(Observable<Double> source) {
return OperationSum.sumDoubles(source);
}

/**
* Create an Observable that extracts integer values from this Observable via
* the provided function and computes the integer sum of the value sequence.
*
* @param valueExtractor the function to extract an integer from this Observable
* @return an Observable that extracts integer values from this Observable via
* the provided function and computes the integer sum of the value sequence.
*/
public Observable<Integer> sumInteger(Func1<? super T, Integer> valueExtractor) {
return create(new OperationSum.SumIntegerExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts long values from this Observable via
* the provided function and computes the long sum of the value sequence.
*
* @param valueExtractor the function to extract an long from this Observable
* @return an Observable that extracts long values from this Observable via
* the provided function and computes the long sum of the value sequence.
*/
public Observable<Long> sumLong(Func1<? super T, Long> valueExtractor) {
return create(new OperationSum.SumLongExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts float values from this Observable via
* the provided function and computes the float sum of the value sequence.
*
* @param valueExtractor the function to extract an float from this Observable
* @return an Observable that extracts float values from this Observable via
* the provided function and computes the float sum of the value sequence.
*/
public Observable<Float> sumFloat(Func1<? super T, Float> valueExtractor) {
return create(new OperationSum.SumFloatExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts double values from this Observable via
* the provided function and computes the double sum of the value sequence.
*
* @param valueExtractor the function to extract an double from this Observable
* @return an Observable that extracts double values from this Observable via
* the provided function and computes the double sum of the value sequence.
*/
public Observable<Double> sumDouble(Func1<? super T, Double> valueExtractor) {
return create(new OperationSum.SumDoubleExtractor<T>(this, valueExtractor));
}

/**
* Returns an Observable that computes the average of the Integers emitted
* by the source Observable.
Expand Down Expand Up @@ -4470,6 +4518,54 @@ public static Observable<Double> averageDoubles(Observable<Double> source) {
return OperationAverage.averageDoubles(source);
}

/**
* Create an Observable that extracts integer values from this Observable via
* the provided function and computes the integer average of the value sequence.
*
* @param valueExtractor the function to extract an integer from this Observable
* @return an Observable that extracts integer values from this Observable via
* the provided function and computes the integer average of the value sequence.
*/
public Observable<Integer> averageInteger(Func1<? super T, Integer> valueExtractor) {
return create(new OperationAverage.AverageIntegerExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts long values from this Observable via
* the provided function and computes the long average of the value sequence.
*
* @param valueExtractor the function to extract an long from this Observable
* @return an Observable that extracts long values from this Observable via
* the provided function and computes the long average of the value sequence.
*/
public Observable<Long> averageLong(Func1<? super T, Long> valueExtractor) {
return create(new OperationAverage.AverageLongExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts float values from this Observable via
* the provided function and computes the float average of the value sequence.
*
* @param valueExtractor the function to extract an float from this Observable
* @return an Observable that extracts float values from this Observable via
* the provided function and computes the float average of the value sequence.
*/
public Observable<Float> averageFloat(Func1<? super T, Float> valueExtractor) {
return create(new OperationAverage.AverageFloatExtractor<T>(this, valueExtractor));
}

/**
* Create an Observable that extracts double values from this Observable via
* the provided function and computes the double average of the value sequence.
*
* @param valueExtractor the function to extract an double from this Observable
* @return an Observable that extracts double values from this Observable via
* the provided function and computes the double average of the value sequence.
*/
public Observable<Double> averageDouble(Func1<? super T, Double> valueExtractor) {
return create(new OperationAverage.AverageDoubleExtractor<T>(this, valueExtractor));
}

/**
* Returns an Observable that emits the minimum item emitted by the source
* Observable. If there is more than one such item, it returns the
Expand Down Expand Up @@ -5230,7 +5326,7 @@ public Observable<T> aggregate(Func2<T, T, T> accumulator) {
public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
}

/**
* Synonymous with <code>reduce()</code>.
* <p>
Expand All @@ -5243,7 +5339,7 @@ public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulat
public <R> Observable<R> aggregate(R initialValue, Func2<R, ? super T, R> accumulator) {
return reduce(initialValue, accumulator);
}

/**
* Returns an Observable that applies a function of your choosing to the
* first item emitted by a source Observable, then feeds the result of that
Expand Down
227 changes: 227 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationAverage.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package rx.operators;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

Expand Down Expand Up @@ -102,4 +105,228 @@ public Double call(Tuple2<Double> result) {
}
});
}

/**
* Compute the average by extracting integer values from the source via an
* extractor function.
* @param <T> the source value type
*/
public static final class AverageIntegerExtractor<T> implements OnSubscribeFunc<Integer> {
final Observable<? extends T> source;
final Func1<? super T, Integer> valueExtractor;

public AverageIntegerExtractor(Observable<? extends T> source, Func1<? super T, Integer> valueExtractor) {
this.source = source;
this.valueExtractor = valueExtractor;
}

@Override
public Subscription onSubscribe(Observer<? super Integer> t1) {
return source.subscribe(new AverageObserver(t1));
}
/** Computes the average. */
private final class AverageObserver implements Observer<T> {
final Observer<? super Integer> observer;
int sum;
int count;
public AverageObserver(Observer<? super Integer> observer) {
this.observer = observer;
}

@Override
public void onNext(T args) {
sum += valueExtractor.call(args);
count++;
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onCompleted() {
if (count > 0) {
try {
observer.onNext(sum / count);
} catch (Throwable t) {
observer.onError(t);
return;
}
observer.onCompleted();
} else {
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
}
}

}
}

/**
* Compute the average by extracting long values from the source via an
* extractor function.
* @param <T> the source value type
*/
public static final class AverageLongExtractor<T> implements OnSubscribeFunc<Long> {
final Observable<? extends T> source;
final Func1<? super T, Long> valueExtractor;

public AverageLongExtractor(Observable<? extends T> source, Func1<? super T, Long> valueExtractor) {
this.source = source;
this.valueExtractor = valueExtractor;
}

@Override
public Subscription onSubscribe(Observer<? super Long> t1) {
return source.subscribe(new AverageObserver(t1));
}
/** Computes the average. */
private final class AverageObserver implements Observer<T> {
final Observer<? super Long> observer;
long sum;
int count;
public AverageObserver(Observer<? super Long> observer) {
this.observer = observer;
}

@Override
public void onNext(T args) {
sum += valueExtractor.call(args);
count++;
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onCompleted() {
if (count > 0) {
try {
observer.onNext(sum / count);
} catch (Throwable t) {
observer.onError(t);
return;
}
observer.onCompleted();
} else {
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
}
}

}
}

/**
* Compute the average by extracting float values from the source via an
* extractor function.
* @param <T> the source value type
*/
public static final class AverageFloatExtractor<T> implements OnSubscribeFunc<Float> {
final Observable<? extends T> source;
final Func1<? super T, Float> valueExtractor;

public AverageFloatExtractor(Observable<? extends T> source, Func1<? super T, Float> valueExtractor) {
this.source = source;
this.valueExtractor = valueExtractor;
}

@Override
public Subscription onSubscribe(Observer<? super Float> t1) {
return source.subscribe(new AverageObserver(t1));
}
/** Computes the average. */
private final class AverageObserver implements Observer<T> {
final Observer<? super Float> observer;
float sum;
int count;
public AverageObserver(Observer<? super Float> observer) {
this.observer = observer;
}

@Override
public void onNext(T args) {
sum += valueExtractor.call(args);
count++;
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onCompleted() {
if (count > 0) {
try {
observer.onNext(sum / count);
} catch (Throwable t) {
observer.onError(t);
return;
}
observer.onCompleted();
} else {
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
}
}

}
}

/**
* Compute the average by extracting double values from the source via an
* extractor function.
* @param <T> the source value type
*/
public static final class AverageDoubleExtractor<T> implements OnSubscribeFunc<Double> {
final Observable<? extends T> source;
final Func1<? super T, Double> valueExtractor;

public AverageDoubleExtractor(Observable<? extends T> source, Func1<? super T, Double> valueExtractor) {
this.source = source;
this.valueExtractor = valueExtractor;
}

@Override
public Subscription onSubscribe(Observer<? super Double> t1) {
return source.subscribe(new AverageObserver(t1));
}
/** Computes the average. */
private final class AverageObserver implements Observer<T> {
final Observer<? super Double> observer;
double sum;
int count;
public AverageObserver(Observer<? super Double> observer) {
this.observer = observer;
}

@Override
public void onNext(T args) {
sum += valueExtractor.call(args);
count++;
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onCompleted() {
if (count > 0) {
try {
observer.onNext(sum / count);
} catch (Throwable t) {
observer.onError(t);
return;
}
observer.onCompleted();
} else {
observer.onError(new IllegalArgumentException("Sequence contains no elements"));
}
}

}
}
}
Loading

0 comments on commit 92ba6e7

Please sign in to comment.