Skip to content

Commit

Permalink
Merge pull request #718 from zsxwing/merge-overloads
Browse files Browse the repository at this point in the history
Implemented the Merge overloads
  • Loading branch information
benjchristensen committed Jan 14, 2014
2 parents e335e61 + 7526eb7 commit dadf17b
Show file tree
Hide file tree
Showing 3 changed files with 322 additions and 163 deletions.
170 changes: 147 additions & 23 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1243,10 +1243,8 @@ public static <T> Observable<T> merge(Observable<? extends Observable<? extends
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) {
return create(OperationMerge.merge(t1, t2));
return merge(from(t1, t2));
}

/**
Expand All @@ -1266,10 +1264,8 @@ public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? e
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3) {
return create(OperationMerge.merge(t1, t2, t3));
return merge(from(t1, t2, t3));
}

/**
Expand All @@ -1290,10 +1286,8 @@ public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? e
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4) {
return create(OperationMerge.merge(t1, t2, t3, t4));
return merge(from(t1, t2, t3, t4));
}

/**
Expand All @@ -1315,10 +1309,8 @@ public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? e
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5) {
return create(OperationMerge.merge(t1, t2, t3, t4, t5));
return merge(from(t1, t2, t3, t4, t5));
}

/**
Expand All @@ -1341,10 +1333,8 @@ public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? e
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6) {
return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6));
return merge(from(t1, t2, t3, t4, t5, t6));
}

/**
Expand All @@ -1368,10 +1358,8 @@ public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? e
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7) {
return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6, t7));
return merge(from(t1, t2, t3, t4, t5, t6, t7));
}

/**
Expand All @@ -1396,10 +1384,8 @@ public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? e
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8) {
return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6, t7, t8));
return merge(from(t1, t2, t3, t4, t5, t6, t7, t8));
}

/**
Expand All @@ -1425,10 +1411,148 @@ public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? e
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
*/
@SuppressWarnings("unchecked")
// suppress because the types are checked by the method signature before using a vararg
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2, Observable<? extends T> t3, Observable<? extends T> t4, Observable<? extends T> t5, Observable<? extends T> t6, Observable<? extends T> t7, Observable<? extends T> t8, Observable<? extends T> t9) {
return create(OperationMerge.merge(t1, t2, t3, t4, t5, t6, t7, t8, t9));
return merge(from(t1, t2, t3, t4, t5, t6, t7, t8, t9));
}

/**
* Flattens a sequence of Observables emitted by an Observable into one Observable, without any transformation.
* The number of concurrent subscriptions to the Observables is limited by maxConcurrent.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine the items emitted by multiple Observables so that they
* act like a single Observable, by using the {@code merge} method.
*
* @param source an Observable that emits Observables
* @param maxConcurrent the maximum number of Observables being subscribed to concurrently
* @return an Observable that emits items that are the result of flattening
* the items emitted by the Observables emitted by the
* {@code source} Observable
* @throw IllegalArgumentException if maxConcurrent <= 0
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211914(v=vs.103).aspx">MSDN: Observable.Merge</a>
*/
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
return create(OperationMerge.merge(source, maxConcurrent));
}

/**
* Flattens an Observable Iterable into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine the items emitted by multiple Observables so that they
* act like a single Observable, by using the {@code merge} method.
*
* @param sequences the Observable Iterable
* @return an Observable that emits items that are the result of flattening
* the items emitted by the Observables in the Iterable
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229590(v=vs.103).aspx">MSDN: Observable.Merge</a>
*/
public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences) {
return merge(from(sequences));
}

/**
* Flattens an Observable Iterable into one Observable, without any transformation.
* The number of concurrent subscriptions to the Observables is limited by maxConcurrent.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine the items emitted by multiple Observables so that they
* act like a single Observable, by using the {@code merge} method.
*
* @param sequences the Observable Iterable
* @param maxConcurrent the maximum number of Observables being subscribed to concurrently
* @return an Observable that emits items that are the result of flattening
* the items emitted by the Observables in the Iterable
* @throw IllegalArgumentException if maxConcurrent <= 0
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229923(v=vs.103).aspx">MSDN: Observable.Merge</a>
*/
public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences, int maxConcurrent) {
return merge(from(sequences), maxConcurrent);
}

/**
* Flattens an Observable Iterable into one Observable, without any transformation.
* The number of concurrent subscriptions to the Observables is limited by maxConcurrent.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine the items emitted by multiple Observables so that they
* act like a single Observable, by using the {@code merge} method.
*
* @param sequences the Observable Iterable
* @param maxConcurrent the maximum number of Observables being subscribed to concurrently
* @param scheduler the scheduler to traversal the Observable array on
* @return an Observable that emits items that are the result of flattening
* the items emitted by the Observables in the Iterable
* @throw IllegalArgumentException if maxConcurrent <= 0
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244329(v=vs.103).aspx">MSDN: Observable.Merge</a>
*/
public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences, int maxConcurrent, Scheduler scheduler) {
return merge(from(sequences, scheduler), maxConcurrent);
}

/**
* Flattens an Observable Iterable into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine the items emitted by multiple Observables so that they
* act like a single Observable, by using the {@code merge} method.
*
* @param sequences the Observable Iterable
* @param scheduler the scheduler to traversal the Observable array on
* @return an Observable that emits items that are the result of flattening
* the items emitted by the Observables in the Iterable
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244336(v=vs.103).aspx">MSDN: Observable.Merge</a>
*/
public static <T> Observable<T> merge(Iterable<? extends Observable<? extends T>> sequences, Scheduler scheduler) {
return merge(from(sequences, scheduler));
}

/**
* Flattens an Observable array into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine the items emitted by multiple Observables so that they
* act like a single Observable, by using the {@code merge} method.
*
* @param sequences the Observable array
* @return an Observable that emits items that are the result of flattening
* the items emitted by the Observables in the array
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge</a>
*/
public static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
return merge(from(sequences));
}

/**
* Flattens an Observable array into one Observable, without any transformation.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine the items emitted by multiple Observables so that they
* act like a single Observable, by using the {@code merge} method.
*
* @param sequences the Observable array
* @param scheduler the scheduler to traversal the Observable array on
* @return an Observable that emits items that are the result of flattening
* the items emitted by the Observables in the array
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229061(v=vs.103).aspx">MSDN: Observable.Merge</a>
*/
public static <T> Observable<T> merge(Observable<? extends T>[] sequences, Scheduler scheduler) {
return merge(from(sequences, scheduler));
}

/**
Expand Down
Loading

0 comments on commit dadf17b

Please sign in to comment.