Skip to content

Commit

Permalink
Merge pull request ReactiveX#303 from benjchristensen/pull-267-combin…
Browse files Browse the repository at this point in the history
…eLatest

Pull 267 - Merge combineLatest
  • Loading branch information
benjchristensen committed Jul 5, 2013
2 parents 7c0348c + 589b2c4 commit f047210
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 96 deletions.
32 changes: 31 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import rx.operators.AtomicObserver;
import rx.operators.OperationAll;
import rx.operators.OperationCache;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
Expand Down Expand Up @@ -2803,7 +2804,36 @@ public static <R> Observable<R> zip(Collection<Observable<?>> ws, final Object f
}

/**
* Filters an Observable by discarding any items it emits that do not meet some test.
* Combines the given observables, emitting an event containing an aggregation of the latest values of each of the source observables
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
* @param w0
* The first source observable.
* @param w1
* The second source observable.
* @param combineFunction
* The aggregation function used to combine the source observable values.
* @return A function from an observer to a subscription. This can be used to create an observable from.
*/
public static <R, T0, T1> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Func2<? super T0, ? super T1, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Observable<? super T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
}

/**
* Filters an Observable by discarding any of its emissions that do not meet some test.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/filter.png">
*
Expand Down
185 changes: 92 additions & 93 deletions rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Matchers;

import rx.Observable;
import rx.Observer;
Expand All @@ -52,16 +52,17 @@
public class OperationCombineLatest {

/**
* Combines the two given Observables, emitting an item that aggregates the latest values of
* each of the source Observables each time an item is received from either of the source
* Observables, where the aggregation is defined by the given function.
* @param w0 the first source Observable.
* @param w1 the second source Observable.
* @param combineLatestFunction the aggregation function that combines the source Observable
* items.
* @return a function from an Observer to a Subscription. This can be used to create an Observable.
* Combines the two given observables, emitting an event containing an aggregation of the latest values of each of the source observables
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
* @param w0
* The first source observable.
* @param w1
* The second source observable.
* @param combineLatestFunction
* The aggregation function used to combine the source observable values.
* @return A function from an observer to a subscription. This can be used to create an observable from.
*/
public static <T0, T1, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineLatestFunction) {
public static <T0, T1, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Func2<? super T0, ? super T1, ? extends R> combineLatestFunction) {
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
a.addObserver(new CombineObserver<R, T0>(a, w0));
a.addObserver(new CombineObserver<R, T1>(a, w1));
Expand All @@ -71,7 +72,7 @@ public static <T0, T1, R> Func1<Observer<R>, Subscription> combineLatest(Observa
/**
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
*/
public static <T0, T1, T2, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<T0, T1, T2, R> combineLatestFunction) {
public static <T0, T1, T2, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineLatestFunction) {
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
a.addObserver(new CombineObserver<R, T0>(a, w0));
a.addObserver(new CombineObserver<R, T1>(a, w1));
Expand All @@ -82,7 +83,7 @@ public static <T0, T1, T2, R> Func1<Observer<R>, Subscription> combineLatest(Obs
/**
* @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction)
*/
public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineLatestFunction) {
public static <T0, T1, T2, T3, R> Func1<Observer<? super R>, Subscription> combineLatest(Observable<? super T0> w0, Observable<? super T1> w1, Observable<? super T2> w2, Observable<? super T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineLatestFunction) {
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
a.addObserver(new CombineObserver<R, T0>(a, w0));
a.addObserver(new CombineObserver<R, T1>(a, w1));
Expand All @@ -92,16 +93,16 @@ public static <T0, T1, T2, T3, R> Func1<Observer<R>, Subscription> combineLatest
}

private static class CombineObserver<R, T> implements Observer<T> {
final Observable<T> w;
final Aggregator<R> a;
final Observable<? super T> w;
final Aggregator<? super R> a;
private Subscription subscription;

public CombineObserver(Aggregator<R> a, Observable<T> w) {
public CombineObserver(Aggregator<? super R> a, Observable<? super T> w) {
this.a = a;
this.w = w;
}

public synchronized void startWatching() {
private void startWatching() {
if (subscription != null) {
throw new RuntimeException("This should only be called once.");
}
Expand Down Expand Up @@ -129,44 +130,28 @@ public void onNext(T args) {
* whenever we have received an event from one of the observables, as soon as each Observable has received
* at least one event.
*/
private static class Aggregator<R> implements Func1<Observer<R>, Subscription> {
private static class Aggregator<R> implements Func1<Observer<? super R>, Subscription> {

private Observer<R> observer;
private volatile Observer<? super R> observer;

private final FuncN<R> combineLatestFunction;
private final FuncN<? extends R> combineLatestFunction;
private final AtomicBoolean running = new AtomicBoolean(true);

// used as an internal lock for handling the latest values and the completed state of each observer
private final Object lockObject = new Object();

/**
* Store when an observer completes.
* <p>
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
* */
private final Set<CombineObserver<R, ?>> completed = new HashSet<CombineObserver<R, ?>>();

/**
* The latest value from each observer
* <p>
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
* */
private final Map<CombineObserver<R, ?>, Object> latestValue = new HashMap<CombineObserver<R, ?>, Object>();
// Stores how many observers have already completed
private final AtomicInteger numCompleted = new AtomicInteger(0);

/**
* Whether each observer has a latest value at all.
* <p>
* Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above.
* */
private final Set<CombineObserver<R, ?>> hasLatestValue = new HashSet<CombineObserver<R, ?>>();

* The latest value from each observer.
*/
private final Map<CombineObserver<? extends R, ?>, Object> latestValue = new ConcurrentHashMap<CombineObserver<? extends R, ?>, Object>();

/**
* Ordered list of observers to combine.
* No synchronization is necessary as these can not be added or changed asynchronously.
*/
private final List<CombineObserver<R, ?>> observers = new LinkedList<CombineObserver<R, ?>>();

public Aggregator(FuncN<R> combineLatestFunction) {
public Aggregator(FuncN<? extends R> combineLatestFunction) {
this.combineLatestFunction = combineLatestFunction;
}

Expand All @@ -184,18 +169,15 @@ <T> void addObserver(CombineObserver<R, T> w) {
*
* @param w The observer that has completed.
*/
<T> void complete(CombineObserver<R, T> w) {
synchronized(lockObject) {
// store that this CombineLatestObserver is completed
completed.add(w);
// if all CombineObservers are completed, we mark the whole thing as completed
if (completed.size() == observers.size()) {
if (running.get()) {
// mark ourselves as done
observer.onCompleted();
// just to ensure we stop processing in case we receive more onNext/complete/error calls after this
running.set(false);
}
<T> void complete(CombineObserver<? extends R, ? super T> w) {
int completed = numCompleted.incrementAndGet();
// if all CombineObservers are completed, we mark the whole thing as completed
if (completed == observers.size()) {
if (running.get()) {
// mark ourselves as done
observer.onCompleted();
// just to ensure we stop processing in case we receive more onNext/complete/error calls after this
running.set(false);
}
}
}
Expand All @@ -217,7 +199,7 @@ void error(Exception e) {
* @param w
* @param arg
*/
<T> void next(CombineObserver<R, T> w, T arg) {
<T> void next(CombineObserver<? extends R, ? super T> w, T arg) {
if (observer == null) {
throw new RuntimeException("This shouldn't be running if an Observer isn't registered");
}
Expand All @@ -227,54 +209,48 @@ <T> void next(CombineObserver<R, T> w, T arg) {
return;
}

// define here so the variable is out of the synchronized scope
// remember this as the latest value for this observer
latestValue.put(w, arg);

if (latestValue.size() < observers.size()) {
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
return;
}

Object[] argsToCombineLatest = new Object[observers.size()];

// we synchronize everything that touches latest values
synchronized (lockObject) {
// remember this as the latest value for this observer
latestValue.put(w, arg);

// remember that this observer now has a latest value set
hasLatestValue.add(w);

// if all observers in the 'observers' list have a value, invoke the combineLatestFunction
for (CombineObserver<R, ?> rw : observers) {
if (!hasLatestValue.contains(rw)) {
// we don't have a value yet for each observer to combine, so we don't have a combined value yet either
return;
}
}
// if we get to here this means all the queues have data
int i = 0;
for (CombineObserver<R, ?> _w : observers) {
argsToCombineLatest[i++] = latestValue.get(_w);
}
int i = 0;
for (CombineObserver<R, ?> _w : observers) {
argsToCombineLatest[i++] = latestValue.get(_w);
}

try {
R combinedValue = combineLatestFunction.call(argsToCombineLatest);
observer.onNext(combinedValue);
} catch(Exception ex) {
observer.onError(ex);
}
// if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args
// we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling
// this 'next' method while another thread finishes calling this combineLatestFunction
observer.onNext(combineLatestFunction.call(argsToCombineLatest));
}

@Override
public Subscription call(Observer<R> observer) {
public Subscription call(Observer<? super R> observer) {
if (this.observer != null) {
throw new IllegalStateException("Only one Observer can subscribe to this Observable.");
}
this.observer = observer;

AtomicObservableSubscription subscription = new AtomicObservableSubscription(new Subscription() {
@Override
public void unsubscribe() {
stop();
}
});
this.observer = new SynchronizedObserver<R>(observer, subscription);

/* start the observers */
for (CombineObserver<R, ?> rw : observers) {
rw.startWatching();
}

return new Subscription() {
@Override
public void unsubscribe() {
stop();
}
};
return subscription;
}

private void stop() {
Expand All @@ -291,10 +267,33 @@ private void stop() {

public static class UnitTest {

@SuppressWarnings("unchecked")
/* mock calls don't do generics */
@Test
public void testCombineLatestWithFunctionThatThrowsAnException() {
@SuppressWarnings("unchecked") // mock calls don't do generics
Observer<String> w = mock(Observer.class);

TestObservable w1 = new TestObservable();
TestObservable w2 = new TestObservable();

Observable<String> combined = Observable.create(combineLatest(w1, w2, new Func2<String, String, String>() {
@Override
public String call(String v1, String v2) {
throw new RuntimeException("I don't work.");
}
}));
combined.subscribe(w);

w1.Observer.onNext("first value of w1");
w2.Observer.onNext("first value of w2");

verify(w, never()).onNext(anyString());
verify(w, never()).onCompleted();
verify(w, times(1)).onError(Matchers.<RuntimeException>any());
}

@Test
public void testCombineLatestDifferentLengthObservableSequences1() {
@SuppressWarnings("unchecked") // mock calls don't do generics
Observer<String> w = mock(Observer.class);

TestObservable w1 = new TestObservable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public final class SynchronizedObserver<T> implements Observer<T> {
* // TODO composing of this class should rarely happen now with updated design so this decision should be revisited
*/

private final Observer<T> observer;
private final Observer<? super T> observer;
private final AtomicObservableSubscription subscription;
private volatile boolean finishRequested = false;
private volatile boolean finished = false;

public SynchronizedObserver(Observer<T> Observer, AtomicObservableSubscription subscription) {
public SynchronizedObserver(Observer<? super T> Observer, AtomicObservableSubscription subscription) {
this.observer = Observer;
this.subscription = subscription;
}
Expand Down

0 comments on commit f047210

Please sign in to comment.