Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull 267 - Merge combineLatest #303

Merged
32 changes: 31 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import rx.operators.AtomicObserver;
import rx.operators.OperationAll;
import rx.operators.OperationCache;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
Expand Down Expand Up @@ -2803,7 +2804,36 @@ public static <R> Observable<R> zip(Collection<Observable<?>> ws, final Object f
}

/**
* Filters an Observable by discarding any items it emits that do not meet some test.
* Combines the given observables, emitting an event containing an aggregation of the latest values of each of the source observables
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
* @param w0
* The first source observable.
* @param w1
* The second source observable.
* @param combineFunction
* The aggregation function used to combine the source observable values.
* @return A function from an observer to a subscription. This can be used to create an observable from.
*/
public static <R, T0, T1> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Func2<? super T0, ? super T1, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Observable<? super T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
}

/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/filter.png">
*
Expand Down
185 changes: 92 additions & 93 deletions rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Matchers;

import rx.Observable;
import rx.Observer;
Expand All @@ -52,16 +52,17 @@
public class OperationCombineLatest {

/**
* Combines the two given Observables, emitting an item that aggregates the latest values of
* each of the source Observables each time an item is received from either of the source
* Observables, where the aggregation is defined by the given function.
* @param w0 the first source Observable.
* @param w1 the second source Observable.
* @param combineLatestFunction the aggregation function that combines the source Observable
* items.
* @return a function from an Observer to a Subscription. This can be used to create an Observable.
* Combines the two given observables, emitting an event containing an aggregation of the latest values of each of the source observables
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
* @param w0
* The first source observable.
* @param w1
* The second source observable.
* @param combineLatestFunction
* The aggregation function used to combine the source observable values.
* @return A function from an observer to a subscription. This can be used to create an observable from.
*/
public static <T0, T1, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineLatestFunction) {
public static <T0, T1, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Func2<? super T0, ? super T1, ? extends R> combineLatestFunction) {
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
a.addObserver(new CombineObserver<R, T0>(a, w0));
a.addObserver(new CombineObserver<R, T1>(a, w1));
Expand All @@ -71,7 +72,7 @@ public static <T0, T1, R> Func1<Observer<R>, Subscription> combineLatest(Observa
/**
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
*/
public static <T0, T1, T2, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<T0, T1, T2, R> combineLatestFunction) {
public static <T0, T1, T2, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineLatestFunction) {
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
a.addObserver(new CombineObserver<R, T0>(a, w0));
a.addObserver(new CombineObserver<R, T1>(a, w1));
Expand All @@ -82,7 +83,7 @@ public static <T0, T1, T2, R> Func1<Observer<R>, Subscription> combineLatest(Obs
/**
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
*/
public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineLatestFunction) {
public static <T0, T1, T2, T3, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Observable<? super T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineLatestFunction) {
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
a.addObserver(new CombineObserver<R, T0>(a, w0));
a.addObserver(new CombineObserver<R, T1>(a, w1));
Expand All @@ -92,16 +93,16 @@ public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> combineLatest
}

private static class CombineObserver<R, T> implements Observer<T> {
final Observable<T> w;
final Aggregator<R> a;
final Observable<? super T> w;
final Aggregator<? super R> a;
private Subscription subscription;

public CombineObserver(Aggregator<R> a, Observable<T> w) {
public CombineObserver(Aggregator<? super R> a, Observable<? super T> w) {
this.a = a;
this.w = w;
}

public synchronized void startWatching() {
private void startWatching() {
if (subscription != null) {
throw new RuntimeException("This should only be called once.");
}
Expand Down Expand Up @@ -129,44 +130,28 @@ public void onNext(T args) {
* whenever we have received an event from one of the observables, as soon as each Observable has received
* at least one event.
*/
private static class Aggregator<R> implements Func1<Observer<R>, Subscription> {
private static class Aggregator<R> implements Func1<Observer<? super R>, Subscription> {

private Observer<R> observer;
private volatile Observer<? super R> observer;

private final FuncN<R> combineLatestFunction;
private final FuncN<? extends R> combineLatestFunction;
private final AtomicBoolean running = new AtomicBoolean(true);

// used as an internal lock for handling the latest values and the completed state of each observer
private final Object lockObject = new Object();

/**
* Store when an observer completes.
* <p>
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
* */
private final Set<CombineObserver<R, ?>> completed = new HashSet<CombineObserver<R, ?>>();

/**
* The latest value from each observer
* <p>
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
* */
private final Map<CombineObserver<R, ?>, Object> latestValue = new HashMap<CombineObserver<R, ?>, Object>();
// Stores how many observers have already completed
private final AtomicInteger numCompleted = new AtomicInteger(0);

/**
* Whether each observer has a latest value at all.
* <p>
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
* */
private final Set<CombineObserver<R, ?>> hasLatestValue = new HashSet<CombineObserver<R, ?>>();

* The latest value from each observer.
*/
private final Map<CombineObserver<? extends R, ?>, Object> latestValue = new ConcurrentHashMap<CombineObserver<? extends R, ?>, Object>();

/**
* Ordered list of observers to combine.
* No synchronization is necessary as these can not be added or changed asynchronously.
*/
private final List<CombineObserver<R, ?>> observers = new LinkedList<CombineObserver<R, ?>>();

public Aggregator(FuncN<R> combineLatestFunction) {
public Aggregator(FuncN<? extends R> combineLatestFunction) {
this.combineLatestFunction = combineLatestFunction;
}

Expand All @@ -184,18 +169,15 @@ <T> void addObserver(CombineObserver<R, T> w) {
*
* @param w The observer that has completed.
*/
<T> void complete(CombineObserver<R, T> w) {
synchronized(lockObject) {
// store that this CombineLatestObserver is completed
completed.add(w);
// if all CombineObservers are completed, we mark the whole thing as completed
if (completed.size() == observers.size()) {
if (running.get()) {
// mark ourselves as done
observer.onCompleted();
// just to ensure we stop processing in case we receive more onNext/complete/error calls after this
running.set(false);
}
<T> void complete(CombineObserver<? extends R, ? super T> w) {
int completed = numCompleted.incrementAndGet();
// if all CombineObservers are completed, we mark the whole thing as completed
if (completed == observers.size()) {
if (running.get()) {
// mark ourselves as done
observer.onCompleted();
// just to ensure we stop processing in case we receive more onNext/complete/error calls after this
running.set(false);
}
}
}
Expand All @@ -217,7 +199,7 @@ void error(Exception e) {
* @param w
* @param arg
*/
<T> void next(CombineObserver<R, T> w, T arg) {
<T> void next(CombineObserver<? extends R, ? super T> w, T arg) {
if (observer == null) {
throw new RuntimeException("This shouldn't be running if an Observer isn't registered");
}
Expand All @@ -227,54 +209,48 @@ <T> void next(CombineObserver<R, T> w, T arg) {
return;
}

// define here so the variable is out of the synchronized scope
// remember this as the latest value for this observer
latestValue.put(w, arg);

if (latestValue.size() < observers.size()) {
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
return;
}

Object[] argsToCombineLatest = new Object[observers.size()];

// we synchronize everything that touches latest values
synchronized (lockObject) {
// remember this as the latest value for this observer
latestValue.put(w, arg);

// remember that this observer now has a latest value set
hasLatestValue.add(w);

// if all observers in the 'observers' list have a value, invoke the combineLatestFunction
for (CombineObserver<R, ?> rw : observers) {
if (!hasLatestValue.contains(rw)) {
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
return;
}
}
// if we get to here this means all the queues have data
int i = 0;
for (CombineObserver<R, ?> _w : observers) {
argsToCombineLatest[i++] = latestValue.get(_w);
}
int i = 0;
for (CombineObserver<R, ?> _w : observers) {
argsToCombineLatest[i++] = latestValue.get(_w);
}

try {
R combinedValue = combineLatestFunction.call(argsToCombineLatest);
observer.onNext(combinedValue);
} catch(Exception ex) {
observer.onError(ex);
}
// if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args
// we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling
// this 'next' method while another thread finishes calling this combineLatestFunction
observer.onNext(combineLatestFunction.call(argsToCombineLatest));
}

@Override
public Subscription call(Observer<R> observer) {
public Subscription call(Observer<? super R> observer) {
if (this.observer != null) {
throw new IllegalStateException("Only one Observer can subscribe to this Observable.");
}
this.observer = observer;

AtomicObservableSubscription subscription = new AtomicObservableSubscription(new Subscription() {
@Override
public void unsubscribe() {
stop();
}
});
this.observer = new SynchronizedObserver<R>(observer, subscription);

/* start the observers */
for (CombineObserver<R, ?> rw : observers) {
rw.startWatching();
}

return new Subscription() {
@Override
public void unsubscribe() {
stop();
}
};
return subscription;
}

private void stop() {
Expand All @@ -291,10 +267,33 @@ private void stop() {

public static class UnitTest {

@SuppressWarnings("unchecked")
/* mock calls don't do generics */
@Test
public void testCombineLatestWithFunctionThatThrowsAnException() {
@SuppressWarnings("unchecked") // mock calls don't do generics
Observer<String> w = mock(Observer.class);

TestObservable w1 = new TestObservable();
TestObservable w2 = new TestObservable();

Observable<String> combined = Observable.create(combineLatest(w1, w2, new Func2<String, String, String>() {
@Override
public String call(String v1, String v2) {
throw new RuntimeException("I don't work.");
}
}));
combined.subscribe(w);

w1.Observer.onNext("first value of w1");
w2.Observer.onNext("first value of w2");

verify(w, never()).onNext(anyString());
verify(w, never()).onCompleted();
verify(w, times(1)).onError(Matchers.<RuntimeException>any());
}

@Test
public void testCombineLatestDifferentLengthObservableSequences1() {
@SuppressWarnings("unchecked") // mock calls don't do generics
Observer<String> w = mock(Observer.class);

TestObservable w1 = new TestObservable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public final class SynchronizedObserver<T> implements Observer<T> {
* // TODO composing of this class should rarely happen now with updated design so this decision should be revisited
*/

private final Observer<T> observer;
private final Observer<? super T> observer;
private final AtomicObservableSubscription subscription;
private volatile boolean finishRequested = false;
private volatile boolean finished = false;

public SynchronizedObserver(Observer<T> Observer, AtomicObservableSubscription subscription) {
public SynchronizedObserver(Observer<? super T> Observer, AtomicObservableSubscription subscription) {
this.observer = Observer;
this.subscription = subscription;
}
Expand Down