From eb5df87c7ec4d7f62873dcf29108ddc2abcd13ca Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 6 Apr 2013 13:29:22 +0200 Subject: [PATCH 1/7] avoiding some synchronization on combineLatest --- .../rx/operators/OperationCombineLatest.java | 46 ++++++++----------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 382f8ba8aa..2d77c3d3ec 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.mockito.InOrder; @@ -125,17 +126,13 @@ private static class Aggregator implements Func1, Subscription> { private final FuncN combineLatestFunction; private final AtomicBoolean running = new AtomicBoolean(true); - - // used as an internal lock for handling the latest values and the completed state of each observer + + // Stores how many observers have already completed + private final AtomicInteger numCompleted = new AtomicInteger(0); + + // Used as an internal lock for handling the latest values of each observer private final Object lockObject = new Object(); - /** - * Store when an observer completes. - *

- * Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above. - * */ - private final Set> completed = new HashSet>(); - /** * The latest value from each observer *

@@ -175,17 +172,14 @@ void addObserver(CombineObserver w) { * @param w The observer that has completed. */ void complete(CombineObserver w) { - synchronized(lockObject) { - // store that this CombineLatestObserver is completed - completed.add(w); - // if all CombineObservers are completed, we mark the whole thing as completed - if (completed.size() == observers.size()) { - if (running.get()) { - // mark ourselves as done - observer.onCompleted(); - // just to ensure we stop processing in case we receive more onNext/complete/error calls after this - running.set(false); - } + int completed = numCompleted.incrementAndGet(); + // if all CombineObservers are completed, we mark the whole thing as completed + if (completed == observers.size()) { + if (running.get()) { + // mark ourselves as done + observer.onCompleted(); + // just to ensure we stop processing in case we receive more onNext/complete/error calls after this + running.set(false); } } } @@ -228,14 +222,12 @@ void next(CombineObserver w, T arg) { // remember that this observer now has a latest value set hasLatestValue.add(w); - // if all observers in the 'observers' list have a value, invoke the combineLatestFunction - for (CombineObserver rw : observers) { - if (!hasLatestValue.contains(rw)) { - // we don't have a value yet for each observer to combine, so we don't have a combined value yet either - return; - } + if (hasLatestValue.size() < observers.size()) { + // we don't have a value yet for each observer to combine, so we don't have a combined value yet either + return; } - // if we get to here this means all the queues have data + + // if we get to here this means all the observers have a latest value int i = 0; for (CombineObserver _w : observers) { argsToCombineLatest[i++] = latestValue.get(_w); From 4250d279e381cc5aebe9bc9d421ae1b5e926494c Mon Sep 17 00:00:00 2001 From: jmhofer Date: Wed, 8 May 2013 11:11:58 +0200 Subject: [PATCH 2/7] handled combine functions that throw exceptions and added a test against this case --- .../rx/operators/OperationCombineLatest.java | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 2d77c3d3ec..82d53f7caa 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -30,6 +30,7 @@ import org.junit.Test; import org.mockito.InOrder; +import org.mockito.Matchers; import rx.Observable; import rx.Observer; @@ -47,9 +48,12 @@ public class OperationCombineLatest { /** * Combines the two given observables, emitting an event containing an aggregation of the latest values of each of the source observables * each time an event is received from one of the source observables, where the aggregation is defined by the given function. - * @param w0 The first source observable. - * @param w1 The second source observable. - * @param combineLatestFunction The aggregation function used to combine the source observable values. + * @param w0 + * The first source observable. + * @param w1 + * The second source observable. + * @param combineLatestFunction + * The aggregation function used to combine the source observable values. * @return A function from an observer to a subscription. This can be used to create an observable from. */ public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) { @@ -236,7 +240,12 @@ void next(CombineObserver w, T arg) { // if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args // we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling // this 'next' method while another thread finishes calling this combineLatestFunction - observer.onNext(combineLatestFunction.call(argsToCombineLatest)); + try { + R combinedValue = combineLatestFunction.call(argsToCombineLatest); + observer.onNext(combinedValue); + } catch(Exception ex) { + observer.onError(ex); + } } @Override @@ -273,10 +282,33 @@ private void stop() { public static class UnitTest { - @SuppressWarnings("unchecked") - /* mock calls don't do generics */ + @Test + public void testCombineLatestWithFunctionThatThrowsAnException() { + @SuppressWarnings("unchecked") // mock calls don't do generics + Observer w = mock(Observer.class); + + TestObservable w1 = new TestObservable(); + TestObservable w2 = new TestObservable(); + + Observable combined = Observable.create(combineLatest(w1, w2, new Func2() { + @Override + public String call(String v1, String v2) { + throw new RuntimeException("I don't work."); + } + })); + combined.subscribe(w); + + w1.Observer.onNext("first value of w1"); + w2.Observer.onNext("first value of w2"); + + verify(w, never()).onNext(anyString()); + verify(w, never()).onCompleted(); + verify(w, times(1)).onError(Matchers.any()); + } + @Test public void testCombineLatestDifferentLengthObservableSequences1() { + @SuppressWarnings("unchecked") // mock calls don't do generics Observer w = mock(Observer.class); TestObservable w1 = new TestObservable(); From bb39cac1b3c24a508b0cd80abdd27e67a8bf4355 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Wed, 8 May 2013 11:24:27 +0200 Subject: [PATCH 3/7] added combineLatest static methods to Observable --- rxjava-core/src/main/java/rx/Observable.java | 30 ++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 208e4c54d7..d96a62aa96 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -40,6 +40,7 @@ import rx.observables.GroupedObservable; import rx.operators.OperationAll; import rx.operators.OperationCache; +import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; @@ -2737,6 +2738,35 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) { }); } + /** + * Combines the given observables, emitting an event containing an aggregation of the latest values of each of the source observables + * each time an event is received from one of the source observables, where the aggregation is defined by the given function. + * @param w0 + * The first source observable. + * @param w1 + * The second source observable. + * @param combineLatestFunction + * The aggregation function used to combine the source observable values. + * @return A function from an observer to a subscription. This can be used to create an observable from. + */ + public static Observable combineLatest(Observable w0, Observable w1, Func2 combineFunction) { + return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction)); + } + + /** + * @see #combineLatest(Observable, Observable, Func2) + */ + public static Observable combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineFunction) { + return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction)); + } + + /** + * @see #combineLatest(Observable, Observable, Func2) + */ + public static Observable combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineFunction) { + return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction)); + } + /** * Filters an Observable by discarding any of its emissions that do not meet some test. *

From ba41db1a689f391420428a4e2f5115616237bf6d Mon Sep 17 00:00:00 2001 From: jmhofer Date: Thu, 9 May 2013 16:54:23 +0200 Subject: [PATCH 4/7] Got rid of the last remnants of synchronization in favor of a concurrent map. --- .../rx/operators/OperationCombineLatest.java | 59 ++++++------------- 1 file changed, 17 insertions(+), 42 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 82d53f7caa..edd1e52ed7 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -19,12 +19,10 @@ import static org.mockito.Mockito.*; import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -96,7 +94,7 @@ public CombineObserver(Aggregator a, Observable w) { this.w = w; } - public synchronized void startWatching() { + private void startWatching() { if (subscription != null) { throw new RuntimeException("This should only be called once."); } @@ -134,23 +132,11 @@ private static class Aggregator implements Func1, Subscription> { // Stores how many observers have already completed private final AtomicInteger numCompleted = new AtomicInteger(0); - // Used as an internal lock for handling the latest values of each observer - private final Object lockObject = new Object(); - /** - * The latest value from each observer - *

- * Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above. - * */ - private final Map, Object> latestValue = new HashMap, Object>(); + * The latest value from each observer. + */ + private final Map, Object> latestValue = new ConcurrentHashMap, Object>(); - /** - * Whether each observer has a latest value at all. - *

- * Note that access to this set MUST BE SYNCHRONIZED via 'lockObject' above. - * */ - private final Set> hasLatestValue = new HashSet>(); - /** * Ordered list of observers to combine. * No synchronization is necessary as these can not be added or changed asynchronously. @@ -215,31 +201,20 @@ void next(CombineObserver w, T arg) { return; } - // define here so the variable is out of the synchronized scope + // remember this as the latest value for this observer + latestValue.put(w, arg); + + if (latestValue.size() < observers.size()) { + // we don't have a value yet for each observer to combine, so we don't have a combined value yet either + return; + } + Object[] argsToCombineLatest = new Object[observers.size()]; - - // we synchronize everything that touches latest values - synchronized (lockObject) { - // remember this as the latest value for this observer - latestValue.put(w, arg); - - // remember that this observer now has a latest value set - hasLatestValue.add(w); - - if (hasLatestValue.size() < observers.size()) { - // we don't have a value yet for each observer to combine, so we don't have a combined value yet either - return; - } - - // if we get to here this means all the observers have a latest value - int i = 0; - for (CombineObserver _w : observers) { - argsToCombineLatest[i++] = latestValue.get(_w); - } + int i = 0; + for (CombineObserver _w : observers) { + argsToCombineLatest[i++] = latestValue.get(_w); } - // if we did not return above from the synchronized block we can now invoke the combineLatestFunction with all of the args - // we do this outside the synchronized block as it is now safe to call this concurrently and don't need to block other threads from calling - // this 'next' method while another thread finishes calling this combineLatestFunction + try { R combinedValue = combineLatestFunction.call(argsToCombineLatest); observer.onNext(combinedValue); From d5259ca7682e7e968447c4867d16c05f3114272e Mon Sep 17 00:00:00 2001 From: jmhofer Date: Thu, 9 May 2013 17:02:03 +0200 Subject: [PATCH 5/7] Fixed a javadoc parameter. --- rxjava-core/src/main/java/rx/Observable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d96a62aa96..e9d5bcaced 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2745,7 +2745,7 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) { * The first source observable. * @param w1 * The second source observable. - * @param combineLatestFunction + * @param combineFunction * The aggregation function used to combine the source observable values. * @return A function from an observer to a subscription. This can be used to create an observable from. */ From 150085992ca0ed93116ae342bf51ffec56a3f10d Mon Sep 17 00:00:00 2001 From: jmhofer Date: Thu, 9 May 2013 23:06:35 +0200 Subject: [PATCH 6/7] improved generics for combineLatest (PECS principle) --- rxjava-core/src/main/java/rx/Observable.java | 6 ++-- .../rx/operators/OperationCombineLatest.java | 28 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e9d5bcaced..bf01201219 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2749,21 +2749,21 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) { * The aggregation function used to combine the source observable values. * @return A function from an observer to a subscription. This can be used to create an observable from. */ - public static Observable combineLatest(Observable w0, Observable w1, Func2 combineFunction) { + public static Observable combineLatest(Observable w0, Observable w1, Func2 combineFunction) { return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction)); } /** * @see #combineLatest(Observable, Observable, Func2) */ - public static Observable combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineFunction) { + public static Observable combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineFunction) { return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction)); } /** * @see #combineLatest(Observable, Observable, Func2) */ - public static Observable combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineFunction) { + public static Observable combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineFunction) { return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction)); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index edd1e52ed7..eb5ffc2b51 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -54,7 +54,7 @@ public class OperationCombineLatest { * The aggregation function used to combine the source observable values. * @return A function from an observer to a subscription. This can be used to create an observable from. */ - public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) { + public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); a.addObserver(new CombineObserver(a, w0)); a.addObserver(new CombineObserver(a, w1)); @@ -64,7 +64,7 @@ public static Func1, Subscription> combineLatest(Observa /** * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ - public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineLatestFunction) { + public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); a.addObserver(new CombineObserver(a, w0)); a.addObserver(new CombineObserver(a, w1)); @@ -75,7 +75,7 @@ public static Func1, Subscription> combineLatest(Obs /** * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ - public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineLatestFunction) { + public static Func1, Subscription> combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineLatestFunction) { Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); a.addObserver(new CombineObserver(a, w0)); a.addObserver(new CombineObserver(a, w1)); @@ -85,11 +85,11 @@ public static Func1, Subscription> combineLatest } private static class CombineObserver implements Observer { - final Observable w; - final Aggregator a; + final Observable w; + final Aggregator a; private Subscription subscription; - public CombineObserver(Aggregator a, Observable w) { + public CombineObserver(Aggregator a, Observable w) { this.a = a; this.w = w; } @@ -122,11 +122,11 @@ public void onNext(T args) { * whenever we have received an event from one of the observables, as soon as each Observable has received * at least one event. */ - private static class Aggregator implements Func1, Subscription> { + private static class Aggregator implements Func1, Subscription> { - private Observer observer; + private Observer observer; - private final FuncN combineLatestFunction; + private final FuncN combineLatestFunction; private final AtomicBoolean running = new AtomicBoolean(true); // Stores how many observers have already completed @@ -135,7 +135,7 @@ private static class Aggregator implements Func1, Subscription> { /** * The latest value from each observer. */ - private final Map, Object> latestValue = new ConcurrentHashMap, Object>(); + private final Map, Object> latestValue = new ConcurrentHashMap, Object>(); /** * Ordered list of observers to combine. @@ -143,7 +143,7 @@ private static class Aggregator implements Func1, Subscription> { */ private final List> observers = new LinkedList>(); - public Aggregator(FuncN combineLatestFunction) { + public Aggregator(FuncN combineLatestFunction) { this.combineLatestFunction = combineLatestFunction; } @@ -161,7 +161,7 @@ void addObserver(CombineObserver w) { * * @param w The observer that has completed. */ - void complete(CombineObserver w) { + void complete(CombineObserver w) { int completed = numCompleted.incrementAndGet(); // if all CombineObservers are completed, we mark the whole thing as completed if (completed == observers.size()) { @@ -191,7 +191,7 @@ void error(Exception e) { * @param w * @param arg */ - void next(CombineObserver w, T arg) { + void next(CombineObserver w, T arg) { if (observer == null) { throw new RuntimeException("This shouldn't be running if an Observer isn't registered"); } @@ -224,7 +224,7 @@ void next(CombineObserver w, T arg) { } @Override - public Subscription call(Observer observer) { + public Subscription call(Observer observer) { if (this.observer != null) { throw new IllegalStateException("Only one Observer can subscribe to this Observable."); } From 797bf4dd49b4e0163ba991267a72a67e95d8d869 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 11 May 2013 18:27:38 +0200 Subject: [PATCH 7/7] Tried to adapt according to Ben's review comments. --- .../rx/operators/OperationCombineLatest.java | 20 +++++++++++-------- .../java/rx/util/SynchronizedObserver.java | 4 ++-- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index eb5ffc2b51..e3cc17652c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -34,6 +34,8 @@ import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; +import rx.util.AtomicObservableSubscription; +import rx.util.SynchronizedObserver; import rx.util.functions.Func1; import rx.util.functions.Func2; import rx.util.functions.Func3; @@ -124,7 +126,7 @@ public void onNext(T args) { */ private static class Aggregator implements Func1, Subscription> { - private Observer observer; + private volatile Observer observer; private final FuncN combineLatestFunction; private final AtomicBoolean running = new AtomicBoolean(true); @@ -228,19 +230,21 @@ public Subscription call(Observer observer) { if (this.observer != null) { throw new IllegalStateException("Only one Observer can subscribe to this Observable."); } - this.observer = observer; + + AtomicObservableSubscription subscription = new AtomicObservableSubscription(new Subscription() { + @Override + public void unsubscribe() { + stop(); + } + }); + this.observer = new SynchronizedObserver(observer, subscription); /* start the observers */ for (CombineObserver rw : observers) { rw.startWatching(); } - return new Subscription() { - @Override - public void unsubscribe() { - stop(); - } - }; + return subscription; } private void stop() { diff --git a/rxjava-core/src/main/java/rx/util/SynchronizedObserver.java b/rxjava-core/src/main/java/rx/util/SynchronizedObserver.java index 11f7138303..d13ef4f938 100644 --- a/rxjava-core/src/main/java/rx/util/SynchronizedObserver.java +++ b/rxjava-core/src/main/java/rx/util/SynchronizedObserver.java @@ -64,12 +64,12 @@ public final class SynchronizedObserver implements Observer { * // TODO composing of this class should rarely happen now with updated design so this decision should be revisited */ - private final Observer observer; + private final Observer observer; private final AtomicObservableSubscription subscription; private volatile boolean finishRequested = false; private volatile boolean finished = false; - public SynchronizedObserver(Observer Observer, AtomicObservableSubscription subscription) { + public SynchronizedObserver(Observer Observer, AtomicObservableSubscription subscription) { this.observer = Observer; this.subscription = subscription; }