Skip to content

Commit

Permalink
Merge pull request #1257 from smallrye/internal/simplify-multi-subscr…
Browse files Browse the repository at this point in the history
…ibe-on-op

Simplify the MultiSubscribe operator implementation
  • Loading branch information
cescoffier authored Apr 21, 2023
2 parents 9c5b3e8 + 74b435a commit 62b9a26
Showing 1 changed file with 23 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@
*/
package io.smallrye.mutiny.operators.multi;

import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED;

import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.RejectedExecutionException;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.MultiSubscriber;

/**
* Subscribes to the upstream asynchronously using the given executor.
* Subscribes to the upstream asynchronously using the given executor, and ensure all
* {@link Flow.Subscription#request(long)} calls happen from that executor.
*
* @param <T> the type of item
*/
Expand Down Expand Up @@ -55,43 +57,31 @@ static final class SubscribeOnProcessor<T> extends MultiOperatorProcessor<T, T>
this.executor = executor;
}

@Override
public void onSubscribe(Subscription subscription) {
if (compareAndSetUpstreamSubscription(null, subscription)) {
downstream.onSubscribe(this);
} else {
subscription.cancel();
}
}

void requestUpstream(final long n, final Subscription s) {
try {
executor.execute(() -> s.request(n));
} catch (RejectedExecutionException rejected) {
super.onFailure(rejected);
}
}

void scheduleSubscription(Multi<? extends T> upstream, Flow.Subscriber<? super T> downstream) {
public void scheduleSubscription(Multi<? extends T> upstream, MultiSubscriber<? super T> downstream) {
try {
executor.execute(() -> upstream.subscribe().withSubscriber(this));
} catch (RejectedExecutionException rejected) {
if (!isDone()) {
downstream.onError(rejected);
}
} catch (RejectedExecutionException rejection) {
onFailure(rejection);
}
}

@Override
public void onItem(T t) {
downstream.onItem(t);
}

@Override
public void request(long n) {
if (n > 0) {
Subscription subscription = getUpstreamSubscription();
requestUpstream(n, subscription);
public void request(long numberOfItems) {
if (numberOfItems <= 0) {
onFailure(new IllegalArgumentException("Invalid number of request, must be greater than 0"));
return;
}
if (!isDone()) {
try {
executor.execute(() -> {
Flow.Subscription subscription = getUpstreamSubscription();
if (subscription != CANCELLED) {
subscription.request(numberOfItems);
}
});
} catch (RejectedExecutionException rejected) {
onFailure(rejected);
}
}
}
}
Expand Down

0 comments on commit 62b9a26

Please sign in to comment.