Skip to content

Commit

Permalink
Merge pull request #467 from benjchristensen/amb-merge
Browse files Browse the repository at this point in the history
Merge Amb Operator in PR #460
  • Loading branch information
benjchristensen committed Nov 5, 2013
2 parents 069ae42 + 4b9ca57 commit 5e43558
Show file tree
Hide file tree
Showing 3 changed files with 539 additions and 0 deletions.
191 changes: 191 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.operators.OperationAll;
import rx.operators.OperationAmb;
import rx.operators.OperationAny;
import rx.operators.OperationAverage;
import rx.operators.OperationBuffer;
Expand Down Expand Up @@ -4576,6 +4577,196 @@ public Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) {
return create(OperationTimeInterval.timeInterval(this, scheduler));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
return create(OperationAmb.amb(o1, o2));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3) {
return create(OperationAmb.amb(o1, o2, o3));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4) {
return create(OperationAmb.amb(o1, o2, o3, o4));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @param o5
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @param o5
* an observable competing to react first.
* @param o6
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @param o5
* an observable competing to react first.
* @param o6
* an observable competing to react first.
* @param o7
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @param o5
* an observable competing to react first.
* @param o6
* an observable competing to react first.
* @param o7
* an observable competing to react first.
* @param o8
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param o1
* an observable competing to react first.
* @param o2
* an observable competing to react first.
* @param o3
* an observable competing to react first.
* @param o4
* an observable competing to react first.
* @param o5
* an observable competing to react first.
* @param o6
* an observable competing to react first.
* @param o7
* an observable competing to react first.
* @param o8
* an observable competing to react first.
* @param o9
* an observable competing to react first.
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8, Observable<? extends T> o9) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8, o9));
}

/**
* Propagates the observable sequence that reacts first.
*
* @param sources
* observable sources competing to react first.
*
* @return
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229115(v=vs.103).aspx">MSDN: Observable.Amb</a>
*/
public static <T> Observable<T> amb(Iterable<? extends Observable<? extends T>> sources) {
return create(OperationAmb.amb(sources));
}

/**
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
* <p>
Expand Down
191 changes: 191 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationAmb.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;

/**
* Propagates the observable sequence that reacts first.
*/
public class OperationAmb {

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
sources.add(o5);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
sources.add(o5);
sources.add(o6);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
sources.add(o5);
sources.add(o6);
sources.add(o7);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
sources.add(o5);
sources.add(o6);
sources.add(o7);
sources.add(o8);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8, Observable<? extends T> o9) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
sources.add(o3);
sources.add(o4);
sources.add(o5);
sources.add(o6);
sources.add(o7);
sources.add(o8);
sources.add(o9);
return amb(sources);
}

public static <T> OnSubscribeFunc<T> amb(
final Iterable<? extends Observable<? extends T>> sources) {
return new OnSubscribeFunc<T>() {

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
AtomicInteger choice = new AtomicInteger(AmbObserver.NONE);
int index = 0;
CompositeSubscription parentSubscription = new CompositeSubscription();
for (Observable<? extends T> source : sources) {
SafeObservableSubscription subscription = new SafeObservableSubscription();
AmbObserver<T> ambObserver = new AmbObserver<T>(
subscription, observer, index, choice);
parentSubscription.add(subscription.wrap(source
.subscribe(ambObserver)));
index++;
}
return parentSubscription;
}
};
}

private static class AmbObserver<T> implements Observer<T> {

private static final int NONE = -1;

private Subscription subscription;
private Observer<? super T> observer;
private int index;
private AtomicInteger choice;

private AmbObserver(Subscription subscription,
Observer<? super T> observer, int index, AtomicInteger choice) {
this.subscription = subscription;
this.observer = observer;
this.choice = choice;
this.index = index;
}

@Override
public void onNext(T args) {
if (!isSelected()) {
subscription.unsubscribe();
return;
}
observer.onNext(args);
}

@Override
public void onCompleted() {
if (!isSelected()) {
subscription.unsubscribe();
return;
}
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
if (!isSelected()) {
subscription.unsubscribe();
return;
}
observer.onError(e);
}

private boolean isSelected() {
if (choice.get() == NONE) {
return choice.compareAndSet(NONE, index);
}
return choice.get() == index;
}
}

}
Loading

0 comments on commit 5e43558

Please sign in to comment.