Skip to content

Commit

Permalink
Multi replay operator (experimental API)
Browse files Browse the repository at this point in the history
Replay a Multi (turned into a hot stream) to multiple (possibly late) subscribers.

See #536 for the initial discussion.
  • Loading branch information
jponge committed Mar 7, 2022
1 parent 50b5c24 commit 12549c4
Show file tree
Hide file tree
Showing 11 changed files with 1,213 additions and 0 deletions.
1 change: 1 addition & 0 deletions documentation/src/main/jekyll/_data/categories.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- emit-subscription
- logging
- context-passing
- replaying-multis

- name: Integration
guides:
Expand Down
6 changes: 6 additions & 0 deletions documentation/src/main/jekyll/_data/guides.yml
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,9 @@ context-passing:
labels:
- intermediate
- advanced

replaying-multis:
title: Replaying Multis
text: Learn how multiple subscribers can replay from a Multi
labels:
- advanced
91 changes: 91 additions & 0 deletions documentation/src/main/jekyll/guides/replaying-multis.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
:page-layout: guides
:page-guide-id: replaying-multis
:page-liquid:
:include_dir: ../../../../src/test/java/guides/operators
:imagesdir: ../assets/images

A `Multi` is a _cold-source_: no processing happens until you subscribe.

While the `broadcast` operator can be used so that multiple subscribers consume a `Multi` events _at the same time_, it does not support replaying items for _late subscribers_: when a subscriber joins after the `Multi` has completed (or failed), then it won't receive any item.

This is where _replaying_ can be useful.

== Replaying all events

Replaying all events from an upstream `Multi` works as follows:

[source,java,indent=0]
----
include::{include_dir}/ReplayTest.java[tag=replay-all]
----

Both `item_1` and `item_2` trigger new subscriptions, and both lists contain the following elements:

----
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
----

Replaying works by turning `upstream` into a _hot-stream_, meaning that it gets requested `Long.MAX_VALUE` elements.
This is done when the first subscription happens.

The replay operator stores the items in an internal _replay log_, and then each subscriber gets to replay them.

[IMPORTANT]
====
Subscribers demand and cancellation requests are honored while replaying, but `upstream` cannot be cancelled.
Be careful with unbounded streams as you can exhaust memory!
In such cases or when you need to replay large amounts of data, you might opt to use some eventing middleware rather than Mutiny replays.
====

== Replaying the last 'n' events

You can limit the number of elements to replay by using the `upTo` method:

[source,java,indent=0]
----
include::{include_dir}/ReplayTest.java[tag=replay-last]
----

Each new subscriber gets to replay from the last `n` elements from where the replay log is at subscription time.
For instance the first subscriber can observe all events, while a subscriber that joins 2 seconds later might not observe the earlier events.

Since `Multi.createFrom().range(0, 10)` is an _immediate_ stream, both `item_1` and `item_2` lists contain the last items:

----
[7, 8, 9]
----

== Prepending with seed data

In some cases you might want to prepend some _seed_ data that will be available for replay before the upstream starts emitting.

You can do so using an `Iterable` to provide such seed data:

[source,java,indent=0]
----
include::{include_dir}/ReplayTest.java[tag=replay-seed]
----

In which case subscribers can observe the following events:

