Skip to content

Commit

Permalink
1.x: fix observeOn resource handling, add delayError capability
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Feb 9, 2016
1 parent 182833e commit 2367f90
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 141 deletions.
37 changes: 35 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5999,7 +5999,9 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with an unbounded buffer.
* asynchronously with a bounded buffer.
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
Expand All @@ -6014,12 +6016,43 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler, boolean)
*/
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
return lift(new OperatorObserveOn<T>(scheduler, false));
}

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer and optionally delays onError notifications.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @param delayError
* indicates if the onError notification may not cut ahead of onNext notification on the other side of the
* scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
* from upstream
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler)
*/
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError));
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,9 @@ public final Single<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousSingle) {
return ((ScalarSynchronousSingle<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
// Note that since Single emits onSuccess xor onError,
// there is no cut-ahead possible like with regular Observable sequences.
return lift(new OperatorObserveOn<T>(scheduler, false));
}

/**
Expand Down
Loading

0 comments on commit 2367f90

Please sign in to comment.