Skip to content

Commit

Permalink
Removing SynchronizedObserver usage from Subject implementations.
Browse files Browse the repository at this point in the history
- We don't need to add synchronization as the subjects can trust their source Observables to comply with the Rx contract.
- This optimization follows Rx Design Guidelines 6.8. Avoid serializing operators

This was discussed at ReactiveX#256
  • Loading branch information
benjchristensen committed May 7, 2013
1 parent 7d413f5 commit f1aaab4
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 13 deletions.
10 changes: 3 additions & 7 deletions rxjava-core/src/main/java/rx/subjects/AsyncSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
*/
package rx.subjects;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -30,7 +27,6 @@
import rx.Observer;
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.SynchronizedObserver;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
Expand Down Expand Up @@ -80,7 +76,7 @@ public void unsubscribe() {
});

// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, new SynchronizedObserver<T>(observer, subscription));
observers.put(subscription, observer);
return subscription;
}
};
Expand Down
6 changes: 2 additions & 4 deletions rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import rx.Observer;
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.SynchronizedObserver;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
Expand Down Expand Up @@ -86,11 +85,10 @@ public void unsubscribe() {
}
});

SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(observer, subscription);
synchronizedObserver.onNext(currentValue.get());
observer.onNext(currentValue.get());

// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, synchronizedObserver);
observers.put(subscription, observer);
return subscription;
}
};
Expand Down
3 changes: 1 addition & 2 deletions rxjava-core/src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import rx.Observer;
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.SynchronizedObserver;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
Expand Down Expand Up @@ -78,7 +77,7 @@ public void unsubscribe() {
});

// on subscribe add it to the map of outbound observers to notify
observers.put(subscription, new SynchronizedObserver<T>(observer, subscription));
observers.put(subscription, observer);
return subscription;
}
};
Expand Down

0 comments on commit f1aaab4

Please sign in to comment.