----
[-10, -5, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
----

== Replay of failures and completions

Subscribers get to observe not just items but also the failure and completion events:

[source,java,indent=0]
----
include::{include_dir}/ReplayTest.java[tag=replay-errors]
----

Running this code yields the following output for any subscriber:

----
-> 7
-> 8
-> 9
Failed: boom
----
82 changes: 82 additions & 0 deletions documentation/src/test/java/guides/operators/ReplayTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package guides.operators;

import io.smallrye.mutiny.Multi;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public class ReplayTest {

@Test
public void replayAll() {
// tag::replay-all[]
Multi<Integer> upstream = Multi.createFrom().range(0, 10);

Multi<Integer> replay = Multi.createBy().replaying().ofMulti(upstream);

List<Integer> items_1 = replay.collect().asList().await().indefinitely();
List<Integer> items_2 = replay.collect().asList().await().indefinitely();
// end::replay-all[]

assertThat(items_1)
.isEqualTo(items_2)
.hasSize(10);
System.out.println(items_1);
}

@Test
public void replayLast() {
// tag::replay-last[]
Multi<Integer> upstream = Multi.createFrom().range(0, 10);

Multi<Integer> replay = Multi.createBy().replaying().upTo(3).ofMulti(upstream);

List<Integer> items_1 = replay.collect().asList().await().indefinitely();
List<Integer> items_2 = replay.collect().asList().await().indefinitely();
// end::replay-last[]

assertThat(items_1)
.isEqualTo(items_2)
.hasSize(3);
System.out.println(items_1);
}

@Test
public void replayWithSeed() {
// tag::replay-seed[]
Multi<Integer> upstream = Multi.createFrom().range(0, 10);
Iterable<Integer> seed = Arrays.asList(-10, -5, -1);

Multi<Integer> replay = Multi.createBy().replaying().ofSeedAndMulti(seed, upstream);

List<Integer> items_1 = replay.collect().asList().await().indefinitely();
List<Integer> items_2 = replay.collect().asList().await().indefinitely();
// end::replay-seed[]

assertThat(items_1)
.isEqualTo(items_2)
.hasSize(13);
System.out.println(items_1);
}

@Test
public void errors() {
// tag::replay-errors[]
Multi<Integer> upstream = Multi.createBy().concatenating().streams(
Multi.createFrom().range(0, 10),
Multi.createFrom().failure(() -> new IOException("boom"))
);

Multi<Integer> replay = Multi.createBy().replaying().upTo(3).ofMulti(upstream);

replay.subscribe().with(
n -> System.out.println(" -> " + n),
failure -> System.out.println("Failed: " + failure.getMessage()),
() -> System.out.println("Completed"));
// end::replay-errors[]
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.mutiny.groups;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;

/**
Expand Down Expand Up @@ -72,4 +73,15 @@ public MultiRepetition repeating() {
return new MultiRepetition();
}

/**
* Creates a new {@link Multi} that replays elements from another {@link Multi} to any number of current and late
* subscribers.
*
* @return the object to configure the replay behavior
*/
@CheckReturnValue
@Experimental("Replaying of Multi is an experimental feature in Mutiny 1.4.0")
public MultiReplay replaying() {
return new MultiReplay();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.smallrye.mutiny.groups;

import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
import static io.smallrye.mutiny.helpers.ParameterValidation.positive;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.replay.ReplayOperator;

/**
* Group to configure replaying a {@link Multi} to multiple subscribers.
*/
@Experimental("Replaying of Multi is an experimental feature in Mutiny 1.4.0")
public class MultiReplay {

private long numberOfItemsToReplay = Long.MAX_VALUE;

/**
* Limit the number of items each new subscriber gets.
* The default is to replay all events.
*
* @param numberOfItemsToReplay a strictly positive number of items to be replayed, where {@code Long.MAX_VALUE} means
* replaying all events
* @return this group
*/
public MultiReplay upTo(long numberOfItemsToReplay) {
this.numberOfItemsToReplay = positive(numberOfItemsToReplay, "numberOfItemsToReplay");
return this;
}

/**
* Create a replay {@link Multi}.
* <p>
* Replaying work as follows.
* <ol>
* <li>The provided {@code upstream} {@link Multi} is turned into a hot-stream as it gets requested {@code Long.MAX_VALUE}
* elements.
* This happens at the first subscription request. Note that {@code upstream} will never be cancelled.</li>
* <li>Each new subscriber to this replay {@link Multi} is able to replay items at its own pace (back-pressure is
* honored).</li>
* <li>When the number of items to replay is limited using {@link #upTo(long)}, then a new subscriber gets to replay
* starting from the current position in the upstream replay log.
* When the number of elements to replay is unbounded, then a new subscriber replays from the start.</li>
* <li>All current and late subscribers observe terminal completion / error signals.</li>
* <li>Items are pushed synchronously to subscribers when they call {@link org.reactivestreams.Subscription#request(long)}
* and there are enough elements to satisfy a part of the demand.
* Otherwise items are pushed from the upstream to all subscribers with an outstanding demand.</li>
* </ol>
* <p>
* Replaying a large number of elements can be costly, as items have to be kept in-memory.
* It is not recommended using this operator with unbounded streams, especially as they can't be cancelled (the subscribers
* can cancel replays, though).
* In such cases and especially when you have to keep replay data around for a long time then some eventing middleware might
* be a better fit.
*
* @param upstream the {@link Multi} to replay, must not be {@code null}
* @param <T> the items type
* @return a replaying {@link Multi}
*/
public <T> Multi<T> ofMulti(Multi<T> upstream) {
return new ReplayOperator<>(nonNull(upstream, "upstream"), numberOfItemsToReplay);
}

/**
* Create a replay {@link Multi} with some seed elements inserted before the provided {@link Multi} items.
* <p>
* The behavior is that of {@link #ofMulti(Multi)}, except that the items from {@code seed} are prepended to those from
* {@code upstream} in the replay log.
*
* @param seed the seed elements, must not be {@code null}, must not contain any {@code null} element
* @param upstream the {@link Multi} to replay, must not be {@code null}
* @param <T> the items type
* @return a replaying {@link Multi}
* @see #ofMulti(Multi)
*/
public <T> Multi<T> ofSeedAndMulti(Iterable<T> seed, Multi<T> upstream) {
return new ReplayOperator<>(nonNull(upstream, "upstream"), numberOfItemsToReplay, nonNull(seed, "seed"));
}
}
Loading

0 comments on commit 12549c4

Please sign in to comment.