Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Fix Generics T[] in Zip & CombineLatest #4525

Merged
merged 5 commits into from
Sep 10, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public static int bufferSize() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super T[], ? extends R> combiner) {
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner) {
Copy link
Member

@akarnokd akarnokd Sep 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth adding a sentence to the javadocs since people tend to ask why this isn't ? super T[]

<p>
Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the 
implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a 
{@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.

If you feel so, you could add these two sentences after the first sentence of all the affected methods and before the marble diagram (as people forget to scroll down below the diagram).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

return combineLatest(sources, combiner, bufferSize());
}

Expand Down Expand Up @@ -194,7 +194,7 @@ public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources,
*/
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatest(Function<? super T[], ? extends R> combiner, Publisher<? extends T>... sources) {
public static <T, R> Flowable<R> combineLatest(Function<? super Object[], ? extends R> combiner, Publisher<? extends T>... sources) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one does not seem necessary to be honest. It just has switched arguments with the one above. Or do I oversee anything?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and other overloads of operators let's you use varargs to specify any number of sources at the expense that varargs has to be the last argument.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups right didn't see those little dots there too many overloads ...

return combineLatest(sources, combiner, bufferSize());
}

Expand Down Expand Up @@ -227,7 +227,7 @@ public static <T, R> Flowable<R> combineLatest(Function<? super T[], ? extends R
*/
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super T[], ? extends R> combiner, int bufferSize) {
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return empty();
Expand Down Expand Up @@ -265,7 +265,7 @@ public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources,
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources,
Function<? super T[], ? extends R> combiner) {
Function<? super Object[], ? extends R> combiner) {
return combineLatest(sources, combiner, bufferSize());
}

Expand Down Expand Up @@ -299,7 +299,7 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.requireNonNull(combiner, "combiner is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
Expand Down Expand Up @@ -334,7 +334,7 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[] sources,
Function<? super T[], ? extends R> combiner) {
Function<? super Object[], ? extends R> combiner) {
return combineLatestDelayError(sources, combiner, bufferSize());
}

Expand Down Expand Up @@ -367,7 +367,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
*/
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ? extends R> combiner,
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super Object[], ? extends R> combiner,
Publisher<? extends T>... sources) {
return combineLatestDelayError(sources, combiner, bufferSize());
}
Expand Down Expand Up @@ -398,7 +398,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ?
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ? extends R> combiner,
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super Object[], ? extends R> combiner,
int bufferSize, Publisher<? extends T>... sources) {
return combineLatestDelayError(sources, combiner, bufferSize);
}
Expand Down Expand Up @@ -435,7 +435,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ?
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[] sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.requireNonNull(combiner, "combiner is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
Expand Down Expand Up @@ -475,7 +475,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources,
Function<? super T[], ? extends R> combiner) {
Function<? super Object[], ? extends R> combiner) {
return combineLatestDelayError(sources, combiner, bufferSize());
}

