Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add subscribeOn overload to avoid same-pool deadlock with create #5386

Merged
merged 1 commit into from
Jun 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13045,6 +13045,44 @@ public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
/**
* Asynchronously subscribes Subscribers to this Publisher on the specified {@link Scheduler}.
* <p>
* If there is a {@link #create(FlowableOnSubscribe, BackpressureStrategy)} type source up in the
* chain, it is recommended to use {@code subscribeOn(scheduler, false)} instead
* to avoid same-pool deadlock because requests may pile up behind a eager/blocking emitter.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @return the source Publisher modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
* @see #subscribeOn(Scheduler, boolean)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}

/**
* Asynchronously subscribes Subscribers to this Publisher on the specified {@link Scheduler}
* optionally reroutes requests from other threads to the same {@link Scheduler} thread.
* <p>
* If there is a {@link #create(FlowableOnSubscribe, BackpressureStrategy)} type source up in the
* chain, it is recommended to have {@code requestOn} false to avoid same-pool deadlock
* because requests may pile up behind a eager/blocking emitter.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand All @@ -13056,18 +13094,23 @@ public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @param requestOn if true, requests are rerouted to the given Scheduler as well (strong pipelining)
* if false, requests coming from any thread are simply forwarded to
* the upstream on the same thread (weak pipelining)
* @return the source Publisher modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
* @since 2.1.1 - experimental
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> subscribeOn(Scheduler scheduler) {
@Experimental
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, this instanceof FlowableCreate));
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>

Publisher<T> source;

SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean nonScheduledRequests) {
SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
this.actual = actual;
this.worker = worker;
this.source = source;
this.s = new AtomicReference<Subscription>();
this.requested = new AtomicLong();
this.nonScheduledRequests = nonScheduledRequests;
this.nonScheduledRequests = !requestOn;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,51 @@ public void subscribe(FlowableEmitter<Object> s) throws Exception {
.assertNoErrors()
.assertComplete();
}

@Test
public void nonScheduledRequestsNotSubsequentSubscribeOn() {
TestSubscriber<Object> ts = Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> s) throws Exception {
for (int i = 1; i < 1001; i++) {
s.onNext(i);
Thread.sleep(1);
}
s.onComplete();
}
}, BackpressureStrategy.DROP)
.map(Functions.identity())
.subscribeOn(Schedulers.single(), false)
.observeOn(Schedulers.computation())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertNoErrors()
.assertComplete();

int c = ts.valueCount();

assertTrue("" + c, c > Flowable.bufferSize());
}

@Test
public void scheduledRequestsNotSubsequentSubscribeOn() {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> s) throws Exception {
for (int i = 1; i < 1001; i++) {
s.onNext(i);
Thread.sleep(1);
}
s.onComplete();
}
}, BackpressureStrategy.DROP)
.map(Functions.identity())
.subscribeOn(Schedulers.single(), true)
.observeOn(Schedulers.computation())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertValueCount(Flowable.bufferSize())
.assertNoErrors()
.assertComplete();
}
}