Skip to content

Commit

Permalink
2.x: add subscribeOn overload to avoid same-pool deadlock with create (
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jun 4, 2017
1 parent a43265f commit 8a62afb
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 4 deletions.
47 changes: 45 additions & 2 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13045,6 +13045,44 @@ public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
/**
* Asynchronously subscribes Subscribers to this Publisher on the specified {@link Scheduler}.
* <p>
* If there is a {@link #create(FlowableOnSubscribe, BackpressureStrategy)} type source up in the
* chain, it is recommended to use {@code subscribeOn(scheduler, false)} instead
* to avoid same-pool deadlock because requests may pile up behind a eager/blocking emitter.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @return the source Publisher modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
* @see #subscribeOn(Scheduler, boolean)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}

/**
* Asynchronously subscribes Subscribers to this Publisher on the specified {@link Scheduler}
* optionally reroutes requests from other threads to the same {@link Scheduler} thread.
* <p>
* If there is a {@link #create(FlowableOnSubscribe, BackpressureStrategy)} type source up in the
* chain, it is recommended to have {@code requestOn} false to avoid same-pool deadlock
* because requests may pile up behind a eager/blocking emitter.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand All @@ -13056,18 +13094,23 @@ public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @param requestOn if true, requests are rerouted to the given Scheduler as well (strong pipelining)
* if false, requests coming from any thread are simply forwarded to
* the upstream on the same thread (weak pipelining)
* @return the source Publisher modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
* @since 2.1.1 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> subscribeOn(Scheduler scheduler) {
@Experimental
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, this instanceof FlowableCreate));
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>

Publisher<T> source;

SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean nonScheduledRequests) {
SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
this.actual = actual;
this.worker = worker;
this.source = source;
this.s = new AtomicReference<Subscription>();
this.requested = new AtomicLong();
this.nonScheduledRequests = nonScheduledRequests;
this.nonScheduledRequests = !requestOn;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,51 @@ public void subscribe(FlowableEmitter<Object> s) throws Exception {
.assertNoErrors()
.assertComplete();
}

@Test
public void nonScheduledRequestsNotSubsequentSubscribeOn() {
TestSubscriber<Object> ts = Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> s) throws Exception {
for (int i = 1; i < 1001; i++) {
s.onNext(i);
Thread.sleep(1);
}
s.onComplete();
}
}, BackpressureStrategy.DROP)
.map(Functions.identity())
.subscribeOn(Schedulers.single(), false)
.observeOn(Schedulers.computation())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertNoErrors()
.assertComplete();

int c = ts.valueCount();

assertTrue("" + c, c > Flowable.bufferSize());
}

@Test
public void scheduledRequestsNotSubsequentSubscribeOn() {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> s) throws Exception {
for (int i = 1; i < 1001; i++) {
s.onNext(i);
Thread.sleep(1);
}
s.onComplete();
}
}, BackpressureStrategy.DROP)
.map(Functions.identity())
.subscribeOn(Schedulers.single(), true)
.observeOn(Schedulers.computation())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertValueCount(Flowable.bufferSize())
.assertNoErrors()
.assertComplete();
}
}

0 comments on commit 8a62afb

Please sign in to comment.