Skip to content

Commit

Permalink
Merge pull request #1 from benjchristensen/super-extends-additions
Browse files Browse the repository at this point in the history
Zip and CombineLatest Operators: Generic Order and More Arities
  • Loading branch information
Joachim Hofer committed Sep 1, 2013
2 parents 6b9867c + ac6a0a1 commit ebaa616
Show file tree
Hide file tree
Showing 4 changed files with 411 additions and 48 deletions.
3 changes: 2 additions & 1 deletion language-adaptors/rxjava-scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ sourceSets {
}

dependencies {
compile 'org.scala-lang:scala-library:2.10.2'
// pinning to 2.10.1 as having issues with 2.10.2
compile 'org.scala-lang:scala-library:2.10.1'

compile project(':rxjava-core')

Expand Down
257 changes: 225 additions & 32 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
import rx.util.functions.Func2;
import rx.util.functions.Func3;
import rx.util.functions.Func4;
import rx.util.functions.Func5;
import rx.util.functions.Func6;
import rx.util.functions.Func7;
import rx.util.functions.Func8;
import rx.util.functions.Func9;
import rx.util.functions.FuncN;
import rx.util.functions.Function;

Expand Down Expand Up @@ -908,18 +913,17 @@ public static <T> Observable<T> from(Future<T> future, long timeout, TimeUnit un
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param w0
* @param o1
* one source Observable
* @param w1
* @param o2
* another source Observable
* @param reduceFunction
* a function that, when applied to a pair of items, each emitted by one of the two
* source Observables, results in an item that will be emitted by the resulting
* Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R, T0, T1> Observable<R> zip(Observable<? extends T0> w0, Observable<? extends T1> w1, Func2<? super T0, ? super T1, ? extends R> reduceFunction) {
return create(OperationZip.zip(w0, w1, reduceFunction));
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, zipFunction));
}

/**
Expand Down Expand Up @@ -981,19 +985,19 @@ public static <T> Observable<Boolean> sequenceEqual(Observable<T> first, Observa
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param w0
* @param o1
* one source Observable
* @param w1
* another source Observable
* @param w2
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param function
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R, T0, T1, T2> Observable<R> zip(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> function) {
return create(OperationZip.zip(w0, w1, w2, function));
public static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, zipFunction));
}

/**
Expand All @@ -1010,21 +1014,210 @@ public static <R, T0, T1, T2> Observable<R> zip(Observable<? extends T0> w0, Obs
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param w0
* @param o1
* one source Observable
* @param w1
* another source Observable
* @param w2
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param w3
* @param o4
* a fourth source Observable
* @param reduceFunction
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param o7
* a seventh source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Observable<? extends T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> reduceFunction) {
return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
public static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param o7
* a seventh source Observable
* @param o8
* an eighth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param o7
* a seventh source Observable
* @param o8
* an eighth source Observable
* @param o9
* a ninth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Observable<? extends T9> o9, Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction));
}

/**
Expand All @@ -1033,30 +1226,30 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<? extends T0> w0,
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
*
* @param w0
* @param o1
* The first source observable.
* @param w1
* @param o2
* The second source observable.
* @param combineFunction
* The aggregation function used to combine the source observable values.
* @return An Observable that combines the source Observables with the given combine function
*/
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<? super T0, ? super T1, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
public static <T1, T2, R> Observable<R> combineLatest(Observable<T1> o1, Observable<T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(o1, o2, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction));
public static <T1, T2, T3, R> Observable<R> combineLatest(Observable<T1> o1, Observable<T2> o2, Observable<T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(o1, o2, o3, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
public static <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<T1> o1, Observable<T2> o2, Observable<T3> o3, Observable<T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, combineFunction));
}

/**
Expand Down Expand Up @@ -1307,7 +1500,7 @@ public Observable<R> call(List<Observable<?>> wsList) {
*
* @param ws
* A collection of source Observables
* @param reduceFunction
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
Expand Down
Loading

0 comments on commit ebaa616

Please sign in to comment.