diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 6bcd3e9f77..3dcff7fb69 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -3520,11 +3520,18 @@ public Observable onErrorReturn(Func1 resumeFunction) * Observable, whose result will be used in the next accumulator call * @return an Observable that emits a single item that is the result of accumulating the * output from the source Observable + * @throws IllegalArgumentException + * if Observable sequence is empty. * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ public Observable reduce(Func2 accumulator) { - return create(OperationScan.scan(this, accumulator)).takeLast(1); + /* + * Discussion and confirmation of implementation at https://github.com/Netflix/RxJava/issues/423#issuecomment-27642532 + * + * It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty. + */ + return create(OperationScan.scan(this, accumulator)).last(); } /** diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index 496f520a11..d2ff7d5d88 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -36,6 +36,7 @@ import rx.observables.ConnectableObservable; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -210,6 +211,31 @@ public Integer call(Integer t1, Integer t2) { verify(w).onNext(10); } + + /** + * A reduce should fail with an IllegalArgumentException if done on an empty Observable. + */ + @Test(expected = IllegalArgumentException.class) + public void testReduceWithEmptyObservable() { + Observable observable = Observable.range(1, 0); + observable.reduce(new Func2() { + + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + t2; + } + + }).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + // do nothing ... we expect an exception instead + } + }); + + fail("Expected an exception to be thrown"); + } + @Test public void testReduceWithInitialValue() { Observable observable = Observable.from(1, 2, 3, 4);