Skip to content

Commit

Permalink
Performance refactoring: OperatorSubscribeFunction
Browse files Browse the repository at this point in the history
- migrated Func1 to OperatorSubscribeFunction for internal operator implementations
- do not wrap with AtomicObserver when it's a trusted operator

ReactiveX#104
  • Loading branch information
benjchristensen committed Jan 22, 2013
1 parent fdfdb70 commit 0fa344f
Show file tree
Hide file tree
Showing 21 changed files with 87 additions and 55 deletions.
30 changes: 19 additions & 11 deletions rxjava-core/src/main/java/rx/observables/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import rx.observables.operations.OperationToObservableList;
import rx.observables.operations.OperationToObservableSortedList;
import rx.observables.operations.OperationZip;
import rx.observables.operations.OperatorSubscribeFunction;
import rx.util.AtomicObservableSubscription;
import rx.util.AtomicObserver;
import rx.util.functions.Action0;
Expand Down Expand Up @@ -74,7 +75,7 @@ public class Observable<T> {

private final Func1<Observer<T>, Subscription> onSubscribe;

public Observable(Func1<Observer<T>, Subscription> onSubscribe) {
protected Observable(Func1<Observer<T>, Subscription> onSubscribe) {
this.onSubscribe = onSubscribe;
}

Expand Down Expand Up @@ -104,16 +105,23 @@ public Observable(Func1<Observer<T>, Subscription> onSubscribe) {
* to stop receiving notifications before the provider has finished sending them
*/
public Subscription subscribe(Observer<T> observer) {
/*
* Wrap the observer and subscription in Atomic* wrappers to:
*
* - ensure correct behavior of onNext, onCompleted and onError.
* - allow the Observer to have access to the subscription in asynchronous execution for checking if unsubscribed occurred without onComplete/onError.
* - handle both synchronous and asynchronous subscribe() execution flows
*/
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
final Observer<T> atomicObserver = new AtomicObserver<T>(observer, subscription);
return subscription.wrap(onSubscribe.call(atomicObserver));
if (onSubscribe instanceof OperatorSubscribeFunction) {
/*
* This means it's a 'trusted' operator so we won't wrap it.
*/
return onSubscribe.call(observer);
} else {
/*
* Wrap the observer and subscription in Atomic* wrappers to:
*
* - ensure correct behavior of onNext, onCompleted and onError.
* - allow the Observer to have access to the subscription in asynchronous execution for checking if unsubscribed occurred without onComplete/onError.
* - handle both synchronous and asynchronous subscribe() execution flows
*/
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
final Observer<T> atomicObserver = new AtomicObserver<T>(observer, subscription);
return subscription.wrap(onSubscribe.call(atomicObserver));
}
};

@SuppressWarnings({ "rawtypes", "unchecked" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void onNext(Object args) {
*
* @param <R>
*/
private static class Aggregator<R> implements Func1<Observer<R>, Subscription> {
private static class Aggregator<R> implements OperatorSubscribeFunction<R> {

private final FuncN<R> combineLatestFunction;
private Observer<R> Observer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import rx.observables.Observable;
import rx.observables.Observer;
import rx.observables.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;

public final class OperationFilter<T> {
Expand All @@ -32,26 +33,28 @@ public static <T> Func1<Observer<T>, Subscription> filter(Observable<T> that, Fu
return new Filter<T>(that, predicate);
}

private static class Filter<T> implements Func1<Observer<T>, Subscription> {
private static class Filter<T> implements OperatorSubscribeFunction<T> {

private final Observable<T> that;
private final Func1<T, Boolean> predicate;
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();

public Filter(Observable<T> that, Func1<T, Boolean> predicate) {
this.that = that;
this.predicate = predicate;
}

public Subscription call(final Observer<T> observer) {
return that.subscribe(new Observer<T>() {
return subscription.wrap(that.subscribe(new Observer<T>() {
public void onNext(T value) {
try {
if ((boolean) predicate.call(value)) {
observer.onNext(value);
}
} catch (Exception ex) {
observer.onError(ex);
// TODO is there a way to tell 'that' to unsubscribe if we have an error?
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
subscription.unsubscribe();
}
}

Expand All @@ -62,7 +65,7 @@ public void onError(Exception ex) {
public void onCompleted() {
observer.onCompleted();
}
});
}));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static <T> Func1<Observer<T>, Subscription> last(Observable<T> observable
return new Last<T>(observable);
}

private static class Last<T> implements Func1<Observer<T>, Subscription> {
private static class Last<T> implements OperatorSubscribeFunction<T> {

private final AtomicReference<T> lastValue = new AtomicReference<T>();
private final Observable<T> that;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static <T, R> Func1<Observer<R>, Subscription> mapMany(Observable<T> sequ
* @param <R>
* the type of the output sequence.
*/
private static class MapObservable<T, R> implements Func1<Observer<R>, Subscription> {
private static class MapObservable<T, R> implements OperatorSubscribeFunction<R> {
public MapObservable(Observable<T> sequence, Func1<T, R> func) {
this.sequence = sequence;
this.func = func;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static <T> Func1<Observer<Notification<T>>, Subscription> materialize(fin
return new MaterializeObservable<T>(sequence);
}

private static class MaterializeObservable<T> implements Func1<Observer<Notification<T>>, Subscription> {
private static class MaterializeObservable<T> implements OperatorSubscribeFunction<Notification<T>> {

private final Observable<T> sequence;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public final class OperationMerge {
*/
public static <T> Func1<Observer<T>, Subscription> merge(final Observable<Observable<T>> o) {
// wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
return new Func1<Observer<T>, Subscription>() {
return new OperatorSubscribeFunction<T>() {

@Override
public Subscription call(Observer<T> observer) {
Expand All @@ -56,7 +56,7 @@ public Subscription call(Observer<T> observer) {
}

public static <T> Func1<Observer<T>, Subscription> merge(final Observable<T>... sequences) {
return merge(Observable.create(new Func1<Observer<Observable<T>>, Subscription>() {
return merge(Observable.create(new OperatorSubscribeFunction<Observable<T>>() {
private volatile boolean unsubscribed = false;

@Override
Expand Down Expand Up @@ -85,7 +85,7 @@ public void unsubscribe() {
}

public static <T> Func1<Observer<T>, Subscription> merge(final List<Observable<T>> sequences) {
return merge(Observable.create(new Func1<Observer<Observable<T>>, Subscription>() {
return merge(Observable.create(new OperatorSubscribeFunction<Observable<T>>() {

private volatile boolean unsubscribed = false;

Expand Down Expand Up @@ -126,7 +126,7 @@ public void unsubscribe() {
*
* @param <T>
*/
private static final class MergeObservable<T> implements Func1<Observer<T>, Subscription> {
private static final class MergeObservable<T> implements OperatorSubscribeFunction<T> {
private final Observable<Observable<T>> sequences;
private final MergeSubscription ourSubscription = new MergeSubscription();
private AtomicBoolean stopped = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public final class OperationMergeDelayError {
*/
public static <T> Func1<Observer<T>, Subscription> mergeDelayError(final Observable<Observable<T>> sequences) {
// wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
return new Func1<Observer<T>, Subscription>() {
return new OperatorSubscribeFunction<T>() {

@Override
public Subscription call(Observer<T> observer) {
Expand All @@ -68,7 +68,7 @@ public Subscription call(Observer<T> observer) {
}

public static <T> Func1<Observer<T>, Subscription> mergeDelayError(final Observable<T>... sequences) {
return mergeDelayError(Observable.create(new Func1<Observer<Observable<T>>, Subscription>() {
return mergeDelayError(Observable.create(new OperatorSubscribeFunction<Observable<T>>() {
private volatile boolean unsubscribed = false;

@Override
Expand Down Expand Up @@ -97,7 +97,7 @@ public void unsubscribe() {
}

public static <T> Func1<Observer<T>, Subscription> mergeDelayError(final List<Observable<T>> sequences) {
return mergeDelayError(Observable.create(new Func1<Observer<Observable<T>>, Subscription>() {
return mergeDelayError(Observable.create(new OperatorSubscribeFunction<Observable<T>>() {

private volatile boolean unsubscribed = false;

Expand Down Expand Up @@ -138,7 +138,7 @@ public void unsubscribe() {
*
* @param <T>
*/
private static final class MergeDelayErrorObservable<T> implements Func1<Observer<T>, Subscription> {
private static final class MergeDelayErrorObservable<T> implements OperatorSubscribeFunction<T> {
private final Observable<Observable<T>> sequences;
private final MergeSubscription ourSubscription = new MergeSubscription();
private AtomicBoolean stopped = new AtomicBoolean(false);
Expand Down Expand Up @@ -848,7 +848,6 @@ private static class CaptureObserver implements Observer<String> {

@Override
public void onCompleted() {
// TODO Auto-generated method stub

}

Expand All @@ -859,7 +858,6 @@ public void onError(Exception e) {

@Override
public void onNext(String args) {
// TODO Auto-generated method stub

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static <T> Func1<Observer<T>, Subscription> onErrorResumeNextViaFunction(
return new OnErrorResumeNextViaFunction<T>(originalSequence, resumeFunction);
}

private static class OnErrorResumeNextViaFunction<T> implements Func1<Observer<T>, Subscription> {
private static class OnErrorResumeNextViaFunction<T> implements OperatorSubscribeFunction<T> {

private final Func1<Exception, Observable<T>> resumeFunction;
private final Observable<T> originalSequence;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static <T> Func1<Observer<T>, Subscription> onErrorResumeNextViaObservabl
return new OnErrorResumeNextViaObservable<T>(originalSequence, resumeSequence);
}

private static class OnErrorResumeNextViaObservable<T> implements Func1<Observer<T>, Subscription> {
private static class OnErrorResumeNextViaObservable<T> implements OperatorSubscribeFunction<T> {

private final Observable<T> resumeSequence;
private final Observable<T> originalSequence;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static <T> Func1<Observer<T>, Subscription> onErrorReturn(Observable<T> o
return new OnErrorReturn<T>(originalSequence, resumeFunction);
}

private static class OnErrorReturn<T> implements Func1<Observer<T>, Subscription> {
private static class OnErrorReturn<T> implements OperatorSubscribeFunction<T> {
private final Func1<Exception, T> resumeFunction;
private final Observable<T> originalSequence;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import rx.observables.Observable;
import rx.observables.Observer;
import rx.observables.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

Expand Down Expand Up @@ -61,10 +62,11 @@ public static <T> Func1<Observer<T>, Subscription> scan(Observable<T> sequence,
return new Accumulator<T>(sequence, null, accumulator);
}

private static class Accumulator<T> implements Func1<Observer<T>, Subscription> {
private static class Accumulator<T> implements OperatorSubscribeFunction<T> {
private final Observable<T> sequence;
private final T initialValue;
private Func2<T, T, T> accumlatorFunction;
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();

private Accumulator(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
this.sequence = sequence;
Expand All @@ -74,7 +76,7 @@ private Accumulator(Observable<T> sequence, T initialValue, Func2<T, T, T> accum

public Subscription call(final Observer<T> observer) {

return sequence.subscribe(new Observer<T>() {
return subscription.wrap(sequence.subscribe(new Observer<T>() {
private T acc = initialValue;
private boolean hasSentInitialValue = false;

Expand Down Expand Up @@ -108,7 +110,8 @@ public synchronized void onNext(T value) {
observer.onNext(acc);
} catch (Exception ex) {
observer.onError(ex);
// TODO is there a correct way to unsubscribe from the sequence?
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
subscription.unsubscribe();
}
}

Expand All @@ -124,7 +127,7 @@ public synchronized void onCompleted() {
}
observer.onCompleted();
}
});
}));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public Subscription call(Observer<T> observer) {
*
* @param <T>
*/
private static class Skip<T> implements Func1<Observer<T>, Subscription> {
private static class Skip<T> implements OperatorSubscribeFunction<T> {
private final int num;
private final Observable<T> items;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static <T> Func1<Observer<T>, Subscription> synchronize(Observable<T> obs
return new Synchronize<T>(observable);
}

private static class Synchronize<T> implements Func1<Observer<T>, Subscription> {
private static class Synchronize<T> implements OperatorSubscribeFunction<T> {

public Synchronize(Observable<T> innerObservable) {
this.innerObservable = innerObservable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import rx.observables.Observable;
import rx.observables.Observer;
import rx.observables.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;

/**
Expand All @@ -44,7 +45,7 @@ public final class OperationTake {
*/
public static <T> Func1<Observer<T>, Subscription> take(final Observable<T> items, final int num) {
// wrap in a Watchbable so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
return new Func1<Observer<T>, Subscription>() {
return new OperatorSubscribeFunction<T>() {

@Override
public Subscription call(Observer<T> observer) {
Expand All @@ -65,17 +66,18 @@ public Subscription call(Observer<T> observer) {
*
* @param <T>
*/
private static class Take<T> implements Func1<Observer<T>, Subscription> {
private static class Take<T> implements OperatorSubscribeFunction<T> {
private final int num;
private final Observable<T> items;
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();

Take(final Observable<T> items, final int num) {
this.num = num;
this.items = items;
}

public Subscription call(Observer<T> observer) {
return items.subscribe(new ItemObserver(observer));
return subscription.wrap(items.subscribe(new ItemObserver(observer)));
}

/**
Expand Down Expand Up @@ -105,8 +107,8 @@ public void onNext(T args) {
if (counter.getAndIncrement() < num) {
observer.onNext(args);
} else {
observer.onCompleted();
// TODO do we need to unsubscribe here?
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
subscription.unsubscribe();
}
}

Expand Down Expand Up @@ -168,8 +170,7 @@ public void testUnsubscribeAfterTake() {
verify(aObserver, times(1)).onNext("one");
verify(aObserver, never()).onNext("two");
verify(aObserver, never()).onNext("three");
// TODO commented this out for now as it's broken and I'm questioning whether it needs to be
// verify(s, times(1)).unsubscribe();
verify(s, times(1)).unsubscribe();
}

private static class TestObservable extends Observable<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static <T> Func1<Observer<T>, Subscription> toObservableIterable(Iterable
return new ToObservableIterable<T>(list);
}

private static class ToObservableIterable<T> implements Func1<Observer<T>, Subscription> {
private static class ToObservableIterable<T> implements OperatorSubscribeFunction<T> {
public ToObservableIterable(Iterable<T> list) {
this.iterable = list;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static <T> Func1<Observer<List<T>>, Subscription> toObservableList(Observ
return new ToObservableList<T>(that);
}

private static class ToObservableList<T> implements Func1<Observer<List<T>>, Subscription> {
private static class ToObservableList<T> implements OperatorSubscribeFunction<List<T>> {

private final Observable<T> that;
final ConcurrentLinkedQueue<T> list = new ConcurrentLinkedQueue<T>();
Expand Down
Loading

0 comments on commit 0fa344f

Please sign in to comment.