Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi split operator #1309

Merged
merged 4 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 167 additions & 65 deletions documentation/Pipfile.lock

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions documentation/docs/guides/multi-split.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
tags:
- guide
- intermediate
---

# Splitting a Multi into several Multi

It is possible to split a `Multi` into several `Multi` streams.

## Using the split operator

Suppose that we have a stream of strings that represent _signals_, and that we want a `Multi` for each kind of signal:

- `?foo`, `?bar` are _input_ signals,
- `!foo`, `!bar` are _output_ signals,
- `foo`, `bar` are _other_ signals.

To do that, we need a function that maps each item of the stream to its target stream.
The splitter API needs a Java enumeration to define keys, as in:

```java linenums="1"
{{ insert('java/guides/operators/SplitTest.java', 'enum') }}
```

Now we can use the `split` operator that provides a splitter object, and fetch individual `Multi` for each split stream using the `get` method:

```java linenums="1"
{{ insert('java/guides/operators/SplitTest.java', 'splits') }}
```

This prints the following console output:

```
output - a
input - b
output - c
output - d
other - 123
input - e
```

## Notes on using splits

- Items flow when all splits have a subscriber.
- The flow stops when either of the subscribers cancels, or when any subscriber has a no outstanding demand.
- The flow resumes when all splits have a subscriber again, and when all subscribers have outstanding demand.
- Only one subscriber can be active for a given split. Other subscription attempts will receive an error.
- When a subscriber cancels, then a new subscription attempt on its corresponding split can succeed.
- Subscribing to an already completed or errored split results in receiving the terminal signal (`onComplete()` or `onFailure(err)`).
- The upstream `Multi` gets subscribed to when the first split subscription happens, no matter which split it is.
- The first split subscription passes its context, if any, to the upstream `Multi`. It is expected that all split subscribers share the same context object, or the behavior of your code will most likely be incorrect.
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ nav:
- 'guides/context-passing.md'
- 'guides/replaying-multis.md'
- 'guides/controlling-demand.md'
- 'guides/multi-split.md'
- 'Reference':
- 'reference/migrating-to-mutiny-2.md'
- 'reference/why-is-asynchronous-important.md'
Expand Down
45 changes: 45 additions & 0 deletions documentation/src/test/java/guides/operators/SplitTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package guides.operators;

import io.smallrye.mutiny.Multi;
import org.junit.jupiter.api.Test;

public class SplitTest {

// <enum>
enum Signals {
INPUT,
OUTPUT,
OTHER
}
// </enum>

@Test
public void splitDemo() {
// <splits>
Multi<String> multi = Multi.createFrom().items(
"!a", "?b", "!c", "!d", "123", "?e"
);

var splitter = multi.split(Signals.class, s -> {
if (s.startsWith("?")) {
return Signals.INPUT;
} else if (s.startsWith("!")) {
return Signals.OUTPUT;
} else {
return Signals.OTHER;
}
});

splitter.get(Signals.INPUT)
.onItem().transform(s -> s.substring(1))
.subscribe().with(signal -> System.out.println("input - " + signal));

splitter.get(Signals.OUTPUT)
.onItem().transform(s -> s.substring(1))
.subscribe().with(signal -> System.out.println("output - " + signal));

splitter.get(Signals.OTHER)
.subscribe().with(signal -> System.out.println("other - " + signal));
// </splits>
}
}
1 change: 1 addition & 0 deletions implementation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
<sourceFileInclude>io/smallrye/mutiny/infrastructure/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/operators/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/operators/multi/processors/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/operators/multi/split/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/subscription/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/tuples/*.java</sourceFileInclude>
<sourceFileInclude>io/smallrye/mutiny/unchecked/*.java</sourceFileInclude>
Expand Down
31 changes: 31 additions & 0 deletions implementation/src/main/java/io/smallrye/mutiny/Multi.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import java.util.function.*;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.groups.*;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.split.MultiSplitter;

public interface Multi<T> extends Publisher<T> {

Expand Down Expand Up @@ -637,4 +639,33 @@ default Multi<T> capDemandsTo(long max) {
*/
@CheckReturnValue
Multi<T> capDemandsUsing(LongFunction<Long> function);

/**
* Splits this {@link Multi} into several co-operating {@link Multi} based on an enumeration and a mapping function.
* <p>
* Here is a sample where a stream of integers is split into streams for odd and even numbers:
*
* <pre>
* {@code
* // Split someMulti into 2 streams
* var splitter = someMulti.split(OddEven.class, n -> (n % 2 == 0) ? OddEven.EVEN : OddEven.ODD);
*
* // Stream for odd numbers
* vor odd = splitter.get(OddEven.ODD).subscribe().with(...);
*
* // Stream for even numbers
* vor even = splitter.get(OddEven.EVEN).subscribe().with(...);
* }
* </pre>
*
* @param keyType the key type
* @param splitter the splitter function
* @return a splitter
* @param <K> the key type
*/
@CheckReturnValue
@Experimental("Multi splitting is an experimental API in Mutiny 2.3.0")
default <K extends Enum<K>> MultiSplitter<T, K> split(Class<K> keyType, Function<T, K> splitter) {
return new MultiSplitter<>(this, keyType, splitter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,4 @@ public MultiDemandPacing<T> paceDemand() {
public Multi<T> capDemandsUsing(LongFunction<Long> function) {
return Infrastructure.onMultiCreation(new MultiDemandCapping<>(this, nonNull(function, "function")));
}

}
Loading