Expand Down Expand Up @@ -511,7 +511,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publ
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.requireNonNull(combiner, "combiner is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@ public static <T> Maybe<T> wrap(MaybeSource<T> source) {
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Maybe<R> zip(Iterable<? extends MaybeSource<? extends T>> sources, Function<? super T[], ? extends R> zipper) {
public static <T, R> Maybe<R> zip(Iterable<? extends MaybeSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
ObjectHelper.requireNonNull(zipper, "zipper is null");
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new MaybeZipIterable<T, R>(sources, zipper));
Expand Down Expand Up @@ -1796,6 +1796,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Maybe<R> zip(
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Maybe<R> zipArray(Function<? super Object[], ? extends R> zipper,
MaybeSource<? extends T>... sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return empty();
}
Expand Down
28 changes: 14 additions & 14 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public static int bufferSize() {
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatest(Function<? super T[], ? extends R> combiner, int bufferSize, ObservableSource<? extends T>... sources) {
public static <T, R> Observable<R> combineLatest(Function<? super Object[], ? extends R> combiner, int bufferSize, ObservableSource<? extends T>... sources) {
return combineLatest(sources, combiner, bufferSize);
}

Expand All @@ -175,7 +175,7 @@ public static <T, R> Observable<R> combineLatest(Function<? super T[], ? extends
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableSource<? extends T>> sources,
Function<? super T[], ? extends R> combiner) {
Function<? super Object[], ? extends R> combiner) {
return combineLatest(sources, combiner, bufferSize());
}

Expand Down Expand Up @@ -205,7 +205,7 @@ public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableSo
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableSource<? extends T>> sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.requireNonNull(combiner, "combiner is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
Expand Down Expand Up @@ -238,7 +238,7 @@ public static <T, R> Observable<R> combineLatest(Iterable<? extends ObservableSo
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[] sources,
Function<? super T[], ? extends R> combiner) {
Function<? super Object[], ? extends R> combiner) {
return combineLatest(sources, combiner, bufferSize());
}

Expand Down Expand Up @@ -267,7 +267,7 @@ public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[]
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[] sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return empty();
Expand Down Expand Up @@ -659,7 +659,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLates
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatestDelayError(ObservableSource<? extends T>[] sources,
Function<? super T[], ? extends R> combiner) {
Function<? super Object[], ? extends R> combiner) {
return combineLatestDelayError(sources, combiner, bufferSize());
}

Expand Down Expand Up @@ -689,7 +689,7 @@ public static <T, R> Observable<R> combineLatestDelayError(ObservableSource<? ex
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatestDelayError(Function<? super T[], ? extends R> combiner,
public static <T, R> Observable<R> combineLatestDelayError(Function<? super Object[], ? extends R> combiner,
int bufferSize, ObservableSource<? extends T>... sources) {
return combineLatestDelayError(sources, combiner, bufferSize);
}
Expand Down Expand Up @@ -721,7 +721,7 @@ public static <T, R> Observable<R> combineLatestDelayError(Function<? super T[],
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatestDelayError(ObservableSource<? extends T>[] sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
ObjectHelper.requireNonNull(combiner, "combiner is null");
if (sources.length == 0) {
Expand Down Expand Up @@ -757,7 +757,7 @@ public static <T, R> Observable<R> combineLatestDelayError(ObservableSource<? ex
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends ObservableSource<? extends T>> sources,
Function<? super T[], ? extends R> combiner) {
Function<? super Object[], ? extends R> combiner) {
return combineLatestDelayError(sources, combiner, bufferSize());
}

Expand Down Expand Up @@ -788,7 +788,7 @@ public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Ob
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends ObservableSource<? extends T>> sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
Function<? super Object[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.requireNonNull(combiner, "combiner is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
Expand Down Expand Up @@ -3375,7 +3375,7 @@ public static <T> Observable<T> wrap(ObservableSource<T> source) {
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super T[], ? extends R> zipper) {
public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
ObjectHelper.requireNonNull(zipper, "zipper is null");
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new ObservableZip<T, R>(null, sources, zipper, bufferSize(), false));
Expand Down Expand Up @@ -3423,7 +3423,7 @@ public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? ext
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> zip(ObservableSource<? extends ObservableSource<? extends T>> sources, final Function<? super T[], ? extends R> zipper) {
public static <T, R> Observable<R> zip(ObservableSource<? extends ObservableSource<? extends T>> sources, final Function<? super Object[], ? extends R> zipper) {
ObjectHelper.requireNonNull(zipper, "zipper is null");
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new ObservableToList(sources, 16)
Expand Down Expand Up @@ -4095,7 +4095,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> zipArray(Function<? super T[], ? extends R> zipper,
public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
boolean delayError, int bufferSize, ObservableSource<? extends T>... sources) {
if (sources.length == 0) {
return empty();
Expand Down Expand Up @@ -4152,7 +4152,7 @@ public static <T, R> Observable<R> zipArray(Function<? super T[], ? extends R> z
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources,
Function<? super T[], ? extends R> zipper, boolean delayError,
Function<? super Object[], ? extends R> zipper, boolean delayError,
int bufferSize) {
ObjectHelper.requireNonNull(zipper, "zipper is null");
ObjectHelper.requireNonNull(sources, "sources is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ public final class FlowableCombineLatest<T, R>

final Iterable<? extends Publisher<? extends T>> iterable;

final Function<? super T[], ? extends R> combiner;
final Function<? super Object[], ? extends R> combiner;

final int bufferSize;

final boolean delayErrors;

public FlowableCombineLatest(Publisher<? extends T>[] array,
Function<? super T[], ? extends R> combiner,
Function<? super Object[], ? extends R> combiner,
int bufferSize, boolean delayErrors) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
Expand All @@ -61,7 +61,7 @@ public FlowableCombineLatest(Publisher<? extends T>[] array,
}

public FlowableCombineLatest(Iterable<? extends Publisher<? extends T>> iterable,
Function<? super T[], ? extends R> combiner,
Function<? super Object[], ? extends R> combiner,
int bufferSize, boolean delayErrors) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
Expand Down Expand Up @@ -173,7 +173,7 @@ static final class CombineLatestCoordinator<T, R>

final Subscriber<? super R> actual;

final Function<? super T[], ? extends R> combiner;
final Function<? super Object[], ? extends R> combiner;

final CombineLatestInnerSubscriber<T>[] subscribers;

Expand All @@ -198,7 +198,7 @@ static final class CombineLatestCoordinator<T, R>
final AtomicReference<Throwable> error;

public CombineLatestCoordinator(Subscriber<? super R> actual,
Function<? super T[], ? extends R> combiner, int n,
Function<? super Object[], ? extends R> combiner, int n,
int bufferSize, boolean delayErrors) {
this.actual = actual;
this.combiner = combiner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public final class MaybeZipArray<T, R> extends Maybe<R> {

final MaybeSource<? extends T>[] sources;

final Function<? super T[], ? extends R> zipper;
final Function<? super Object[], ? extends R> zipper;

public MaybeZipArray(MaybeSource<? extends T>[] sources, Function<? super T[], ? extends R> zipper) {
public MaybeZipArray(MaybeSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
this.zipper = zipper;
}
Expand Down Expand Up @@ -71,14 +71,14 @@ static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposa

final MaybeObserver<? super R> actual;

final Function<? super T[], ? extends R> zipper;
final Function<? super Object[], ? extends R> zipper;

final ZipMaybeObserver<T>[] observers;

final Object[] values;

@SuppressWarnings("unchecked")
public ZipCoordinator(MaybeObserver<? super R> observer, int n, Function<? super T[], ? extends R> zipper) {
public ZipCoordinator(MaybeObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
super(n);
this.actual = observer;
this.zipper = zipper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public final class MaybeZipIterable<T, R> extends Maybe<R> {

final Iterable<? extends MaybeSource<? extends T>> sources;

final Function<? super T[], ? extends R> zipper;
final Function<? super Object[], ? extends R> zipper;

public MaybeZipIterable(Iterable<? extends MaybeSource<? extends T>> sources, Function<? super T[], ? extends R> zipper) {
public MaybeZipIterable(Iterable<? extends MaybeSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
this.zipper = zipper;
}
Expand Down
Loading