Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ObserveOn Operator with Backpressure #835

Merged
merged 10 commits into from
Feb 9, 2014
36 changes: 27 additions & 9 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinct;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperatorDoOnEach;
import rx.operators.OperationElementAt;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
Expand All @@ -63,13 +62,11 @@
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMinMax;
import rx.operators.OperationMulticast;
import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallelMerge;
import rx.operators.OperatorRepeat;
import rx.operators.OperationReplay;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
Expand All @@ -96,18 +93,21 @@
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.OperatorSubscribeOn;
import rx.operators.OperatorZip;
import rx.operators.OperatorCast;
import rx.operators.OperatorDoOnEach;
import rx.operators.OperatorFromIterable;
import rx.operators.OperatorGroupBy;
import rx.operators.OperatorMap;
import rx.operators.OperatorMerge;
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorParallel;
import rx.operators.OperatorRepeat;
import rx.operators.OperatorSubscribeOn;
import rx.operators.OperatorTake;
import rx.operators.OperatorTimestamp;
import rx.operators.OperatorToObservableList;
import rx.operators.OperatorToObservableSortedList;
import rx.operators.OperatorZip;
import rx.operators.OperatorZipIterable;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -5139,8 +5139,7 @@ public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends
}

/**
* Modify the source Observable so that it asynchronously notifies {@link Observer}s on the
* specified {@link Scheduler}.
* Move notifications to the specified {@link Scheduler} one `onNext` at a time.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
*
Expand All @@ -5151,9 +5150,26 @@ public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-observeon">RxJava Wiki: observeOn()</a>
*/
public final Observable<T> observeOn(Scheduler scheduler) {
return create(OperationObserveOn.observeOn(this, scheduler));
return lift(new OperatorObserveOn<T>(scheduler));
}

/**
* Move notifications to the specified {@link Scheduler} asynchronously with a buffer of the given size.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @param bufferSize
* that will be rounded up to the next power of 2
* @return the source Observable modified so that its {@link Observer}s are notified on the
* specified {@link Scheduler}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-observeon">RxJava Wiki: observeOn()</a>
*/
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return lift(new OperatorObserveOn<T>(scheduler, bufferSize));
}

/**
* Filters the items emitted by an Observable, only emitting those of the specified type.
* <p>
Expand Down Expand Up @@ -5296,7 +5312,9 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-parallel">RxJava Wiki: parallel()</a>
*/
public final <R> Observable<R> parallel(Func1<Observable<T>, Observable<R>> f) {
return lift(new OperatorParallel<T, R>(f, Schedulers.computation()));
// TODO move this back to Schedulers.computation() again once that is properly using eventloops
// see https://github.com/Netflix/RxJava/issues/713 for why this was changed
return lift(new OperatorParallel<T, R>(f, Schedulers.newThread()));
}

/**
Expand Down
129 changes: 0 additions & 129 deletions rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

This file was deleted.

Loading