Skip to content

Commit

Permalink
Scan/Reduce with Seed Factory
Browse files Browse the repository at this point in the history
Adds overload with seed factory as per ReactiveX#1831
  • Loading branch information
benjchristensen committed Nov 8, 2014
1 parent faa270c commit 78539f0
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 4 deletions.
67 changes: 67 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5429,6 +5429,41 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
return scan(initialValue, accumulator).takeLast(1);
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a source
* Observable and a specified seed value, then feeds the result of that function along with the second item
* emitted by an Observable into the same function, and so on until all items have been emitted by the
* source Observable, emitting the final result from the final call to your function as its sole item.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/reduceSeed.png" alt="">
* <p>
* This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate,"
* "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method
* that does a similar operation on lists.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because by intent it will receive all values and reduce
* them to a single {@code onNext}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code reduce} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValueFactory
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source Observable, the
* result of which will be used in the next accumulator call
* @return an Observable that emits a single item that is the result of accumulating the output from the
* items emitted by the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce">RxJava wiki: reduce</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154.aspx">MSDN: Observable.Aggregate</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public final <R> Observable<R> reduce(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
return scan(initialValueFactory, accumulator).takeLast(1);
}


/**
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
Expand Down Expand Up @@ -6491,6 +6526,38 @@ public final Observable<T> scan(Func2<T, T, T> accumulator) {
public final <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValue, accumulator));
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a source
* Observable and a seed value, then feeds the result of that function along with the second item emitted by
* the source Observable into the same function, and so on until all items have been emitted by the source
* Observable, emitting the result of each of these iterations.
* <p>
* <img width="640" height="320" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/scanSeed.png" alt="">
* <p>
* This sort of function is sometimes called an accumulator.
* <p>
* Note that the Observable that results from this method will emit {@code initialValue} as its first
* emitted item.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code scan} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param initialValueFactory
* factory to produce the initial (seed) accumulator item each time the Observable is subscribed to
* @param accumulator
* an accumulator function to be invoked on each item emitted by the source Observable, whose
* result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the
* next accumulator call
* @return an Observable that emits {@code initialValue} followed by the results of each call to the
* accumulator function
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#scan">RxJava wiki: scan</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665.aspx">MSDN: Observable.Scan</a>
*/
public final <R> Observable<R> scan(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
return lift(new OperatorScan<R, T>(initialValueFactory, accumulator));
}

/**
* Forces an Observable's emissions and notifications to be serialized and for it to obey the Rx contract
Expand Down
20 changes: 17 additions & 3 deletions src/main/java/rx/internal/operators/OperatorScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func0;
import rx.functions.Func2;
import rx.internal.util.UtilityFunctions;

/**
* Returns an Observable that applies a function to the first item emitted by a source Observable, then feeds
Expand All @@ -38,7 +40,7 @@
*/
public final class OperatorScan<R, T> implements Operator<R, T> {

private final R initialValue;
private final Func0<R> initialValueFactory;
private final Func2<R, ? super T, R> accumulator;
// sentinel if we don't receive an initial value
private static final Object NO_INITIAL_VALUE = new Object();
Expand All @@ -54,8 +56,19 @@ public final class OperatorScan<R, T> implements Operator<R, T> {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212007.aspx">Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource,
* TAccumulate))</a>
*/
public OperatorScan(R initialValue, Func2<R, ? super T, R> accumulator) {
this.initialValue = initialValue;
public OperatorScan(final R initialValue, Func2<R, ? super T, R> accumulator) {
this(new Func0<R>() {

@Override
public R call() {
return initialValue;
}

}, accumulator);
}

public OperatorScan(Func0<R> initialValueFactory, Func2<R, ? super T, R> accumulator) {
this.initialValueFactory = initialValueFactory;
this.accumulator = accumulator;
}

Expand All @@ -75,6 +88,7 @@ public OperatorScan(final Func2<R, ? super T, R> accumulator) {
@Override
public Subscriber<? super T> call(final Subscriber<? super R> child) {
return new Subscriber<T>(child) {
private final R initialValue = initialValueFactory.call();
private R value = initialValue;
boolean initialized = false;

Expand Down
31 changes: 30 additions & 1 deletion src/test/java/rx/internal/operators/OperatorScanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
package rx.internal.operators;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
Expand All @@ -24,6 +25,9 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
Expand All @@ -33,6 +37,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -263,4 +268,28 @@ public void onNext(Integer t) {
// we only expect to receive 101 as we'll receive all 100 + the initial value
assertEquals(101, count.get());
}

@Test
public void testSeedFactory() {
Observable<List<Integer>> o = Observable.range(1, 10)
.scan(new Func0<List<Integer>>() {

@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}

}, new Func2<List<Integer>, Integer, List<Integer>>() {

@Override
public List<Integer> call(List<Integer> list, Integer t2) {
list.add(t2);
return list;
}

}).takeLast(1);

assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single());
}
}

0 comments on commit 78539f0

Please sign in to comment.