From a755ef4c2bb7facdfd4cbffb961904b1480dd970 Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Sat, 10 Sep 2016 13:10:32 +0200 Subject: [PATCH 1/5] 2.x: Fix Generics T[] in Zip & CombineLatest --- src/main/java/io/reactivex/Flowable.java | 22 +++++++-------- src/main/java/io/reactivex/Maybe.java | 3 +- src/main/java/io/reactivex/Observable.java | 28 +++++++++---------- .../flowable/FlowableCombineLatest.java | 10 +++---- .../operators/maybe/MaybeZipArray.java | 8 +++--- .../operators/maybe/MaybeZipIterable.java | 4 +-- .../observable/ObservableCombineLatest.java | 8 +++--- .../observable/ObservableInternalHelper.java | 6 ++-- .../operators/observable/ObservableZip.java | 8 +++--- .../io/reactivex/flowable/FlowableTests.java | 15 ++++++++++ .../java/io/reactivex/maybe/MaybeTest.java | 14 ++++++++++ .../reactivex/observable/ObservableTest.java | 15 ++++++++++ 12 files changed, 93 insertions(+), 48 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 27ba8c954d..cff34c0640 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -163,7 +163,7 @@ public static int bufferSize() { */ @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) - public static Flowable combineLatest(Publisher[] sources, Function combiner) { + public static Flowable combineLatest(Publisher[] sources, Function combiner) { return combineLatest(sources, combiner, bufferSize()); } @@ -194,7 +194,7 @@ public static Flowable combineLatest(Publisher[] sources, */ @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) - public static Flowable combineLatest(Function combiner, Publisher... sources) { + public static Flowable combineLatest(Function combiner, Publisher... sources) { return combineLatest(sources, combiner, bufferSize()); } @@ -227,7 +227,7 @@ public static Flowable combineLatest(Function Flowable combineLatest(Publisher[] sources, Function combiner, int bufferSize) { + public static Flowable combineLatest(Publisher[] sources, Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); if (sources.length == 0) { return empty(); @@ -265,7 +265,7 @@ public static Flowable combineLatest(Publisher[] sources, @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) public static Flowable combineLatest(Iterable> sources, - Function combiner) { + Function combiner) { return combineLatest(sources, combiner, bufferSize()); } @@ -299,7 +299,7 @@ public static Flowable combineLatest(Iterable Flowable combineLatest(Iterable> sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); @@ -334,7 +334,7 @@ public static Flowable combineLatest(Iterable Flowable combineLatestDelayError(Publisher[] sources, - Function combiner) { + Function combiner) { return combineLatestDelayError(sources, combiner, bufferSize()); } @@ -367,7 +367,7 @@ public static Flowable combineLatestDelayError(Publisher[ */ @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) - public static Flowable combineLatestDelayError(Function combiner, + public static Flowable combineLatestDelayError(Function combiner, Publisher... sources) { return combineLatestDelayError(sources, combiner, bufferSize()); } @@ -398,7 +398,7 @@ public static Flowable combineLatestDelayError(FunctionReactiveX operators documentation: CombineLatest */ @SchedulerSupport(SchedulerSupport.NONE) - public static Flowable combineLatestDelayError(Function combiner, + public static Flowable combineLatestDelayError(Function combiner, int bufferSize, Publisher... sources) { return combineLatestDelayError(sources, combiner, bufferSize); } @@ -435,7 +435,7 @@ public static Flowable combineLatestDelayError(Function Flowable combineLatestDelayError(Publisher[] sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); @@ -475,7 +475,7 @@ public static Flowable combineLatestDelayError(Publisher[ @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) public static Flowable combineLatestDelayError(Iterable> sources, - Function combiner) { + Function combiner) { return combineLatestDelayError(sources, combiner, bufferSize()); } @@ -511,7 +511,7 @@ public static Flowable combineLatestDelayError(Iterable Flowable combineLatestDelayError(Iterable> sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 85a25fc3e0..467b190ab9 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -1362,7 +1362,7 @@ public static Maybe wrap(MaybeSource source) { * @see ReactiveX operators documentation: Zip */ @SchedulerSupport(SchedulerSupport.NONE) - public static Maybe zip(Iterable> sources, Function zipper) { + public static Maybe zip(Iterable> sources, Function zipper) { ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); return RxJavaPlugins.onAssembly(new MaybeZipIterable(sources, zipper)); @@ -1796,6 +1796,7 @@ public static Maybe zip( @SchedulerSupport(SchedulerSupport.NONE) public static Maybe zipArray(Function zipper, MaybeSource... sources) { + ObjectHelper.requireNonNull(sources, "sources is null"); if (sources.length == 0) { return empty(); } diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 5996e7180a..2170c8aef3 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -148,7 +148,7 @@ public static int bufferSize() { * @see ReactiveX operators documentation: CombineLatest */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable combineLatest(Function combiner, int bufferSize, ObservableSource... sources) { + public static Observable combineLatest(Function combiner, int bufferSize, ObservableSource... sources) { return combineLatest(sources, combiner, bufferSize); } @@ -175,7 +175,7 @@ public static Observable combineLatest(Function Observable combineLatest(Iterable> sources, - Function combiner) { + Function combiner) { return combineLatest(sources, combiner, bufferSize()); } @@ -205,7 +205,7 @@ public static Observable combineLatest(Iterable Observable combineLatest(Iterable> sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); @@ -238,7 +238,7 @@ public static Observable combineLatest(Iterable Observable combineLatest(ObservableSource[] sources, - Function combiner) { + Function combiner) { return combineLatest(sources, combiner, bufferSize()); } @@ -267,7 +267,7 @@ public static Observable combineLatest(ObservableSource[] */ @SchedulerSupport(SchedulerSupport.NONE) public static Observable combineLatest(ObservableSource[] sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); if (sources.length == 0) { return empty(); @@ -659,7 +659,7 @@ public static Observable combineLates */ @SchedulerSupport(SchedulerSupport.NONE) public static Observable combineLatestDelayError(ObservableSource[] sources, - Function combiner) { + Function combiner) { return combineLatestDelayError(sources, combiner, bufferSize()); } @@ -689,7 +689,7 @@ public static Observable combineLatestDelayError(ObservableSourceReactiveX operators documentation: CombineLatest */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable combineLatestDelayError(Function combiner, + public static Observable combineLatestDelayError(Function combiner, int bufferSize, ObservableSource... sources) { return combineLatestDelayError(sources, combiner, bufferSize); } @@ -721,7 +721,7 @@ public static Observable combineLatestDelayError(Function Observable combineLatestDelayError(ObservableSource[] sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.verifyPositive(bufferSize, "bufferSize"); ObjectHelper.requireNonNull(combiner, "combiner is null"); if (sources.length == 0) { @@ -757,7 +757,7 @@ public static Observable combineLatestDelayError(ObservableSource Observable combineLatestDelayError(Iterable> sources, - Function combiner) { + Function combiner) { return combineLatestDelayError(sources, combiner, bufferSize()); } @@ -788,7 +788,7 @@ public static Observable combineLatestDelayError(Iterable Observable combineLatestDelayError(Iterable> sources, - Function combiner, int bufferSize) { + Function combiner, int bufferSize) { ObjectHelper.requireNonNull(sources, "sources is null"); ObjectHelper.requireNonNull(combiner, "combiner is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); @@ -3375,7 +3375,7 @@ public static Observable wrap(ObservableSource source) { * @see ReactiveX operators documentation: Zip */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable zip(Iterable> sources, Function zipper) { + public static Observable zip(Iterable> sources, Function zipper) { ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); return RxJavaPlugins.onAssembly(new ObservableZip(null, sources, zipper, bufferSize(), false)); @@ -3423,7 +3423,7 @@ public static Observable zip(Iterable Observable zip(ObservableSource> sources, final Function zipper) { + public static Observable zip(ObservableSource> sources, final Function zipper) { ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); return RxJavaPlugins.onAssembly(new ObservableToList(sources, 16) @@ -4095,7 +4095,7 @@ public static Observable zip( * @see ReactiveX operators documentation: Zip */ @SchedulerSupport(SchedulerSupport.NONE) - public static Observable zipArray(Function zipper, + public static Observable zipArray(Function zipper, boolean delayError, int bufferSize, ObservableSource... sources) { if (sources.length == 0) { return empty(); @@ -4152,7 +4152,7 @@ public static Observable zipArray(Function z */ @SchedulerSupport(SchedulerSupport.NONE) public static Observable zipIterable(Iterable> sources, - Function zipper, boolean delayError, + Function zipper, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java index a6f21df0cf..fa490a8da1 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java @@ -40,14 +40,14 @@ public final class FlowableCombineLatest final Iterable> iterable; - final Function combiner; + final Function combiner; final int bufferSize; final boolean delayErrors; public FlowableCombineLatest(Publisher[] array, - Function combiner, + Function combiner, int bufferSize, boolean delayErrors) { if (bufferSize <= 0) { throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize); @@ -61,7 +61,7 @@ public FlowableCombineLatest(Publisher[] array, } public FlowableCombineLatest(Iterable> iterable, - Function combiner, + Function combiner, int bufferSize, boolean delayErrors) { if (bufferSize <= 0) { throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize); @@ -173,7 +173,7 @@ static final class CombineLatestCoordinator final Subscriber actual; - final Function combiner; + final Function combiner; final CombineLatestInnerSubscriber[] subscribers; @@ -198,7 +198,7 @@ static final class CombineLatestCoordinator final AtomicReference error; public CombineLatestCoordinator(Subscriber actual, - Function combiner, int n, + Function combiner, int n, int bufferSize, boolean delayErrors) { this.actual = actual; this.combiner = combiner; diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java index aece8cc510..5f58a7e62c 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java @@ -27,9 +27,9 @@ public final class MaybeZipArray extends Maybe { final MaybeSource[] sources; - final Function zipper; + final Function zipper; - public MaybeZipArray(MaybeSource[] sources, Function zipper) { + public MaybeZipArray(MaybeSource[] sources, Function zipper) { this.sources = sources; this.zipper = zipper; } @@ -71,14 +71,14 @@ static final class ZipCoordinator extends AtomicInteger implements Disposa final MaybeObserver actual; - final Function zipper; + final Function zipper; final ZipMaybeObserver[] observers; final Object[] values; @SuppressWarnings("unchecked") - public ZipCoordinator(MaybeObserver observer, int n, Function zipper) { + public ZipCoordinator(MaybeObserver observer, int n, Function zipper) { super(n); this.actual = observer; this.zipper = zipper; diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java index 8f89193c33..4ea94f0731 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java @@ -25,9 +25,9 @@ public final class MaybeZipIterable extends Maybe { final Iterable> sources; - final Function zipper; + final Function zipper; - public MaybeZipIterable(Iterable> sources, Function zipper) { + public MaybeZipIterable(Iterable> sources, Function zipper) { this.sources = sources; this.zipper = zipper; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java index 3797763783..f0ffecfbb7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java @@ -27,13 +27,13 @@ public final class ObservableCombineLatest extends Observable { final ObservableSource[] sources; final Iterable> sourcesIterable; - final Function combiner; + final Function combiner; final int bufferSize; final boolean delayError; public ObservableCombineLatest(ObservableSource[] sources, Iterable> sourcesIterable, - Function combiner, int bufferSize, + Function combiner, int bufferSize, boolean delayError) { this.sources = sources; this.sourcesIterable = sourcesIterable; @@ -75,7 +75,7 @@ static final class LatestCoordinator extends AtomicInteger implements Disp /** */ private static final long serialVersionUID = 8567835998786448817L; final Observer actual; - final Function combiner; + final Function combiner; final int count; final CombinerSubscriber[] subscribers; final int bufferSize; @@ -94,7 +94,7 @@ static final class LatestCoordinator extends AtomicInteger implements Disp @SuppressWarnings("unchecked") public LatestCoordinator(Observer actual, - Function combiner, + Function combiner, int count, int bufferSize, boolean delayError) { this.actual = actual; this.combiner = combiner; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index 411bd44218..f9aaac221e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -296,9 +296,9 @@ public static Function>, ObservableSource static final class ZipIterableFunction implements Function>, ObservableSource> { - private final Function zipper; + private final Function zipper; - ZipIterableFunction(Function zipper) { + ZipIterableFunction(Function zipper) { this.zipper = zipper; } @@ -308,7 +308,7 @@ public ObservableSource apply(List> l } } - public static Function>, ObservableSource> zipIterable(final Function zipper) { + public static Function>, ObservableSource> zipIterable(final Function zipper) { return new ZipIterableFunction(zipper); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java index 92b54a110d..e110813703 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java @@ -27,13 +27,13 @@ public final class ObservableZip extends Observable { final ObservableSource[] sources; final Iterable> sourcesIterable; - final Function zipper; + final Function zipper; final int bufferSize; final boolean delayError; public ObservableZip(ObservableSource[] sources, Iterable> sourcesIterable, - Function zipper, + Function zipper, int bufferSize, boolean delayError) { this.sources = sources; @@ -75,7 +75,7 @@ static final class ZipCoordinator extends AtomicInteger implements Disposa /** */ private static final long serialVersionUID = 2983708048395377667L; final Observer actual; - final Function zipper; + final Function zipper; final ZipSubscriber[] subscribers; final T[] row; final boolean delayError; @@ -84,7 +84,7 @@ static final class ZipCoordinator extends AtomicInteger implements Disposa @SuppressWarnings("unchecked") public ZipCoordinator(Observer actual, - Function zipper, + Function zipper, int count, boolean delayError) { this.actual = actual; this.zipper = zipper; diff --git a/src/test/java/io/reactivex/flowable/FlowableTests.java b/src/test/java/io/reactivex/flowable/FlowableTests.java index de01ccac68..6e6326a073 100644 --- a/src/test/java/io/reactivex/flowable/FlowableTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableTests.java @@ -1060,4 +1060,19 @@ public void toObservableRange() { public void toObservableError() { Flowable.error(new TestException()).toObservable().test().assertFailure(TestException.class); } + + @Test + public void zipIterableObject() { + final List> flowables = Arrays.asList(Flowable.just(1, 2, 3), Flowable.just(1, 2, 3)); + Flowable.zip(flowables, new Function() { + @Override + public Object apply(Object[] o) throws Exception { + int sum = 0; + for (Object i : o) { + sum += (Integer) i; + } + return sum; + } + }).test().assertResult(2, 4, 6); + } } diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index 56330ba989..b1e1db85d3 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -2858,4 +2858,18 @@ public void ambWith2SignalsSuccess() { ts.assertResult(2); } + @Test + public void zipIterableObject() { + final List> maybes = Arrays.asList(Maybe.just(1), Maybe.just(4)); + Maybe.zip(maybes, new Function() { + @Override + public Object apply(final Object[] o) throws Exception { + int sum = 0; + for (Object i : o) { + sum += (Integer) i; + } + return sum; + } + }).test().assertResult(5); + } } diff --git a/src/test/java/io/reactivex/observable/ObservableTest.java b/src/test/java/io/reactivex/observable/ObservableTest.java index 0df7803154..8e2e1b608e 100644 --- a/src/test/java/io/reactivex/observable/ObservableTest.java +++ b/src/test/java/io/reactivex/observable/ObservableTest.java @@ -1056,4 +1056,19 @@ public void singleDefault() { Observable.empty().toSingle(100).test().assertResult(100); } + + @Test + public void zipIterableObject() { + final List> observables = Arrays.asList(Observable.just(1, 2, 3), Observable.just(1, 2, 3)); + Observable.zip(observables, new Function() { + @Override + public Object apply(Object[] o) throws Exception { + int sum = 0; + for (Object i : o) { + sum += (Integer) i; + } + return sum; + } + }).test().assertResult(2, 4, 6); + } } From 94f45034a2c7162f7c63cec3f6495cbd2184829f Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Sat, 10 Sep 2016 13:17:04 +0200 Subject: [PATCH 2/5] Add Single zipIterableObject test --- src/test/java/io/reactivex/single/SingleTest.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/test/java/io/reactivex/single/SingleTest.java b/src/test/java/io/reactivex/single/SingleTest.java index ee8e52a00f..5a6396c298 100644 --- a/src/test/java/io/reactivex/single/SingleTest.java +++ b/src/test/java/io/reactivex/single/SingleTest.java @@ -515,5 +515,20 @@ public void toFlowableIterableRemove() { iterator.next(); iterator.remove(); } + + @Test + public void zipIterableObject() { + final List> singles = Arrays.asList(Single.just(1), Single.just(4)); + Single.zip(singles, new Function() { + @Override + public Object apply(final Object[] o) throws Exception { + int sum = 0; + for (Object i : o) { + sum += (Integer) i; + } + return sum; + } + }).test().assertResult(5); + } } From b8e510801ecab571c946ca89e460dc8f7670554c Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Sat, 10 Sep 2016 13:29:40 +0200 Subject: [PATCH 3/5] Add combineLatestObject test to Flowable + Object --- .../java/io/reactivex/flowable/FlowableTests.java | 15 +++++++++++++++ .../io/reactivex/observable/ObservableTest.java | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/test/java/io/reactivex/flowable/FlowableTests.java b/src/test/java/io/reactivex/flowable/FlowableTests.java index 6e6326a073..fc56cd59de 100644 --- a/src/test/java/io/reactivex/flowable/FlowableTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableTests.java @@ -1075,4 +1075,19 @@ public Object apply(Object[] o) throws Exception { } }).test().assertResult(2, 4, 6); } + + @Test + public void combineLatestObject() { + final List> flowables = Arrays.asList(Flowable.just(1, 2, 3), Flowable.just(1, 2, 3)); + Flowable.combineLatest(flowables, new Function() { + @Override + public Object apply(final Object[] o) throws Exception { + int sum = 1; + for (Object i : o) { + sum *= (Integer) i; + } + return sum; + } + }).test().assertResult(3, 6, 9); + } } diff --git a/src/test/java/io/reactivex/observable/ObservableTest.java b/src/test/java/io/reactivex/observable/ObservableTest.java index 8e2e1b608e..6dcaea8fd2 100644 --- a/src/test/java/io/reactivex/observable/ObservableTest.java +++ b/src/test/java/io/reactivex/observable/ObservableTest.java @@ -1071,4 +1071,19 @@ public Object apply(Object[] o) throws Exception { } }).test().assertResult(2, 4, 6); } + + @Test + public void combineLatestObject() { + final List> observables = Arrays.asList(Observable.just(1, 2, 3), Observable.just(1, 2, 3)); + Observable.combineLatest(observables, new Function() { + @Override + public Object apply(final Object[] o) throws Exception { + int sum = 1; + for (Object i : o) { + sum *= (Integer) i; + } + return sum; + } + }).test().assertResult(3, 6, 9); + } } From 39a9e312bbdf4361b6f9de6434d82e3b47e1b59b Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Sat, 10 Sep 2016 13:38:50 +0200 Subject: [PATCH 4/5] Add Javadoc explanation --- src/main/java/io/reactivex/Flowable.java | 52 ++++++++++++++++- src/main/java/io/reactivex/Maybe.java | 10 ++++ src/main/java/io/reactivex/Observable.java | 66 ++++++++++++++++++++++ 3 files changed, 127 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index cff34c0640..0dad7ebcc9 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -140,6 +140,11 @@ public static int bufferSize() { * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of * the source Publishers each time an item is received from any of the source Publishers, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -171,6 +176,11 @@ public static Flowable combineLatest(Publisher[] sources, * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of * the source Publishers each time an item is received from any of the source Publishers, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -202,6 +212,11 @@ public static Flowable combineLatest(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -241,6 +256,11 @@ public static Flowable combineLatest(Publisher[] sources, * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of * the source Publishers each time an item is received from any of the source Publishers, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -273,6 +293,11 @@ public static Flowable combineLatest(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -310,6 +335,11 @@ public static Flowable combineLatest(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Backpressure:
*
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s @@ -343,6 +373,10 @@ public static Flowable combineLatestDelayError(Publisher[ * the source Publishers each time an item is received from any of the source Publishers, where this * aggregation is defined by a specified function and delays any error from the sources until * all source Publishers terminate. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *

*
Backpressure:
@@ -377,6 +411,10 @@ public static Flowable combineLatestDelayError(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Scheduler:
@@ -408,6 +446,10 @@ public static Flowable combineLatestDelayError(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Backpressure:
@@ -450,6 +492,10 @@ public static Flowable combineLatestDelayError(Publisher[ * the source Publishers each time an item is received from any of the source Publishers, where this * aggregation is defined by a specified function and delays any error from the sources until * all source Publishers terminate. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *

*
Backpressure:
@@ -484,6 +530,10 @@ public static Flowable combineLatestDelayError(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Backpressure:
@@ -14955,4 +15005,4 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No return ts; } -} \ No newline at end of file +} diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 467b190ab9..cb0a0f0fc0 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -1343,6 +1343,11 @@ public static Maybe wrap(MaybeSource source) { * Returns a Maybe that emits the results of a specified combiner function applied to combinations of * items emitted, in sequence, by an Iterable of other MaybeSources. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

This operator terminates eagerly if any of the source MaybeSources signal an onError or onComplete. This * also means it is possible some sources may not get subscribed to at all. @@ -1774,6 +1779,11 @@ public static Maybe zip( * Returns a Maybe that emits the results of a specified combiner function applied to combinations of * items emitted, in sequence, by an array of other MaybeSources. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

This operator terminates eagerly if any of the source MaybeSources signal an onError or onComplete. This * also means it is possible some sources may not get subscribed to at all. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 2170c8aef3..74469ee628 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -128,6 +128,11 @@ public static int bufferSize() { * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of * the source ObservableSources each time an item is received from any of the source ObservableSources, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -156,6 +161,11 @@ public static Observable combineLatest(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -184,6 +194,11 @@ public static Observable combineLatest(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -219,6 +234,11 @@ public static Observable combineLatest(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *
*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -246,6 +266,11 @@ public static Observable combineLatest(ObservableSource[] * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of * the source ObservableSources each time an item is received from any of the source ObservableSources, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -640,6 +665,11 @@ public static Observable combineLates * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of * the source ObservableSources each time an item is received from any of the source ObservableSources, where this * aggregation is defined by a specified function. + *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * *

*
Scheduler:
*
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
@@ -668,6 +698,10 @@ public static Observable combineLatestDelayError(ObservableSource + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Scheduler:
@@ -699,6 +733,10 @@ public static Observable combineLatestDelayError(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Scheduler:
@@ -737,6 +775,10 @@ public static Observable combineLatestDelayError(ObservableSource + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Scheduler:
@@ -766,6 +808,10 @@ public static Observable combineLatestDelayError(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. * *
*
Scheduler:
@@ -3358,6 +3404,11 @@ public static Observable wrap(ObservableSource source) { * use {@code doOnUnsubscribed()} as well or use {@code using()} to do cleanup in case of completion * or unsubscription. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

*
Scheduler:
@@ -3405,6 +3456,11 @@ public static Observable zip(Iterable + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

*
Scheduler:
@@ -4074,6 +4130,11 @@ public static Observable zip( * use {@code doOnUnsubscribed()} as well or use {@code using()} to do cleanup in case of completion * or unsubscription. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

*
Scheduler:
@@ -4129,6 +4190,11 @@ public static Observable zipArray(Function + * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

*
Scheduler:
From 4a972f2ad4ec11397d6eee147b44bb3e6ab050bb Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Sat, 10 Sep 2016 13:42:42 +0200 Subject: [PATCH 5/5] Add it to Single too --- src/main/java/io/reactivex/Single.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 80ab7d5d77..e22aaabadf 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -877,6 +877,11 @@ public static Single wrap(SingleSource source) { * value and calls a zipper function with an array of these values to return a result * to be emitted to downstream. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

* If any of the SingleSources signal an error, all other SingleSources get cancelled and the @@ -1295,6 +1300,11 @@ public static Single zip( * value and calls a zipper function with an array of these values to return a result * to be emitted to downstream. *

+ * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the + * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a + * {@code Function} passed to the method would trigger a {@code ClassCastException}. + * + *

* *

* If any of the SingleSources signal an error, all other SingleSources get cancelled and the