function) {
return Infrastructure.onMultiCreation(new MultiDemandCapping<>(this, nonNull(function, "function")));
}
-
}
diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java
new file mode 100644
index 000000000..ac4783245
--- /dev/null
+++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/split/MultiSplitter.java
@@ -0,0 +1,272 @@
+package io.smallrye.mutiny.operators.multi.split;
+
+import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import io.smallrye.common.annotation.CheckReturnValue;
+import io.smallrye.mutiny.Context;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.helpers.Subscriptions;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.smallrye.mutiny.operators.AbstractMulti;
+import io.smallrye.mutiny.subscription.ContextSupport;
+import io.smallrye.mutiny.subscription.MultiSubscriber;
+
+/**
+ * Splits a {@link Multi} into several co-operating {@link Multi}.
+ *
+ * Each split {@link Multi} receives items based on a function that maps each item to a key from an enumeration.
+ *
+ * The demand of each split {@link Multi} is independent.
+ * Items flow when all keys from the enumeration have a split subscriber, and until either one of the split has a {@code 0}
+ * demand,
+ * or when one of the split subscriber cancels.
+ * The flow resumes when all keys have a subscriber again, and when the demand for each split is strictly positive.
+ *
+ * Calls to {@link #get(Enum)} result in new {@link Multi} objects, but given a key {@code K} then there can be only one
+ * active subscription. If there is already a subscriber for {@code K} then any subscription request to a {@link Multi} for key
+ * {@code K} results in a terminal failure.
+ * Note that when a subscriber for {@code K} has cancelled then a request to subscribe for a {@link Multi} for {@code K} can
+ * succeed.
+ *
+ * If the upstream {@link Multi} has already completed or failed, then any new subscriber will receive the terminal signal
+ * (see {@link MultiSubscriber#onCompletion()} and {@link MultiSubscriber#onFailure(Throwable)}).
+ *
+ * Note on {@link Context} support: it is assumed that all split subscribers share the same {@link Context} instance, if any.
+ * The {@link Context} is passed to the upstream {@link Multi} when the first split subscription happens.
+ * When disjoint {@link Context} are in use by the different split subscribers then the behavior of your code will be most
+ * likely incorrect.
+ *
+ * @param the items type
+ * @param the enumeration type
+ */
+public class MultiSplitter> {
+
+ private final Multi extends T> upstream;
+ private final Function splitter;
+ private final ConcurrentHashMap splits;
+ private final int requiredNumberOfSubscribers;
+ private final Class keyType;
+
+ public MultiSplitter(Multi extends T> upstream, Class keyType, Function splitter) {
+ this.upstream = nonNull(upstream, "upstream");
+ if (!nonNull(keyType, "keyType").isEnum()) {
+ // Note: the Java compiler enforces a type check on keyType being some enum, so this branch is only here for added peace of mind
+ throw new IllegalArgumentException("The key type must be that of an enumeration");
+ }
+ this.keyType = keyType;
+ this.splitter = nonNull(splitter, "splitter");
+ this.splits = new ConcurrentHashMap<>();
+ this.requiredNumberOfSubscribers = keyType.getEnumConstants().length;
+ }
+
+ /**
+ * Get a {@link Multi} for a given key.
+ *
+ * @param key the key
+ * @return a new {@link Multi}
+ */
+ @CheckReturnValue
+ public Multi get(K key) {
+ return Infrastructure.onMultiCreation(new SplitMulti(key));
+ }
+
+ /**
+ * Get the (enum) key type.
+ *
+ * @return the key type
+ */
+ public Class keyType() {
+ return keyType;
+ }
+
+ private enum State {
+ INIT,
+ AWAITING_SUBSCRIPTION,
+ SUBSCRIBED,
+ COMPLETED,
+ FAILED
+ }
+
+ private final AtomicReference state = new AtomicReference<>(State.INIT);
+
+ private volatile Throwable terminalFailure;
+
+ private Flow.Subscription upstreamSubscription;
+
+ private void onSplitRequest() {
+ if (state.get() != State.SUBSCRIBED || splits.size() < requiredNumberOfSubscribers) {
+ return;
+ }
+ for (SplitMulti.Split split : splits.values()) {
+ if (split.demand.get() == 0L) {
+ return;
+ }
+ }
+ upstreamSubscription.request(1L);
+ }
+
+ private void onUpstreamFailure() {
+ for (SplitMulti.Split split : splits.values()) {
+ split.downstream.onFailure(terminalFailure);
+ }
+ splits.clear();
+ }
+
+ private void onUpstreamCompletion() {
+ for (SplitMulti.Split split : splits.values()) {
+ split.downstream.onCompletion();
+ }
+ splits.clear();
+ }
+
+ private void onUpstreamItem(T item) {
+ try {
+ K key = splitter.apply(item);
+ if (key == null) {
+ throw new NullPointerException("The splitter function returned null");
+ }
+ // Note: if the target subscriber was removed between the last upstream demand and now, it is simply discarded
+ SplitMulti.Split target = splits.get(key);
+ if (target != null) {
+ target.downstream.onItem(item);
+ if (splits.size() == requiredNumberOfSubscribers
+ && (target.demand.get() == Long.MAX_VALUE || target.demand.decrementAndGet() > 0L)) {
+ upstreamSubscription.request(1L);
+ }
+ }
+ } catch (Throwable err) {
+ terminalFailure = err;
+ state.set(State.FAILED);
+ onUpstreamFailure();
+ }
+ }
+
+ // Note: we need a subscriber class because another onCompletion definition exists in Multi
+ private class Forwarder implements MultiSubscriber, ContextSupport {
+
+ private final Context context;
+
+ private Forwarder(MultiSubscriber super T> firstSubscriber) {
+ if (firstSubscriber instanceof ContextSupport) {
+ context = ((ContextSupport) firstSubscriber).context();
+ } else {
+ context = Context.empty();
+ }
+ }
+
+ @Override
+ public void onItem(T item) {
+ if (state.get() != State.SUBSCRIBED) {
+ return;
+ }
+ onUpstreamItem(item);
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ if (state.compareAndSet(State.SUBSCRIBED, State.FAILED)) {
+ terminalFailure = failure;
+ onUpstreamFailure();
+ }
+ }
+
+ @Override
+ public void onCompletion() {
+ if (state.compareAndSet(State.SUBSCRIBED, State.COMPLETED)) {
+ onUpstreamCompletion();
+ }
+ }
+
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ if (state.get() != State.AWAITING_SUBSCRIPTION) {
+ subscription.cancel();
+ } else {
+ upstreamSubscription = subscription;
+ state.set(State.SUBSCRIBED);
+ // In case all splits would be subscribed...
+ onSplitRequest();
+ }
+ }
+
+ @Override
+ public Context context() {
+ return context;
+ }
+ }
+
+ private class SplitMulti extends AbstractMulti {
+
+ private final K key;
+
+ private SplitMulti(K key) {
+ this.key = key;
+ }
+
+ @Override
+ public void subscribe(MultiSubscriber super T> subscriber) {
+ nonNull(subscriber, "subscriber");
+
+ // First subscription triggers upstream subscription
+ if (state.compareAndSet(State.INIT, State.AWAITING_SUBSCRIPTION)) {
+ // Assumption: all split subscribers share the same context, if any
+ upstream.subscribe().withSubscriber(new Forwarder(subscriber));
+ }
+
+ // Early exits
+ State stateWhenSubscribing = state.get();
+ if (stateWhenSubscribing == State.FAILED) {
+ subscriber.onSubscribe(Subscriptions.CANCELLED);
+ subscriber.onFailure(terminalFailure);
+ return;
+ }
+ if (stateWhenSubscribing == State.COMPLETED) {
+ subscriber.onSubscribe(Subscriptions.CANCELLED);
+ subscriber.onCompletion();
+ return;
+ }
+
+ // Regular subscription path
+ Split split = new Split(subscriber);
+ Split previous = splits.putIfAbsent(key, split);
+ if (previous == null) {
+ subscriber.onSubscribe(split);
+ } else {
+ subscriber.onSubscribe(Subscriptions.CANCELLED);
+ subscriber.onError(new IllegalStateException("There is already a subscriber for key " + key));
+ }
+ }
+
+ private class Split implements Flow.Subscription {
+
+ MultiSubscriber super T> downstream;
+ AtomicLong demand = new AtomicLong();
+
+ private Split(MultiSubscriber super T> subscriber) {
+ this.downstream = subscriber;
+ }
+
+ @Override
+ public void request(long n) {
+ if (n <= 0) {
+ cancel();
+ downstream.onError(Subscriptions.getInvalidRequestException());
+ return;
+ }
+ Subscriptions.add(demand, n);
+ onSplitRequest();
+ }
+
+ @Override
+ public void cancel() {
+ splits.remove(key);
+ }
+ }
+ }
+}
diff --git a/implementation/src/main/java/module-info.java b/implementation/src/main/java/module-info.java
index 8df86c907..0494bd048 100644
--- a/implementation/src/main/java/module-info.java
+++ b/implementation/src/main/java/module-info.java
@@ -13,6 +13,7 @@
exports io.smallrye.mutiny.infrastructure;
exports io.smallrye.mutiny.operators;
exports io.smallrye.mutiny.operators.multi.processors;
+ exports io.smallrye.mutiny.operators.multi.split;
exports io.smallrye.mutiny.subscription;
exports io.smallrye.mutiny.tuples;
exports io.smallrye.mutiny.unchecked;
diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java
new file mode 100644
index 000000000..124e75cdf
--- /dev/null
+++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/split/MultiSplitterTest.java
@@ -0,0 +1,239 @@
+package io.smallrye.mutiny.operators.multi.split;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import io.smallrye.mutiny.Context;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.helpers.test.AssertSubscriber;
+
+class MultiSplitterTest {
+
+ enum OddEven {
+ ODD,
+ EVEN
+ }
+
+ MultiSplitter evenOddSplitter() {
+ return Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ .split(OddEven.class, n -> (n % 2 == 0) ? OddEven.EVEN : OddEven.ODD);
+ }
+
+ @Test
+ void rejectNullKeyType() {
+ var err = assertThrows(IllegalArgumentException.class,
+ () -> Multi.createFrom().nothing().split(null, null));
+ assertThat(err.getMessage()).contains("keyType");
+ }
+
+ @Test
+ void rejectNullSplitter() {
+ var err = assertThrows(IllegalArgumentException.class,
+ () -> Multi.createFrom().nothing().split(OddEven.class, null));
+ assertThat(err.getMessage()).contains("splitter");
+ }
+
+ @Test
+ void rejectNegativeDemand() {
+ var sub = evenOddSplitter().get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+ sub.request(-10);
+ sub.assertFailedWith(IllegalArgumentException.class, "must be greater than 0");
+ }
+
+ @Test
+ void checkBasicBehavior() {
+ var splitter = evenOddSplitter();
+ assertThat(splitter.keyType()).isEqualTo(OddEven.class);
+
+ var odd = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+ var even = splitter.get(OddEven.EVEN)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+
+ odd.assertHasNotReceivedAnyItem();
+ even.assertHasNotReceivedAnyItem();
+
+ odd.request(2L);
+
+ odd.assertHasNotReceivedAnyItem();
+ even.assertHasNotReceivedAnyItem();
+
+ even.request(1L);
+
+ odd.assertItems(1);
+ even.assertItems(2);
+
+ even.request(1L);
+
+ odd.assertItems(1, 3);
+ even.assertItems(2);
+
+ odd.request(1L);
+
+ odd.assertItems(1, 3);
+ even.assertItems(2, 4);
+
+ even.request(1L);
+
+ odd.assertItems(1, 3, 5);
+ even.assertItems(2, 4);
+
+ odd.request(Long.MAX_VALUE);
+
+ odd.assertItems(1, 3, 5);
+ even.assertItems(2, 4, 6);
+
+ even.request(Long.MAX_VALUE);
+
+ odd.assertItems(1, 3, 5, 7, 9);
+ even.assertItems(2, 4, 6, 8, 10);
+
+ odd.assertCompleted();
+ even.assertCompleted();
+
+ var afterWork = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
+ afterWork.assertHasNotReceivedAnyItem().hasCompleted();
+ }
+
+ @Test
+ void checkPauseOnCancellation() {
+ var splitter = evenOddSplitter();
+ var odd = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+ var even = splitter.get(OddEven.EVEN)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+
+ odd.request(2L);
+ even.request(2L);
+
+ odd.assertItems(1, 3);
+ even.assertItems(2);
+
+ even.cancel();
+ odd.request(2L);
+
+ odd.assertItems(1, 3);
+
+ even = splitter.get(OddEven.EVEN)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+ even.request(2L);
+
+ odd.assertItems(1, 3, 5);
+ even.assertItems(4, 6);
+ }
+
+ @Test
+ void boundedDemandPrevailsOverUnboundedDemand() {
+ var splitter = evenOddSplitter();
+ var odd = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+ var even = splitter.get(OddEven.EVEN)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+
+ odd.request(Long.MAX_VALUE);
+ even.request(2L);
+
+ odd.assertItems(1, 3);
+ even.assertItems(2, 4);
+ }
+
+ @Test
+ void rejectSubscriptionWhenAlreadyActive() {
+ var splitter = evenOddSplitter();
+ var ok = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
+ var failing = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
+
+ ok.assertHasNotReceivedAnyItem().assertNotTerminated();
+ failing.assertFailedWith(IllegalStateException.class, "There is already a subscriber for key ODD");
+ }
+
+ @Test
+ void nullReturningSplitter() {
+ var splitter = Multi.createFrom().items(1, 2, 3)
+ .split(OddEven.class, n -> null);
+ var odd = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+ var even = splitter.get(OddEven.EVEN)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+
+ odd.request(Long.MAX_VALUE);
+ even.request(Long.MAX_VALUE);
+
+ odd.assertFailedWith(NullPointerException.class, "The splitter function returned null");
+ even.assertFailedWith(NullPointerException.class, "The splitter function returned null");
+
+ var afterWork = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
+ afterWork.assertFailedWith(NullPointerException.class, "The splitter function returned null");
+ }
+
+ @Test
+ void throwingSplitter() {
+ var splitter = Multi.createFrom().items(1, 2, 3)
+ .split(OddEven.class, n -> {
+ throw new RuntimeException("boom");
+ });
+ var odd = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+ var even = splitter.get(OddEven.EVEN)
+ .subscribe().withSubscriber(AssertSubscriber.create());
+
+ odd.request(Long.MAX_VALUE);
+ even.request(Long.MAX_VALUE);
+
+ odd.assertFailedWith(RuntimeException.class, "boom");
+ even.assertFailedWith(RuntimeException.class, "boom");
+
+ var afterWork = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
+ afterWork.assertFailedWith(RuntimeException.class, "boom");
+ }
+
+ @Test
+ void failedMultiSubscriptions() {
+ var splitter = Multi.createFrom().items(1, 2, 3)
+ .onCompletion().switchTo(Multi.createFrom().failure(new RuntimeException("boom")))
+ .split(OddEven.class, n -> (n % 2 == 0) ? OddEven.EVEN : OddEven.ODD);
+
+ var odd = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
+ var even = splitter.get(OddEven.EVEN)
+ .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
+
+ odd.assertFailedWith(RuntimeException.class, "boom")
+ .assertItems(1, 3);
+
+ even.assertFailedWith(RuntimeException.class, "boom")
+ .assertItems(2);
+
+ var afterWork = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
+ afterWork.assertHasNotReceivedAnyItem()
+ .assertFailedWith(RuntimeException.class, "boom");
+ }
+
+ @Test
+ void contextPassing() {
+ var splitter = Multi.createFrom().context(ctx -> Multi.createFrom().iterable(ctx.> get("items")))
+ .split(OddEven.class, n -> (n % 2 == 0) ? OddEven.EVEN : OddEven.ODD);
+
+ var ctx = Context.of("items", List.of(1, 2, 3, 4, 5, 6));
+
+ var odd = splitter.get(OddEven.ODD)
+ .subscribe().withSubscriber(AssertSubscriber.create(ctx, Long.MAX_VALUE));
+
+ var even = splitter.get(OddEven.EVEN)
+ .subscribe().withSubscriber(AssertSubscriber.create(ctx, Long.MAX_VALUE));
+
+ odd.assertCompleted().assertItems(1, 3, 5);
+ even.assertCompleted().assertItems(2, 4, 6);
+ }
+}
diff --git a/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSplitTckTest.java b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSplitTckTest.java
new file mode 100644
index 000000000..06cf9e236
--- /dev/null
+++ b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiSplitTckTest.java
@@ -0,0 +1,24 @@
+package io.smallrye.mutiny.tcktests;
+
+import java.util.concurrent.Flow;
+
+public class MultiSplitTckTest extends AbstractPublisherTck {
+
+ enum Anything {
+ AnyValue
+ }
+
+ @Override
+ public Flow.Publisher createFlowPublisher(long elements) {
+ return upstream(elements)
+ .split(Anything.class, n -> Anything.AnyValue)
+ .get(Anything.AnyValue);
+ }
+
+ @Override
+ public Flow.Publisher createFailedFlowPublisher() {
+ return failedUpstream()
+ .split(Anything.class, n -> Anything.AnyValue)
+ .get(Anything.AnyValue);
+ }
+}