Skip to content

Commit

Permalink
Removed the 'vararg' overload and added 2-9 args overloads
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Nov 3, 2013
1 parent 24619f0 commit 50b04ec
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 10 deletions.
170 changes: 166 additions & 4 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4580,15 +4580,177 @@ public Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) {
/**
* Propagates the observable sequence that reacts first.
*
* @param sources
* observable sources competing to react first.
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
return create(OperationAmb.amb(o1, o2));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T>... sources) {
return create(OperationAmb.amb(sources));
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3) {
return create(OperationAmb.amb(o1, o2, o3));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4) {
return create(OperationAmb.amb(o1, o2, o3, o4));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @param o5
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @param o5
* an observable competing to react first.
* @param o6
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @param o5
* an observable competing to react first.
* @param o6
* an observable competing to react first.
* @param o7
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @param o5
* an observable competing to react first.
* @param o6
* an observable competing to react first.
* @param o7
* an observable competing to react first.
* @param o8
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @param o5
* an observable competing to react first.
* @param o6
* an observable competing to react first.
* @param o7
* an observable competing to react first.
* @param o8
* an observable competing to react first.
* @param o9
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8, Observable<? extends T> o9) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8, o9));
}

/**
Expand Down
90 changes: 84 additions & 6 deletions rxjava-core/src/main/java/rx/operators/OperationAmb.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import static org.mockito.Mockito.times;

import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -41,8 +42,88 @@
*/
public class OperationAmb {

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T>... sources) {
return amb(Arrays.asList(sources));
public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
sources.add(o5);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
sources.add(o5);
sources.add(o6);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
sources.add(o5);
sources.add(o6);
sources.add(o7);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
sources.add(o5);
sources.add(o6);
sources.add(o7);
sources.add(o8);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8, Observable<? extends T> o9) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
sources.add(o5);
sources.add(o6);
sources.add(o7);
sources.add(o8);
sources.add(o9);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(
Expand Down Expand Up @@ -170,7 +251,6 @@ public void testAmb() {
Observable<String> observable3 = createObservable(new String[] {
"3", "33", "333", "3333" }, 3000, null);

@SuppressWarnings("unchecked")
Observable<String> o = Observable.create(amb(observable1,
observable2, observable3));

Expand Down Expand Up @@ -200,7 +280,6 @@ public void testAmb2() {
Observable<String> observable3 = createObservable(new String[] {},
3000, new IOException("fake exception"));

@SuppressWarnings("unchecked")
Observable<String> o = Observable.create(amb(observable1,
observable2, observable3));

Expand Down Expand Up @@ -228,7 +307,6 @@ public void testAmb3() {
Observable<String> observable3 = createObservable(new String[] {
"3" }, 3000, null);

@SuppressWarnings("unchecked")
Observable<String> o = Observable.create(amb(observable1,
observable2, observable3));

Expand Down

0 comments on commit 50b04ec

Please sign in to comment.