From f1aaab4faff2e37e989c51322a5f6fb16c4e4997 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 7 May 2013 09:16:06 -0700 Subject: [PATCH] Removing SynchronizedObserver usage from Subject implementations. - We don't need to add synchronization as the subjects can trust their source Observables to comply with the Rx contract. - This optimization follows Rx Design Guidelines 6.8. Avoid serializing operators This was discussed at https://github.com/Netflix/RxJava/pull/256 --- .../src/main/java/rx/subjects/AsyncSubject.java | 10 +++------- .../src/main/java/rx/subjects/BehaviorSubject.java | 6 ++---- .../src/main/java/rx/subjects/PublishSubject.java | 3 +-- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index f496b76e4b6..5bc15623b64 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -15,11 +15,8 @@ */ package rx.subjects; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; @@ -30,7 +27,6 @@ import rx.Observer; import rx.Subscription; import rx.util.AtomicObservableSubscription; -import rx.util.SynchronizedObserver; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -80,7 +76,7 @@ public void unsubscribe() { }); // on subscribe add it to the map of outbound observers to notify - observers.put(subscription, new SynchronizedObserver(observer, subscription)); + observers.put(subscription, observer); return subscription; } }; diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index eb7c6554148..5f3b16dc908 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -27,7 +27,6 @@ import rx.Observer; import rx.Subscription; import rx.util.AtomicObservableSubscription; -import rx.util.SynchronizedObserver; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -86,11 +85,10 @@ public void unsubscribe() { } }); - SynchronizedObserver synchronizedObserver = new SynchronizedObserver(observer, subscription); - synchronizedObserver.onNext(currentValue.get()); + observer.onNext(currentValue.get()); // on subscribe add it to the map of outbound observers to notify - observers.put(subscription, synchronizedObserver); + observers.put(subscription, observer); return subscription; } }; diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 092af4f607f..030bde520c0 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -34,7 +34,6 @@ import rx.Observer; import rx.Subscription; import rx.util.AtomicObservableSubscription; -import rx.util.SynchronizedObserver; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -78,7 +77,7 @@ public void unsubscribe() { }); // on subscribe add it to the map of outbound observers to notify - observers.put(subscription, new SynchronizedObserver(observer, subscription)); + observers.put(subscription, observer); return subscription; } };