diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSubscribeOnOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSubscribeOnOp.java index 66d926e1a..a5ed5435f 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSubscribeOnOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSubscribeOnOp.java @@ -15,9 +15,10 @@ */ package io.smallrye.mutiny.operators.multi; +import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED; + import java.util.concurrent.Executor; import java.util.concurrent.Flow; -import java.util.concurrent.Flow.Subscription; import java.util.concurrent.RejectedExecutionException; import io.smallrye.mutiny.Multi; @@ -25,7 +26,8 @@ import io.smallrye.mutiny.subscription.MultiSubscriber; /** - * Subscribes to the upstream asynchronously using the given executor. + * Subscribes to the upstream asynchronously using the given executor, and ensure all + * {@link Flow.Subscription#request(long)} calls happen from that executor. * * @param the type of item */ @@ -55,43 +57,31 @@ static final class SubscribeOnProcessor extends MultiOperatorProcessor this.executor = executor; } - @Override - public void onSubscribe(Subscription subscription) { - if (compareAndSetUpstreamSubscription(null, subscription)) { - downstream.onSubscribe(this); - } else { - subscription.cancel(); - } - } - - void requestUpstream(final long n, final Subscription s) { - try { - executor.execute(() -> s.request(n)); - } catch (RejectedExecutionException rejected) { - super.onFailure(rejected); - } - } - - void scheduleSubscription(Multi upstream, Flow.Subscriber downstream) { + public void scheduleSubscription(Multi upstream, MultiSubscriber downstream) { try { executor.execute(() -> upstream.subscribe().withSubscriber(this)); - } catch (RejectedExecutionException rejected) { - if (!isDone()) { - downstream.onError(rejected); - } + } catch (RejectedExecutionException rejection) { + onFailure(rejection); } } @Override - public void onItem(T t) { - downstream.onItem(t); - } - - @Override - public void request(long n) { - if (n > 0) { - Subscription subscription = getUpstreamSubscription(); - requestUpstream(n, subscription); + public void request(long numberOfItems) { + if (numberOfItems <= 0) { + onFailure(new IllegalArgumentException("Invalid number of request, must be greater than 0")); + return; + } + if (!isDone()) { + try { + executor.execute(() -> { + Flow.Subscription subscription = getUpstreamSubscription(); + if (subscription != CANCELLED) { + subscription.request(numberOfItems); + } + }); + } catch (RejectedExecutionException rejected) { + onFailure(rejected); + } } } }