From 78539f05022cf8572199f91db1488c8e1ec73402 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 8 Nov 2014 08:56:23 -0800 Subject: [PATCH] Scan/Reduce with Seed Factory Adds overload with seed factory as per https://github.com/ReactiveX/RxJava/issues/1831 --- src/main/java/rx/Observable.java | 67 +++++++++++++++++++ .../rx/internal/operators/OperatorScan.java | 20 +++++- .../internal/operators/OperatorScanTest.java | 31 ++++++++- 3 files changed, 114 insertions(+), 4 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index c86202b94f..3fd69e9181 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5429,6 +5429,41 @@ public final Observable reduce(Func2 accumulator) { public final Observable reduce(R initialValue, Func2 accumulator) { return scan(initialValue, accumulator).takeLast(1); } + + /** + * Returns an Observable that applies a function of your choosing to the first item emitted by a source + * Observable and a specified seed value, then feeds the result of that function along with the second item + * emitted by an Observable into the same function, and so on until all items have been emitted by the + * source Observable, emitting the final result from the final call to your function as its sole item. + *

+ * + *

+ * This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate," + * "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method + * that does a similar operation on lists. + *

+ *
Backpressure Support:
+ *
This operator does not support backpressure because by intent it will receive all values and reduce + * them to a single {@code onNext}.
+ *
Scheduler:
+ *
{@code reduce} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param initialValueFactory + * factory to produce the initial (seed) accumulator item each time the Observable is subscribed to + * @param accumulator + * an accumulator function to be invoked on each item emitted by the source Observable, the + * result of which will be used in the next accumulator call + * @return an Observable that emits a single item that is the result of accumulating the output from the + * items emitted by the source Observable + * @see RxJava wiki: reduce + * @see MSDN: Observable.Aggregate + * @see Wikipedia: Fold (higher-order function) + */ + public final Observable reduce(Func0 initialValueFactory, Func2 accumulator) { + return scan(initialValueFactory, accumulator).takeLast(1); + } + /** * Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely. @@ -6491,6 +6526,38 @@ public final Observable scan(Func2 accumulator) { public final Observable scan(R initialValue, Func2 accumulator) { return lift(new OperatorScan(initialValue, accumulator)); } + + /** + * Returns an Observable that applies a function of your choosing to the first item emitted by a source + * Observable and a seed value, then feeds the result of that function along with the second item emitted by + * the source Observable into the same function, and so on until all items have been emitted by the source + * Observable, emitting the result of each of these iterations. + *

+ * + *

+ * This sort of function is sometimes called an accumulator. + *

+ * Note that the Observable that results from this method will emit {@code initialValue} as its first + * emitted item. + *

+ *
Scheduler:
+ *
{@code scan} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param initialValueFactory + * factory to produce the initial (seed) accumulator item each time the Observable is subscribed to + * @param accumulator + * an accumulator function to be invoked on each item emitted by the source Observable, whose + * result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the + * next accumulator call + * @return an Observable that emits {@code initialValue} followed by the results of each call to the + * accumulator function + * @see RxJava wiki: scan + * @see MSDN: Observable.Scan + */ + public final Observable scan(Func0 initialValueFactory, Func2 accumulator) { + return lift(new OperatorScan(initialValueFactory, accumulator)); + } /** * Forces an Observable's emissions and notifications to be serialized and for it to obey the Rx contract diff --git a/src/main/java/rx/internal/operators/OperatorScan.java b/src/main/java/rx/internal/operators/OperatorScan.java index 818e3de8ad..b0685edb3f 100644 --- a/src/main/java/rx/internal/operators/OperatorScan.java +++ b/src/main/java/rx/internal/operators/OperatorScan.java @@ -21,7 +21,9 @@ import rx.Producer; import rx.Subscriber; import rx.exceptions.OnErrorThrowable; +import rx.functions.Func0; import rx.functions.Func2; +import rx.internal.util.UtilityFunctions; /** * Returns an Observable that applies a function to the first item emitted by a source Observable, then feeds @@ -38,7 +40,7 @@ */ public final class OperatorScan implements Operator { - private final R initialValue; + private final Func0 initialValueFactory; private final Func2 accumulator; // sentinel if we don't receive an initial value private static final Object NO_INITIAL_VALUE = new Object(); @@ -54,8 +56,19 @@ public final class OperatorScan implements Operator { * @see Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, * TAccumulate)) */ - public OperatorScan(R initialValue, Func2 accumulator) { - this.initialValue = initialValue; + public OperatorScan(final R initialValue, Func2 accumulator) { + this(new Func0() { + + @Override + public R call() { + return initialValue; + } + + }, accumulator); + } + + public OperatorScan(Func0 initialValueFactory, Func2 accumulator) { + this.initialValueFactory = initialValueFactory; this.accumulator = accumulator; } @@ -75,6 +88,7 @@ public OperatorScan(final Func2 accumulator) { @Override public Subscriber call(final Subscriber child) { return new Subscriber(child) { + private final R initialValue = initialValueFactory.call(); private R value = initialValue; boolean initialized = false; diff --git a/src/test/java/rx/internal/operators/OperatorScanTest.java b/src/test/java/rx/internal/operators/OperatorScanTest.java index 2cbd6954f0..c382e4da20 100644 --- a/src/test/java/rx/internal/operators/OperatorScanTest.java +++ b/src/test/java/rx/internal/operators/OperatorScanTest.java @@ -15,7 +15,8 @@ */ package rx.internal.operators; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; @@ -24,6 +25,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; @@ -33,6 +37,7 @@ import rx.Observable; import rx.Observer; import rx.Subscriber; +import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; import rx.observers.TestSubscriber; @@ -263,4 +268,28 @@ public void onNext(Integer t) { // we only expect to receive 101 as we'll receive all 100 + the initial value assertEquals(101, count.get()); } + + @Test + public void testSeedFactory() { + Observable> o = Observable.range(1, 10) + .scan(new Func0>() { + + @Override + public List call() { + return new ArrayList(); + } + + }, new Func2, Integer, List>() { + + @Override + public List call(List list, Integer t2) { + list.add(t2); + return list; + } + + }).takeLast(1); + + assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single()); + assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.toBlocking().single()); + } }