From 44c0d7ebe73d0efab0b2fe413e3f0853974c3f5b Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 1 Jun 2017 10:18:23 +0200 Subject: [PATCH] 2.x: add subscribeOn overload to avoid same-pool deadlock with create --- src/main/java/io/reactivex/Flowable.java | 47 ++++++++++++++++++- .../flowable/FlowableSubscribeOn.java | 4 +- .../flowable/FlowableSubscribeOnTest.java | 47 +++++++++++++++++++ 3 files changed, 94 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 17b63d7970..21c1be9d19 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -13045,6 +13045,44 @@ public final > E subscribeWith(E subscriber) { /** * Asynchronously subscribes Subscribers to this Publisher on the specified {@link Scheduler}. *

+ * If there is a {@link #create(FlowableOnSubscribe, BackpressureStrategy)} type source up in the + * chain, it is recommended to use {@code subscribeOn(scheduler, false)} instead + * to avoid same-pool deadlock because requests may pile up behind a eager/blocking emitter. + *

+ * + *

+ *
Backpressure:
+ *
The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure + * behavior.
+ *
Scheduler:
+ *
You specify which {@link Scheduler} this operator will use
+ *
+ * + * @param scheduler + * the {@link Scheduler} to perform subscription actions on + * @return the source Publisher modified so that its subscriptions happen on the + * specified {@link Scheduler} + * @see ReactiveX operators documentation: SubscribeOn + * @see RxJava Threading Examples + * @see #observeOn + * @see #subscribeOn(Scheduler, boolean) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Flowable subscribeOn(@NonNull Scheduler scheduler) { + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return subscribeOn(scheduler, !(this instanceof FlowableCreate)); + } + + /** + * Asynchronously subscribes Subscribers to this Publisher on the specified {@link Scheduler} + * optionally reroutes requests from other threads to the same {@link Scheduler} thread. + *

+ * If there is a {@link #create(FlowableOnSubscribe, BackpressureStrategy)} type source up in the + * chain, it is recommended to have {@code requestOn} false to avoid same-pool deadlock + * because requests may pile up behind a eager/blocking emitter. + *

* *

*
Backpressure:
@@ -13056,18 +13094,23 @@ public final > E subscribeWith(E subscriber) { * * @param scheduler * the {@link Scheduler} to perform subscription actions on + * @param requestOn if true, requests are rerouted to the given Scheduler as well (strong pipelining) + * if false, requests coming from any thread are simply forwarded to + * the upstream on the same thread (weak pipelining) * @return the source Publisher modified so that its subscriptions happen on the * specified {@link Scheduler} * @see ReactiveX operators documentation: SubscribeOn * @see RxJava Threading Examples * @see #observeOn + * @since 2.1.1 - experimental */ @CheckReturnValue @BackpressureSupport(BackpressureKind.PASS_THROUGH) @SchedulerSupport(SchedulerSupport.CUSTOM) - public final Flowable subscribeOn(Scheduler scheduler) { + @Experimental + public final Flowable subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new FlowableSubscribeOn(this, scheduler, this instanceof FlowableCreate)); + return RxJavaPlugins.onAssembly(new FlowableSubscribeOn(this, scheduler, requestOn)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java index cd0c50e1e6..1bb3f1e829 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java @@ -65,13 +65,13 @@ static final class SubscribeOnSubscriber extends AtomicReference Publisher source; - SubscribeOnSubscriber(Subscriber actual, Scheduler.Worker worker, Publisher source, boolean nonScheduledRequests) { + SubscribeOnSubscriber(Subscriber actual, Scheduler.Worker worker, Publisher source, boolean requestOn) { this.actual = actual; this.worker = worker; this.source = source; this.s = new AtomicReference(); this.requested = new AtomicLong(); - this.nonScheduledRequests = nonScheduledRequests; + this.nonScheduledRequests = !requestOn; } @Override diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java index c505c7610d..7798cc88f1 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java @@ -372,4 +372,51 @@ public void subscribe(FlowableEmitter s) throws Exception { .assertNoErrors() .assertComplete(); } + + @Test + public void nonScheduledRequestsNotSubsequentSubscribeOn() { + TestSubscriber ts = Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter s) throws Exception { + for (int i = 1; i < 1001; i++) { + s.onNext(i); + Thread.sleep(1); + } + s.onComplete(); + } + }, BackpressureStrategy.DROP) + .map(Functions.identity()) + .subscribeOn(Schedulers.single(), false) + .observeOn(Schedulers.computation()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertComplete(); + + int c = ts.valueCount(); + + assertTrue("" + c, c > Flowable.bufferSize()); + } + + @Test + public void scheduledRequestsNotSubsequentSubscribeOn() { + Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter s) throws Exception { + for (int i = 1; i < 1001; i++) { + s.onNext(i); + Thread.sleep(1); + } + s.onComplete(); + } + }, BackpressureStrategy.DROP) + .map(Functions.identity()) + .subscribeOn(Schedulers.single(), true) + .observeOn(Schedulers.computation()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(Flowable.bufferSize()) + .assertNoErrors() + .assertComplete(); + } }